1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4from __future__ import absolute_import
5import itertools
6import mpd
7import mpd.asyncio
8import os
9import socket
10import sys
11import types
12import warnings
13
14import unittest
15
16try:
17    from twisted.python.failure import Failure
18
19    TWISTED_MISSING = False
20except ImportError:
21    warnings.warn(
22        "No twisted installed: skip twisted related tests! "
23        + "(twisted is not available for python >= 3.0 && python < 3.3)"
24    )
25    TWISTED_MISSING = True
26
27import asyncio
28
# Prefer the standard-library mock (always present on the Python 3 versions
# that also provide asyncio, imported above); only fall back to the PyPI
# backport, and only abort when neither implementation can be imported.
try:
    from unittest import mock
except ImportError:
    try:
        import mock
    except ImportError:
        print("Please install mock from PyPI to run tests!")
        sys.exit(1)
34
# show deprecation warnings
warnings.simplefilter("default")


# Placeholder TCP endpoint handed to MPDClient.connect(); TestMPDClient patches
# mpd.base.socket, so those tests talk to a mock instead of this address.
TEST_MPD_HOST, TEST_MPD_PORT = ("example.com", 10000)
# Placeholder path handed to connect() by TestMPDClientSocket, which patches
# MPDClient._connect_unix to return one end of a socket pair instead.
TEST_MPD_UNIXHOST = "/example/test/host"
# Value assigned to client.timeout in TestMPDClientSocket.setUp
# (presumably seconds -- confirm against MPDClient.timeout).
TEST_MPD_UNIXTIMEOUT = 0.5
42
43
44class TestMPDClient(unittest.TestCase):
45
46    longMessage = True
47
48    def setUp(self):
49        self.socket_patch = mock.patch("mpd.base.socket")
50        self.socket_mock = self.socket_patch.start()
51        self.socket_mock.getaddrinfo.return_value = [range(5)]
52
53        self.socket_mock.socket.side_effect = (
54            lambda *a, **kw:
55            # Create a new socket.socket() mock with default attributes,
56            # each time we are calling it back (otherwise, it keeps set
57            # attributes across calls).
58            # That's probablyy what we want, since reconnecting is like
59            # reinitializing the entire connection, and so, the mock.
60            mock.MagicMock(name="socket.socket")
61        )
62
63        self.client = mpd.MPDClient()
64        self.client.connect(TEST_MPD_HOST, TEST_MPD_PORT)
65        self.client._sock.reset_mock()
66        self.MPDWillReturn("ACK don't forget to setup your mock\n")
67
68    def tearDown(self):
69        self.socket_patch.stop()
70
71    def MPDWillReturn(self, *lines):
72        # Return what the caller wants first, then do as if the socket was
73        # disconnected.
74        innerIter = itertools.chain(lines, itertools.repeat(""))
75        if sys.version_info >= (3, 0):
76            self.client._rbfile.readline.side_effect = (
77                x.encode("utf-8") for x in innerIter
78            )
79        else:
80            self.client._rbfile.readline.side_effect = innerIter
81
82    def assertMPDReceived(self, *lines):
83        self.client._wfile.write.assert_called_with(*lines)
84
85    def test_abstract_functions(self):
86        MPDClientBase = mpd.base.MPDClientBase
87        self.assertRaises(
88            NotImplementedError,
89            lambda: MPDClientBase.add_command("command_name", lambda x: x),
90        )
91        client = MPDClientBase()
92        self.assertRaises(NotImplementedError, lambda: client.noidle())
93        self.assertRaises(NotImplementedError, lambda: client.command_list_ok_begin())
94        self.assertRaises(NotImplementedError, lambda: client.command_list_end())
95
96    def test_metaclass_commands(self):
97        # just some random functions
98        self.assertTrue(hasattr(self.client, "commands"))
99        self.assertTrue(hasattr(self.client, "save"))
100        self.assertTrue(hasattr(self.client, "random"))
101        # space should be replaced
102        self.assertFalse(hasattr(self.client, "sticker get"))
103        self.assertTrue(hasattr(self.client, "sticker_get"))
104
105    def test_duplicate_tags(self):
106        self.MPDWillReturn("Track: file1\n", "Track: file2\n", "OK\n")
107        song = self.client.currentsong()
108        self.assertIsInstance(song, dict)
109        self.assertIsInstance(song["track"], list)
110        self.assertMPDReceived("currentsong\n")
111
112    def test_parse_nothing(self):
113        self.MPDWillReturn("OK\n", "OK\n")
114
115        self.assertIsNone(self.client.ping())
116        self.assertMPDReceived("ping\n")
117
118        self.assertIsNone(self.client.clearerror())
119        self.assertMPDReceived("clearerror\n")
120
121    def test_parse_list(self):
122        self.MPDWillReturn(
123            "tagtype: Artist\n", "tagtype: ArtistSort\n", "tagtype: Album\n", "OK\n"
124        )
125
126        result = self.client.tagtypes()
127        self.assertMPDReceived("tagtypes\n")
128        self.assertIsInstance(result, list)
129        self.assertEqual(result, ["Artist", "ArtistSort", "Album",])
130
131    def test_parse_list_groups(self):
132        self.MPDWillReturn(
133            "Album: \n",
134            "Album: 20th_Century_Masters_The_Millenium_Collection\n",
135            "Album: Aerosmith's Greatest Hits\n",
136            "OK\n",
137        )
138
139        result = self.client.list("album")
140        self.assertMPDReceived('list "album"\n')
141        self.assertIsInstance(result, list)
142        self.assertEqual(
143            result,
144            [
145                {"album": ""},
146                {"album": "20th_Century_Masters_The_Millenium_Collection"},
147                {"album": "Aerosmith's Greatest Hits"},
148            ],
149        )
150
151        self.MPDWillReturn(
152            "Album: \n",
153            "Album: 20th_Century_Masters_The_Millenium_Collection\n",
154            "Artist: Eric Clapton\n",
155            "Album: Aerosmith's Greatest Hits\n",
156            "Artist: Aerosmith\n",
157            "OK\n",
158        )
159
160        result = self.client.list("album", "group", "artist")
161        self.assertMPDReceived('list "album" "group" "artist"\n')
162        self.assertIsInstance(result, list)
163        self.assertEqual(
164            result,
165            [
166                {"album": ""},
167                {
168                    "album": "20th_Century_Masters_The_Millenium_Collection",
169                    "artist": "Eric Clapton",
170                },
171                {"album": "Aerosmith's Greatest Hits", "artist": "Aerosmith"},
172            ],
173        )
174
175    def test_parse_item(self):
176        self.MPDWillReturn("updating_db: 42\n", "OK\n")
177        self.assertIsNotNone(self.client.update())
178
179    def test_parse_object(self):
180        # XXX: _read_objects() doesn't wait for the final OK
181        self.MPDWillReturn("volume: 63\n", "OK\n")
182        status = self.client.status()
183        self.assertMPDReceived("status\n")
184        self.assertIsInstance(status, dict)
185
186        # XXX: _read_objects() doesn't wait for the final OK
187        self.MPDWillReturn("OK\n")
188        stats = self.client.stats()
189        self.assertMPDReceived("stats\n")
190        self.assertIsInstance(stats, dict)
191
192    def test_parse_songs(self):
193        self.MPDWillReturn("file: my-song.ogg\n", "Pos: 0\n", "Id: 66\n", "OK\n")
194        playlist = self.client.playlistinfo()
195
196        self.assertMPDReceived("playlistinfo\n")
197        self.assertIsInstance(playlist, list)
198        self.assertEqual(1, len(playlist))
199        e = playlist[0]
200        self.assertIsInstance(e, dict)
201        self.assertEqual("my-song.ogg", e["file"])
202        self.assertEqual("0", e["pos"])
203        self.assertEqual("66", e["id"])
204
205    def test_readcomments(self):
206        self.MPDWillReturn(
207            "major_brand: M4V\n", "minor_version: 1\n", "lyrics: Lalala\n", "OK\n"
208        )
209        comments = self.client.readcomments()
210        self.assertMPDReceived("readcomments\n")
211        self.assertEqual(comments["major_brand"], "M4V")
212        self.assertEqual(comments["minor_version"], "1")
213        self.assertEqual(comments["lyrics"], "Lalala")
214
215    def test_iterating(self):
216        self.MPDWillReturn("file: my-song.ogg\n", "Pos: 0\n", "Id: 66\n", "OK\n")
217        self.client.iterate = True
218        playlist = self.client.playlistinfo()
219        self.assertMPDReceived("playlistinfo\n")
220        self.assertIsInstance(playlist, types.GeneratorType)
221        for song in playlist:
222            self.assertIsInstance(song, dict)
223            self.assertEqual("my-song.ogg", song["file"])
224            self.assertEqual("0", song["pos"])
225            self.assertEqual("66", song["id"])
226
227    def test_add_and_remove_command(self):
228        self.MPDWillReturn("ACK awesome command\n")
229
230        self.client.add_command("awesome command", mpd.MPDClient._parse_nothing)
231        self.assertTrue(hasattr(self.client, "awesome_command"))
232        # should be unknown by mpd
233        self.assertRaises(mpd.CommandError, self.client.awesome_command)
234
235        self.client.remove_command("awesome_command")
236        self.assertFalse(hasattr(self.client, "awesome_command"))
237
238        # remove non existing command
239        self.assertRaises(ValueError, self.client.remove_command, "awesome_command")
240
241    def test_partitions(self):
242        self.MPDWillReturn("partition: default\n", "partition: partition2\n", "OK\n")
243        partitions = self.client.listpartitions()
244        self.assertMPDReceived("listpartitions\n")
245        self.assertEqual(
246            [
247                {"partition": "default"},
248                {"partition": "partition2"},
249            ],
250            partitions
251        )
252
253        self.MPDWillReturn("OK\n")
254        self.assertIsNone(self.client.newpartition("Another Partition"))
255        self.assertMPDReceived('newpartition "Another Partition"\n')
256
257        self.MPDWillReturn("OK\n")
258        self.assertIsNone(self.client.partition("Another Partition"))
259        self.assertMPDReceived('partition "Another Partition"\n')
260
261        self.MPDWillReturn("OK\n")
262        self.assertIsNone(self.client.delpartition("Another Partition"))
263        self.assertMPDReceived('delpartition "Another Partition"\n')
264
265        self.MPDWillReturn("OK\n")
266        self.assertIsNone(self.client.moveoutput("My ALSA Device"))
267        self.assertMPDReceived('moveoutput "My ALSA Device"\n')
268
269    def test_client_to_client(self):
270        # client to client is at this time in beta!
271
272        self.MPDWillReturn("OK\n")
273        self.assertIsNone(self.client.subscribe("monty"))
274        self.assertMPDReceived('subscribe "monty"\n')
275
276        self.MPDWillReturn("channel: monty\n", "OK\n")
277        channels = self.client.channels()
278        self.assertMPDReceived("channels\n")
279        self.assertEqual(["monty"], channels)
280
281        self.MPDWillReturn("OK\n")
282        self.assertIsNone(self.client.sendmessage("monty", "SPAM"))
283        self.assertMPDReceived('sendmessage "monty" "SPAM"\n')
284
285        self.MPDWillReturn("channel: monty\n", "message: SPAM\n", "OK\n")
286        msg = self.client.readmessages()
287        self.assertMPDReceived("readmessages\n")
288        self.assertEqual(msg, [{"channel": "monty", "message": "SPAM"}])
289
290        self.MPDWillReturn("OK\n")
291        self.assertIsNone(self.client.unsubscribe("monty"))
292        self.assertMPDReceived('unsubscribe "monty"\n')
293
294        self.MPDWillReturn("OK\n")
295        channels = self.client.channels()
296        self.assertMPDReceived("channels\n")
297        self.assertEqual([], channels)
298
299    def test_unicode_as_command_args(self):
300        self.MPDWillReturn("OK\n")
301        res = self.client.find("file", "☯☾☝♖✽")
302        self.assertIsInstance(res, list)
303        self.assertMPDReceived('find "file" "☯☾☝♖✽"\n')
304
305    def test_numbers_as_command_args(self):
306        self.MPDWillReturn("OK\n")
307        self.client.find("file", 1)
308        self.assertMPDReceived('find "file" "1"\n')
309
310    def test_commands_without_callbacks(self):
311        self.MPDWillReturn("\n")
312        self.client.close()
313        self.assertMPDReceived("close\n")
314
315        # XXX: what are we testing here?
316        #      looks like reconnection test?
317        self.client._reset()
318        self.client.connect(TEST_MPD_HOST, TEST_MPD_PORT)
319
320    def test_set_timeout_on_client(self):
321        self.client.timeout = 1
322        self.client._sock.settimeout.assert_called_with(1)
323        self.assertEqual(self.client.timeout, 1)
324
325        self.client.timeout = None
326        self.client._sock.settimeout.assert_called_with(None)
327        self.assertEqual(self.client.timeout, None)
328
329    def test_set_timeout_from_connect(self):
330        self.client.disconnect()
331        with warnings.catch_warnings(record=True) as w:
332            self.client.connect("example.com", 10000, timeout=5)
333            self.client._sock.settimeout.assert_called_with(5)
334            self.assertEqual(len(w), 1)
335            self.assertIn("Use MPDClient.timeout", str(w[0].message))
336
337    @unittest.skipIf(
338        sys.version_info < (3, 3), "BrokenPipeError was introduced in python 3.3"
339    )
340    def test_broken_pipe_error(self):
341        self.MPDWillReturn("volume: 63\n", "OK\n")
342        self.client._wfile.write.side_effect = BrokenPipeError
343        self.socket_mock.error = Exception
344
345        with self.assertRaises(mpd.ConnectionError):
346            self.client.status()
347
348    def test_connection_lost(self):
349        # Simulate a connection lost: the socket returns empty strings
350        self.MPDWillReturn("")
351        self.socket_mock.error = Exception
352
353        with self.assertRaises(mpd.ConnectionError):
354            self.client.status()
355            self.socket_mock.unpack.assert_called()
356
357        # consistent behaviour, solves bug #11 (github)
358        with self.assertRaises(mpd.ConnectionError):
359            self.client.status()
360            self.socket_mock.unpack.assert_called()
361
362        self.assertIs(self.client._sock, None)
363
364    @unittest.skipIf(
365        sys.version_info < (3, 0),
366        "Automatic decoding/encoding from the socket is only " "available in Python 3",
367    )
368    def test_force_socket_encoding_and_nonbuffering(self):
369        # Force the reconnection to refill the mock
370        self.client.disconnect()
371        self.client.connect(TEST_MPD_HOST, TEST_MPD_PORT)
372        self.assertEqual(
373            [
374                mock.call("rb", newline="\n"),
375                mock.call("w", encoding="utf-8", newline="\n"),
376            ],
377            # We are only interested into the 2 first entries,
378            # otherwise we get all the readline() & co...
379            self.client._sock.makefile.call_args_list[0:2],
380        )
381
382    def test_ranges_as_argument(self):
383        self.MPDWillReturn("OK\n")
384        self.client.move((1, 2), 2)
385        self.assertMPDReceived('move "1:2" "2"\n')
386
387        self.MPDWillReturn("OK\n")
388        self.client.move((1,), 2)
389        self.assertMPDReceived('move "1:" "2"\n')
390
391        # old code still works!
392        self.MPDWillReturn("OK\n")
393        self.client.move("1:2", 2)
394        self.assertMPDReceived('move "1:2" "2"\n')
395
396        # empty ranges
397        self.MPDWillReturn("OK\n")
398        self.client.rangeid(1, ())
399        self.assertMPDReceived('rangeid "1" ":"\n')
400
401        with self.assertRaises(ValueError):
402            self.MPDWillReturn("OK\n")
403            self.client.move((1, "garbage"), 2)
404            self.assertMPDReceived('move "1:" "2"\n')
405
406    def test_parse_changes(self):
407        self.MPDWillReturn(
408            "cpos: 0\n",
409            "Id: 66\n",
410            "cpos: 1\n",
411            "Id: 67\n",
412            "cpos: 2\n",
413            "Id: 68\n",
414            "cpos: 3\n",
415            "Id: 69\n",
416            "cpos: 4\n",
417            "Id: 70\n",
418            "OK\n",
419        )
420        res = self.client.plchangesposid(0)
421        self.assertEqual(
422            [
423                {"cpos": "0", "id": "66"},
424                {"cpos": "1", "id": "67"},
425                {"cpos": "2", "id": "68"},
426                {"cpos": "3", "id": "69"},
427                {"cpos": "4", "id": "70"},
428            ],
429            res,
430        )
431
432    def test_parse_database(self):
433        self.MPDWillReturn(
434            "directory: foo\n",
435            "Last-Modified: 2014-01-23T16:42:46Z\n",
436            "file: bar.mp3\n",
437            "size: 59618802\n",
438            "Last-Modified: 2014-11-02T19:57:00Z\n",
439            "OK\n",
440        )
441        self.client.listfiles("/")
442
443    def test_parse_mounts(self):
444        self.MPDWillReturn(
445            "mount: \n",
446            "storage: /home/foo/music\n",
447            "mount: foo\n",
448            "storage: nfs://192.168.1.4/export/mp3\n",
449            "OK\n",
450        )
451        res = self.client.listmounts()
452        self.assertEqual(
453            [
454                {"mount": "", "storage": "/home/foo/music"},
455                {"mount": "foo", "storage": "nfs://192.168.1.4/export/mp3"},
456            ],
457            res,
458        )
459
460    def test_parse_neighbors(self):
461        self.MPDWillReturn(
462            "neighbor: smb://FOO\n", "name: FOO (Samba 4.1.11-Debian)\n", "OK\n"
463        )
464        res = self.client.listneighbors()
465        self.assertEqual(
466            [{"name": "FOO (Samba 4.1.11-Debian)", "neighbor": "smb://FOO"}], res
467        )
468
469    def test_parse_outputs(self):
470        self.MPDWillReturn(
471            "outputid: 0\n",
472            "outputname: My ALSA Device\n",
473            "outputenabled: 0\n",
474            "OK\n",
475        )
476        res = self.client.outputs()
477        self.assertEqual(
478            [{"outputenabled": "0", "outputid": "0", "outputname": "My ALSA Device"}],
479            res,
480        )
481
482    def test_parse_playlist(self):
483        self.MPDWillReturn(
484            "0:file: Weezer - Say It Ain't So.mp3\n",
485            "1:file: Dire Straits - Walk of Life.mp3\n",
486            "2:file: 01 - Love Delicatessen.mp3\n",
487            "3:file: Guns N' Roses - Paradise City.mp3\n",
488            "4:file: Nirvana - Lithium.mp3\n",
489            "OK\n",
490        )
491        res = self.client.playlist()
492        self.assertEqual(
493            [
494                "file: Weezer - Say It Ain't So.mp3",
495                "file: Dire Straits - Walk of Life.mp3",
496                "file: 01 - Love Delicatessen.mp3",
497                "file: Guns N' Roses - Paradise City.mp3",
498                "file: Nirvana - Lithium.mp3",
499            ],
500            res,
501        )
502
503    def test_parse_playlists(self):
504        self.MPDWillReturn(
505            "playlist: Playlist\n", "Last-Modified: 2016-08-13T10:55:56Z\n", "OK\n"
506        )
507        res = self.client.listplaylists()
508        self.assertEqual(
509            [{"last-modified": "2016-08-13T10:55:56Z", "playlist": "Playlist"}], res
510        )
511
512    def test_parse_plugins(self):
513        self.MPDWillReturn(
514            "plugin: vorbis\n",
515            "suffix: ogg\n",
516            "suffix: oga\n",
517            "mime_type: application/ogg\n",
518            "mime_type: application/x-ogg\n",
519            "mime_type: audio/ogg\n",
520            "mime_type: audio/vorbis\n",
521            "mime_type: audio/vorbis+ogg\n",
522            "mime_type: audio/x-ogg\n",
523            "mime_type: audio/x-vorbis\n",
524            "mime_type: audio/x-vorbis+ogg\n",
525            "OK\n",
526        )
527        res = self.client.decoders()
528        self.assertEqual(
529            [
530                {
531                    "mime_type": [
532                        "application/ogg",
533                        "application/x-ogg",
534                        "audio/ogg",
535                        "audio/vorbis",
536                        "audio/vorbis+ogg",
537                        "audio/x-ogg",
538                        "audio/x-vorbis",
539                        "audio/x-vorbis+ogg",
540                    ],
541                    "plugin": "vorbis",
542                    "suffix": ["ogg", "oga"],
543                }
544            ],
545            list(res),
546        )
547
548    def test_parse_raw_stickers(self):
549        self.MPDWillReturn("sticker: foo=bar\n", "OK\n")
550        res = self.client._parse_raw_stickers(self.client._read_lines())
551        self.assertEqual([("foo", "bar")], list(res))
552
553        self.MPDWillReturn("sticker: foo=bar\n", "sticker: l=b\n", "OK\n")
554        res = self.client._parse_raw_stickers(self.client._read_lines())
555        self.assertEqual([("foo", "bar"), ("l", "b")], list(res))
556
557    def test_parse_raw_sticker_with_special_value(self):
558        self.MPDWillReturn("sticker: foo==uv=vu\n", "OK\n")
559        res = self.client._parse_raw_stickers(self.client._read_lines())
560        self.assertEqual([("foo", "=uv=vu")], list(res))
561
562    def test_parse_sticket_get_one(self):
563        self.MPDWillReturn("sticker: foo=bar\n", "OK\n")
564        res = self.client.sticker_get("song", "baz", "foo")
565        self.assertEqual("bar", res)
566
567    def test_parse_sticket_get_no_sticker(self):
568        self.MPDWillReturn("ACK [50@0] {sticker} no such sticker\n")
569        self.assertRaises(
570            mpd.CommandError, self.client.sticker_get, "song", "baz", "foo"
571        )
572
573    def test_parse_sticker_list(self):
574        self.MPDWillReturn("sticker: foo=bar\n", "sticker: lom=bok\n", "OK\n")
575        res = self.client.sticker_list("song", "baz")
576        self.assertEqual({"foo": "bar", "lom": "bok"}, res)
577
578        # Even with only one sticker, we get a dict
579        self.MPDWillReturn("sticker: foo=bar\n", "OK\n")
580        res = self.client.sticker_list("song", "baz")
581        self.assertEqual({"foo": "bar"}, res)
582
583    def test_command_list(self):
584        self.MPDWillReturn(
585            "list_OK\n",
586            "list_OK\n",
587            "list_OK\n",
588            "list_OK\n",
589            "list_OK\n",
590            "volume: 100\n",
591            "repeat: 1\n",
592            "random: 1\n",
593            "single: 0\n",
594            "consume: 0\n",
595            "playlist: 68\n",
596            "playlistlength: 5\n",
597            "mixrampdb: 0.000000\n",
598            "state: play\n",
599            "xfade: 5\n",
600            "song: 0\n",
601            "songid: 56\n",
602            "time: 0:259\n",
603            "elapsed: 0.000\n",
604            "bitrate: 0\n",
605            "nextsong: 2\n",
606            "nextsongid: 58\n",
607            "list_OK\n",
608            "OK\n",
609        )
610        self.client.command_list_ok_begin()
611        self.client.clear()
612        self.client.load("Playlist")
613        self.client.random(1)
614        self.client.repeat(1)
615        self.client.play(0)
616        self.client.status()
617        res = self.client.command_list_end()
618        self.assertEqual(None, res[0])
619        self.assertEqual(None, res[1])
620        self.assertEqual(None, res[2])
621        self.assertEqual(None, res[3])
622        self.assertEqual(None, res[4])
623        self.assertEqual(
624            [
625                ("bitrate", "0"),
626                ("consume", "0"),
627                ("elapsed", "0.000"),
628                ("mixrampdb", "0.000000"),
629                ("nextsong", "2"),
630                ("nextsongid", "58"),
631                ("playlist", "68"),
632                ("playlistlength", "5"),
633                ("random", "1"),
634                ("repeat", "1"),
635                ("single", "0"),
636                ("song", "0"),
637                ("songid", "56"),
638                ("state", "play"),
639                ("time", "0:259"),
640                ("volume", "100"),
641                ("xfade", "5"),
642            ],
643            sorted(res[5].items()),
644        )
645
646
# MPD client tests which do not mock the socket, but rather replace it
# with one end of a real socket pair (see socket.socketpair())
649@unittest.skipIf(
650    not hasattr(socket, "socketpair"), "Socketpair is not supported on this platform"
651)
652class TestMPDClientSocket(unittest.TestCase):
653
654    longMessage = True
655
656    def setUp(self):
657        self.connect_patch = mock.patch("mpd.MPDClient._connect_unix")
658        self.connect_mock = self.connect_patch.start()
659
660        test_socketpair = socket.socketpair()
661
662        self.connect_mock.return_value = test_socketpair[0]
663        self.server_socket = test_socketpair[1]
664        self.server_socket_reader = self.server_socket.makefile("rb")
665        self.server_socket_writer = self.server_socket.makefile("wb")
666
667        self.MPDWillReturnBinary(b"OK MPD 0.21.24\n")
668
669        self.client = mpd.MPDClient()
670        self.client.connect(TEST_MPD_UNIXHOST)
671        self.client.timeout = TEST_MPD_UNIXTIMEOUT
672
673        self.connect_mock.assert_called_once()
674
675    def tearDown(self):
676        self.close_server_socket()
677        self.connect_patch.stop()
678
679    def close_server_socket(self):
680        self.server_socket_reader.close()
681        self.server_socket_writer.close()
682        self.server_socket.close()
683
684    def MPDWillReturnBinary(self, byteStr):
685        self.server_socket_writer.write(byteStr)
686        self.server_socket_writer.flush()
687
688    def assertMPDReceived(self, byteStr):
689        """
690        Assert MPD received the given bytestring.
691        Note: this disconnects the client.
692        """
693        # to ensure we don't block, close the socket on client side
694        self.client.disconnect()
695
696        # read one extra to ensure nothing extraneous was written
697        received = self.server_socket_reader.read(len(byteStr) + 1)
698
699        self.assertEqual(received, byteStr)
700
701    def test_readbinary_error(self):
702        self.MPDWillReturnBinary(b"ACK [50@0] {albumart} No file exists\n")
703
704        self.assertRaises(
705            mpd.CommandError, lambda: self.client.albumart("a/full/path.mp3")
706        )
707
708        self.assertMPDReceived(b'albumart "a/full/path.mp3" "0"\n')
709
710    def test_binary_albumart_disconnect_afterchunk(self):
711        self.MPDWillReturnBinary(b"size: 17\nbinary: 3\n" b"\x00\x00\x00\nOK\n")
712
713        # we're expecting a timeout
714        self.assertRaises(
715            socket.timeout, lambda: self.client.albumart("a/full/path.mp3")
716        )
717
718        self.assertMPDReceived(
719            b'albumart "a/full/path.mp3" "0"\nalbumart "a/full/path.mp3" "3"\n'
720        )
721        self.assertIs(self.client._sock, None)
722
723    def test_binary_albumart_disconnect_midchunk(self):
724        self.MPDWillReturnBinary(b"size: 8\nbinary: 8\n\x00\x01\x02\x03")
725
726        # we're expecting a timeout or error of some form
727        self.assertRaises(
728            socket.timeout, lambda: self.client.albumart("a/full/path.mp3")
729        )
730
731        self.assertMPDReceived(b'albumart "a/full/path.mp3" "0"\n')
732        self.assertIs(self.client._sock, None)
733
734    def test_binary_albumart_singlechunk_networkmultiwrite(self):
735        # length 16
736        expected_binary = (
737            b"\xA0\xA1\xA2\xA3\xA4\xA5\xA6\xA7\xA8\xA9\xAA\xAB\xAC\xAD\xAE\xAF"
738        )
739
740        self.MPDWillReturnBinary(b"binary: 16\n")
741        self.MPDWillReturnBinary(expected_binary[0:4])
742        self.MPDWillReturnBinary(expected_binary[4:9])
743        self.MPDWillReturnBinary(expected_binary[9:14])
744        self.MPDWillReturnBinary(expected_binary[14:16])
745        self.MPDWillReturnBinary(b"\nOK\n")
746
747        real_binary = self.client.albumart("a/full/path.mp3")
748        self.assertMPDReceived(b'albumart "a/full/path.mp3" "0"\n')
749        self.assertEqual(real_binary, {"binary": expected_binary})
750
751    def test_binary_albumart_singlechunk_nosize(self):
752        # length: 16
753        expected_binary = (
754            b"\x01\x02\x00\x03\x04\x00\xFF\x05\x07\x08\x0A\x0F\xF0\xA5\x00\x01"
755        )
756
757        self.MPDWillReturnBinary(b"binary: 16\n" + expected_binary + b"\nOK\n")
758
759        real_binary = self.client.albumart("a/full/path.mp3")
760        self.assertMPDReceived(b'albumart "a/full/path.mp3" "0"\n')
761        self.assertEqual(real_binary, {"binary": expected_binary})
762
763    def test_binary_albumart_singlechunk_sizeheader(self):
764        # length: 16
765        expected_binary = (
766            b"\x01\x02\x00\x03\x04\x00\xFF\x05\x07\x08\x0A\x0F\xF0\xA5\x00\x01"
767        )
768
769        self.MPDWillReturnBinary(
770            b"size: 16\nbinary: 16\n" + expected_binary + b"\nOK\n"
771        )
772
773        real_binary = self.client.albumart("a/full/path.mp3")
774        self.assertMPDReceived(b'albumart "a/full/path.mp3" "0"\n')
775        self.assertEqual(real_binary, {"binary": expected_binary})
776
777    def test_binary_albumart_even_multichunk(self):
778        # length: 16 each
779        expected_chunk1 = (
780            b"\x01\x02\x00\x03\x04\x00\xFF\x05\x07\x08\x0A\x0F\xF0\xA5\x00\x01"
781        )
782        expected_chunk2 = (
783            b"\x0A\x0B\x0C\x0D\x0E\x0F\x10\x1F\x2F\x2D\x33\x0D\x00\x00\x11\x13"
784        )
785        expected_chunk3 = (
786            b"\x99\x88\x77\xDD\xD0\xF0\x20\x70\x71\x17\x13\x31\xFF\xFF\xDD\xFF"
787        )
788        expected_binary = expected_chunk1 + expected_chunk2 + expected_chunk3
789
790        # 3 distinct commands expected
791        self.MPDWillReturnBinary(
792            b"size: 48\nbinary: 16\n"
793            + expected_chunk1
794            + b"\nOK\nsize: 48\nbinary: 16\n"
795            + expected_chunk2
796            + b"\nOK\nsize: 48\nbinary: 16\n"
797            + expected_chunk3
798            + b"\nOK\n"
799        )
800
801        real_binary = self.client.albumart("a/full/path.mp3")
802
803        self.assertMPDReceived(
804            b'albumart "a/full/path.mp3" "0"\nalbumart "a/full/path.mp3" "16"'
805            b'\nalbumart "a/full/path.mp3" "32"\n'
806        )
807        self.assertEqual(real_binary, {"binary": expected_binary})
808
809    def test_binary_albumart_odd_multichunk(self):
810        # lengths: 17, 15, 1
811        expected_chunk1 = (
812            b"\x01\x02\x00\x03\x04\x00\xFF\x05\x07\x08\x0A\x0F\xF0\xA5\x00\x01\x13"
813        )
814        expected_chunk2 = (
815            b"\x0A\x0B\x0C\x0D\x0E\x0F\x10\x1F\x2F\x2D\x33\x0D\x00\x00\x11"
816        )
817        expected_chunk3 = b"\x99"
818        expected_binary = expected_chunk1 + expected_chunk2 + expected_chunk3
819
820        # 3 distinct commands expected
821        self.MPDWillReturnBinary(
822            b"size: 33\nbinary: 17\n"
823            + expected_chunk1
824            + b"\nOK\nsize: 33\nbinary: 15\n"
825            + expected_chunk2
826            + b"\nOK\nsize: 33\nbinary: 1\n"
827            + expected_chunk3
828            + b"\nOK\n"
829        )
830
831        real_binary = self.client.albumart("a/full/path.mp3")
832        self.assertMPDReceived(
833            b'albumart "a/full/path.mp3" "0"\nalbumart "a/full/path.mp3" "17"\n'
834            b'albumart "a/full/path.mp3" "32"\n'
835        )
836        self.assertEqual(real_binary, {"binary": expected_binary})
837
838    # MPD server can return empty response if a file exists but is empty
839    def test_binary_albumart_emptyresponse(self):
840        self.MPDWillReturnBinary(b"size: 0\nbinary: 0\n\nOK\n")
841
842        real_binary = self.client.albumart("a/full/path.mp3")
843        self.assertMPDReceived(b'albumart "a/full/path.mp3" "0"\n')
844        self.assertEqual(real_binary, {"binary": b""})
845
846    # readpicture returns empty object if the song exists but has no picture
847    def test_binary_readpicture_emptyresponse(self):
848        self.MPDWillReturnBinary(b"OK\n")
849
850        real_binary = self.client.readpicture("plainsong.mp3")
851        self.assertMPDReceived(b'readpicture "plainsong.mp3" "0"\n')
852        self.assertEqual(real_binary, {})
853
854    def test_binary_readpicture_untyped(self):
855        # length: 16 each
856        expected_chunk1 = (
857            b"\x01\x02\x00\x03\x04\x00\xFF\x05\x07\x08\x0A\x0F\xF0\xA5\x00\x01"
858        )
859        expected_chunk2 = (
860            b"\x0A\x0B\x0C\x0D\x0E\x0F\x10\x1F\x2F\x2D\x33\x0D\x00\x00\x11\x13"
861        )
862        expected_chunk3 = (
863            b"\x99\x88\x77\xDD\xD0\xF0\x20\x70\x71\x17\x13\x31\xFF\xFF\xDD\xFF"
864        )
865        expected_binary = expected_chunk1 + expected_chunk2 + expected_chunk3
866
867        # 3 distinct commands expected
868        self.MPDWillReturnBinary(
869            b"size: 48\nbinary: 16\n"
870            + expected_chunk1
871            + b"\nOK\nsize: 48\nbinary: 16\n"
872            + expected_chunk2
873            + b"\nOK\nsize: 48\nbinary: 16\n"
874            + expected_chunk3
875            + b"\nOK\n"
876        )
877
878        real_binary = self.client.readpicture("a/full/path.mp3")
879
880        self.assertMPDReceived(
881            b'readpicture "a/full/path.mp3" "0"\nreadpicture "a/full/path.mp3" "16"'
882            b'\nreadpicture "a/full/path.mp3" "32"\n'
883        )
884        self.assertEqual(real_binary, {"binary": expected_binary})
885
886    def test_binary_readpicture_typed(self):
887        # length: 16 each
888        expected_binary = bytes(range(48))
889
890        # 3 distinct commands expected
891        self.MPDWillReturnBinary(
892            b"size: 48\ntype: image/png\nbinary: 16\n"
893            + expected_binary[0:16]
894            + b"\nOK\nsize: 48\ntype: image/png\nbinary: 16\n"
895            + expected_binary[16:32]
896            + b"\nOK\nsize: 48\ntype: image/png\nbinary: 16\n"
897            + expected_binary[32:48]
898            + b"\nOK\n"
899        )
900
901        real_binary = self.client.readpicture("a/full/path.mp3")
902
903        self.assertMPDReceived(
904            b'readpicture "a/full/path.mp3" "0"\nreadpicture "a/full/path.mp3" "16"'
905            b'\nreadpicture "a/full/path.mp3" "32"\n'
906        )
907        self.assertEqual(real_binary, {"binary": expected_binary, "type": "image/png"})
908
909    def test_binary_readpicture_badheaders(self):
910        expected_binary = bytes(range(32))
911
912        # inconsistent type header from response 1 to response 2
913        # exception is expected
914        self.MPDWillReturnBinary(
915            b"size: 32\ntype: image/jpeg\nbinary: 16\n"
916            + expected_binary[0:16]
917            + b"\nOK\nsize: 32\ntype: image/png\nbinary: 16\n"
918            + expected_binary[16:32]
919            + b"\nOK\n"
920        )
921
922        self.assertRaises(
923            mpd.CommandError, lambda: self.client.readpicture("song.mp3")
924        )
925
926        self.assertMPDReceived(
927            b'readpicture "song.mp3" "0"\nreadpicture "song.mp3" "16"\n'
928        )
929
930
class MockTransport(object):
    """Minimal transport stand-in that records every chunk passed to ``write``."""

    def __init__(self):
        # Chunks in the order they were written.
        self.written = []

    def clear(self):
        """Forget everything written so far by starting a new, empty record."""
        # Rebinding (rather than emptying in place) leaves any list a caller
        # grabbed earlier untouched.
        self.written = []

    def write(self, data):
        """Record ``data`` as the most recent write."""
        self.written.append(data)
940
941
@unittest.skipIf(TWISTED_MISSING, "requires twisted to be installed")
class TestMPDProtocol(unittest.TestCase):
    """Drive ``mpd.MPDProtocol`` with canned server lines and inspect its writes."""

    def init_protocol(self, default_idle=True, idle_result=None):
        """Create ``self.protocol`` and attach a fresh ``MockTransport`` to it."""
        protocol = mpd.MPDProtocol(default_idle=default_idle, idle_result=idle_result)
        protocol.transport = MockTransport()
        self.protocol = protocol

    def _receive_lines(self, *lines):
        """Hand each of ``lines`` to the protocol, in order, as server input."""
        for line in lines:
            self.protocol.lineReceived(line)

    def test_create_command(self):
        """``_create_command`` quotes arguments and renders tuples as ranges."""
        self.init_protocol(default_idle=False)
        create = self.protocol._create_command

        self.assertEqual(create("play"), b"play")

        # (range tuple, expected wire format)
        range_cases = [
            ((), b'rangeid "1" ":"'),
            ((1,), b'rangeid "1" "1:"'),
            ((1, 2), b'rangeid "1" "1:2"'),
        ]
        for song_range, wire_format in range_cases:
            self.assertEqual(create("rangeid", args=["1", song_range]), wire_format)

    def test_success(self):
        """``currentsong`` writes the command and hands the parsed reply to the callback."""
        self.init_protocol(default_idle=False)

        expected_song = {
            "file": "Dire Straits - Walk of Life.mp3",
            "artist": "Dire Straits",
            "title": "Walk of Life",
            "genre": "Rock/Pop",
            "track": "3",
            "album": "Brothers in Arms",
            "id": "13",
            "last-modified": "2016-08-11T10:57:03Z",
            "pos": "4",
            "time": "253",
        }

        def check_song(result):
            self.assertEqual(expected_song, result)

        self.protocol.currentsong().addCallback(check_song)
        self.assertEqual([b"currentsong\n"], self.protocol.transport.written)

        self._receive_lines(
            b"file: Dire Straits - Walk of Life.mp3",
            b"Last-Modified: 2016-08-11T10:57:03Z",
            b"Time: 253",
            b"Artist: Dire Straits",
            b"Title: Walk of Life",
            b"Album: Brothers in Arms",
            b"Track: 3",
            b"Genre: Rock/Pop",
            b"Pos: 4",
            b"Id: 13",
            b"OK",
        )

    def test_failure(self):
        """An ``ACK`` reply to ``load`` is checked in the errback as a ``Failure``."""
        self.init_protocol(default_idle=False)

        def check_failure(result):
            self.assertIsInstance(result, Failure)
            self.assertEqual(result.getErrorMessage(), "[50@0] {load} No such playlist")

        self.protocol.load("Foo").addErrback(check_failure)
        self.assertEqual([b'load "Foo"\n'], self.protocol.transport.written)
        self._receive_lines(b"ACK [50@0] {load} No such playlist")

    def test_default_idle(self):
        """With default idling, ``idle`` is sent after the hello and after each idle reply."""

        def check_idle_result(result):
            self.assertEqual(list(result), ["player"])

        self.init_protocol(idle_result=check_idle_result)
        transport = self.protocol.transport

        self._receive_lines(b"OK MPD 0.18.0")
        self.assertEqual([b"idle\n"], transport.written)

        transport.clear()
        self._receive_lines(b"changed: player", b"OK")
        self.assertEqual([b"idle\n"], transport.written)

    def test_noidle_when_default_idle(self):
        """A command issued while idling is wrapped in ``noidle`` ... ``idle``."""
        self.init_protocol()
        self._receive_lines(b"OK MPD 0.18.0")
        self.protocol.pause()
        self._receive_lines(b"OK", b"OK")

        expected_writes = [b"idle\n", b"noidle\n", b"pause\n", b"idle\n"]
        self.assertEqual(expected_writes, self.protocol.transport.written)

    def test_already_idle(self):
        """Calling ``idle`` twice in a row raises ``CommandError``."""
        self.init_protocol(default_idle=False)
        self.protocol.idle()
        with self.assertRaises(mpd.CommandError):
            self.protocol.idle()

    def test_already_noidle(self):
        """Calling ``noidle`` without a prior ``idle`` raises ``CommandError``."""
        self.init_protocol(default_idle=False)
        with self.assertRaises(mpd.CommandError):
            self.protocol.noidle()

    def test_command_list(self):
        """A command list writes all its commands; its callback checks one result each."""
        self.init_protocol(default_idle=False)

        def check_results(result):
            self.assertEqual([None, None], result)

        self.protocol.command_list_ok_begin()
        self.protocol.play()
        self.protocol.stop()
        self.protocol.command_list_end().addCallback(check_results)

        expected_writes = [
            b"command_list_ok_begin\n",
            b"play\n",
            b"stop\n",
            b"command_list_end\n",
        ]
        self.assertEqual(expected_writes, self.protocol.transport.written)

        self.protocol.transport.clear()
        self._receive_lines(b"list_OK", b"list_OK", b"OK")

    def test_command_list_failure(self):
        """An ``ACK`` inside a command list is routed to the errbacks of its commands."""
        self.init_protocol(default_idle=False)

        def check_load_error(result):
            self.assertIsInstance(result, Failure)
            self.assertEqual(result.getErrorMessage(), "[50@0] {load} No such playlist")

        def check_follow_up_error(result):
            self.assertIsInstance(result, Failure)
            self.assertEqual(result.getErrorMessage(), "An earlier command failed.")

        self.protocol.command_list_ok_begin()
        self.protocol.load("Foo").addErrback(check_load_error)
        self.protocol.play().addErrback(check_follow_up_error)
        self.protocol.command_list_end().addErrback(check_load_error)

        expected_writes = [
            b"command_list_ok_begin\n",
            b'load "Foo"\n',
            b"play\n",
            b"command_list_end\n",
        ]
        self.assertEqual(expected_writes, self.protocol.transport.written)
        self._receive_lines(b"ACK [50@0] {load} No such playlist")

    def test_command_list_when_default_idle(self):
        """A command list issued while idling is preceded by ``noidle`` and followed by ``idle``."""
        self.init_protocol()
        self._receive_lines(b"OK MPD 0.18.0")

        def check_results(result):
            self.assertEqual([None, None], result)

        self.protocol.command_list_ok_begin()
        self.protocol.play()
        self.protocol.stop()
        self.protocol.command_list_end().addCallback(check_results)

        expected_writes = [
            b"idle\n",
            b"noidle\n",
            b"command_list_ok_begin\n",
            b"play\n",
            b"stop\n",
            b"command_list_end\n",
        ]
        self.assertEqual(expected_writes, self.protocol.transport.written)

        self.protocol.transport.clear()
        self._receive_lines(b"OK", b"list_OK", b"list_OK", b"OK")
        self.assertEqual([b"idle\n"], self.protocol.transport.written)

    def test_command_list_failure_when_default_idle(self):
        """After a failed command list issued while idling, ``idle`` is sent again."""
        self.init_protocol()
        self._receive_lines(b"OK MPD 0.18.0")

        def check_load_error(result):
            self.assertIsInstance(result, Failure)
            self.assertEqual(result.getErrorMessage(), "[50@0] {load} No such playlist")

        def check_follow_up_error(result):
            self.assertIsInstance(result, Failure)
            self.assertEqual(result.getErrorMessage(), "An earlier command failed.")

        self.protocol.command_list_ok_begin()
        self.protocol.load("Foo").addErrback(check_load_error)
        self.protocol.play().addErrback(check_follow_up_error)
        self.protocol.command_list_end().addErrback(check_load_error)

        expected_writes = [
            b"idle\n",
            b"noidle\n",
            b"command_list_ok_begin\n",
            b'load "Foo"\n',
            b"play\n",
            b"command_list_end\n",
        ]
        self.assertEqual(expected_writes, self.protocol.transport.written)

        self.protocol.transport.clear()
        self._receive_lines(b"OK", b"ACK [50@0] {load} No such playlist")
        self.assertEqual([b"idle\n"], self.protocol.transport.written)

    def test_command_list_item_is_generator(self):
        """The callback checks that a ``listplaylist`` item arrives as a plain list."""
        self.init_protocol(default_idle=False)

        expected_playlist = [
            "Weezer - Say It Ain't So.mp3",
            "Dire Straits - Walk of Life.mp3",
            "01 - Love Delicatessen.mp3",
            "Guns N' Roses - Paradise City.mp3",
        ]

        def check_results(result):
            self.assertEqual(result, [expected_playlist])

        self.protocol.command_list_ok_begin()
        self.protocol.listplaylist("Foo")
        self.protocol.command_list_end().addCallback(check_results)
        self._receive_lines(
            b"file: Weezer - Say It Ain't So.mp3",
            b"file: Dire Straits - Walk of Life.mp3",
            b"file: 01 - Love Delicatessen.mp3",
            b"file: Guns N' Roses - Paradise City.mp3",
            b"list_OK",
            b"OK",
        )

    def test_already_in_command_list(self):
        """Opening a command list inside another one raises ``CommandListError``."""
        self.init_protocol(default_idle=False)
        self.protocol.command_list_ok_begin()
        with self.assertRaises(mpd.CommandListError):
            self.protocol.command_list_ok_begin()

    def test_not_in_command_list(self):
        """Ending a command list that was never opened raises ``CommandListError``."""
        self.init_protocol(default_idle=False)
        with self.assertRaises(mpd.CommandListError):
            self.protocol.command_list_end()

    def test_invalid_command_in_command_list(self):
        """``kill`` is rejected inside a command list with ``CommandListError``."""
        self.init_protocol(default_idle=False)
        self.protocol.command_list_ok_begin()
        with self.assertRaises(mpd.CommandListError):
            self.protocol.kill()

    def test_close(self):
        """``close`` returns a deferred; its callback checks for a ``None`` result."""
        self.init_protocol(default_idle=False)

        def check_result(result):
            self.assertEqual(result, None)

        self.protocol.close().addCallback(check_result)
1201
1202
class AsyncMockServer:
    """Scripted stand-in for the reader/writer stream pair of a connection.

    The same object plays both roles: the client's writes arrive in
    ``write`` and are checked against the scripted requests, while
    ``readline``/``readexactly`` hand out the scripted response lines.
    Exchanges are registered with ``expect_exchange`` and consumed in order.
    """

    def __init__(self):
        # Response lines that have been released to the reading side.
        self._output = asyncio.Queue()
        # Pending (remaining_request_lines, response_lines) pairs, oldest first.
        self._expectations = []

    def get_streams(self):
        """Return an already-resolved future holding ``(reader, writer)``.

        Both elements are this very object.
        """
        result = asyncio.Future()
        result.set_result((self, self))
        return result

    def readline(self):
        """Return an awaitable for the next released response line."""
        # directly passing around the awaitable
        return self._output.get()

    async def readexactly(self, length):
        """Return the next released item, which must be exactly ``length`` long.

        Raises ``AssertionError`` (via ``error``) when the next queued item has
        a different length, i.e. the script is not chunked like the reads.
        """
        ret = await self._output.get()
        if len(ret) != length:
            self.error("Mock data is not chuncked in the way the client expects to read it")
        return ret

    def write(self, data):
        """Check ``data`` against the next scripted request line.

        On a match the line is consumed and, once an exchange has no request
        lines left, its responses are released.  Raises ``AssertionError``
        (via ``error``) if nothing is expected or ``data`` differs.
        """
        if not self._expectations or not self._expectations[0][0]:
            self.error("Data written to mock even though none expected: %r" % data)
        pending_requests = self._expectations[0][0]
        next_write = pending_requests[0]
        if next_write != data:
            self.error("Mock got %r, expected %r" % (data, next_write))
        pending_requests.pop(0)
        self._feed()

    def close(self):
        # todo: make sure calls to self.write fail after calling close
        pass

    def error(self, message):
        """Fail the running test by raising ``AssertionError(message)``."""
        raise AssertionError(message)

    def _feed(self):
        """Release the responses of every leading exchange that awaits no request."""
        # Loop instead of checking the head once: an exchange without request
        # lines that was queued behind a pending one must be released as soon
        # as it reaches the head, otherwise the reader would wait forever.
        while self._expectations and not self._expectations[0][0]:
            _, response_lines = self._expectations.pop(0)
            for line in response_lines:
                self._output.put_nowait(line)

    def expect_exchange(self, request_lines, response_lines):
        """Script one exchange.

        ``response_lines`` are released to the reader once every entry of
        ``request_lines`` has been written, in order.  With no request lines
        the responses are released as soon as the exchange is next in line.
        """
        # Keep a private copy: matched request lines are popped off as they
        # arrive, and that must not empty the caller's list.
        self._expectations.append((list(request_lines), response_lines))
        self._feed()
1250
1251
class TestAsyncioMPD(unittest.TestCase):
    """Tests for ``mpd.asyncio.MPDClient`` talking to an ``AsyncMockServer``."""

    def init_client(self, odd_hello=None):
        """Connect ``self.client`` to a fresh ``AsyncMockServer``.

        ``odd_hello`` optionally replaces the greeting lines the mock server
        releases on connect; by default a regular ``OK MPD`` hello is used.
        """
        self.loop = asyncio.get_event_loop()

        self.mockserver = AsyncMockServer()
        # Patch for the duration of the running test only: plainly assigning
        # to asyncio.open_connection would leave the mock installed for every
        # test that runs afterwards.  ``new`` is given explicitly so that
        # patch does not pick an AsyncMock for the coroutine function; calling
        # the mock has to return the already-resolved streams future itself.
        open_connection = mock.MagicMock(return_value=self.mockserver.get_streams())
        patcher = mock.patch.object(asyncio, "open_connection", new=open_connection)
        patcher.start()
        self.addCleanup(patcher.stop)

        if odd_hello is None:
            hello_lines = [b"OK MPD mocker\n"]
        else:
            hello_lines = odd_hello

        # No request lines: the greeting is available right away.
        self.mockserver.expect_exchange([], hello_lines)

        self.client = mpd.asyncio.MPDClient()
        self._await(self.client.connect(TEST_MPD_HOST, TEST_MPD_PORT, loop=self.loop))

        open_connection.assert_called_with(
            TEST_MPD_HOST, TEST_MPD_PORT, loop=self.loop
        )

    def _await(self, future):
        """Run ``future`` to completion on ``self.loop`` and return its result."""
        return self.loop.run_until_complete(future)

    def test_oddhello(self):
        """A greeting that is not an MPD hello makes connecting raise ``ProtocolError``."""
        self.assertRaises(
            mpd.base.ProtocolError, self.init_client, odd_hello=[b"NOT OK\n"]
        )

    @unittest.skipIf(
        os.getenv("RUN_SLOW_TESTS") is None,
        "This test would add 5 seconds of idling to the run (export RUN_SLOW_TESTS=1 to run anyway)",
    )
    def test_noresponse(self):
        """Without any greeting, connecting raises ``ConnectionError``."""
        self.assertRaises(mpd.base.ConnectionError, self.init_client, odd_hello=[])

    def test_status(self):
        """``status`` returns the reply's key/value lines as a dict of strings."""
        self.init_client()

        self.mockserver.expect_exchange(
            [b"status\n"],
            [
                b"volume: 70\n",
                b"repeat: 0\n",
                b"random: 1\n",
                b"single: 0\n",
                b"consume: 0\n",
                b"playlist: 416\n",
                b"playlistlength: 7\n",
                b"mixrampdb: 0.000000\n",
                b"state: play\n",
                b"song: 4\n",
                b"songid: 19\n",
                b"time: 28:403\n",
                b"elapsed: 28.003\n",
                b"bitrate: 465\n",
                b"duration: 403.066\n",
                b"audio: 44100:16:2\n",
                b"OK\n",
            ],
        )

        status = self._await(self.client.status())
        self.assertEqual(
            status,
            {
                "audio": "44100:16:2",
                "bitrate": "465",
                "consume": "0",
                "duration": "403.066",
                "elapsed": "28.003",
                "mixrampdb": "0.000000",
                "playlist": "416",
                "playlistlength": "7",
                "random": "1",
                "repeat": "0",
                "single": "0",
                "song": "4",
                "songid": "19",
                "state": "play",
                "time": "28:403",
                "volume": "70",
            },
        )

    async def _test_outputs(self):
        """Coroutine body of ``test_outputs``: iterate ``outputs()`` asynchronously."""
        self.mockserver.expect_exchange(
            [b"outputs\n"],
            [
                b"outputid: 0\n",
                b"outputname: My ALSA Device\n",
                b"plugin: alsa\n",
                b"outputenabled: 0\n",
                b"attribute: dop=0\n",
                b"outputid: 1\n",
                b"outputname: My FM transmitter\n",
                b"plugin: fmradio\n",
                b"outputenabled: 1\n",
                b"OK\n",
            ],
        )

        expected = [
            {
                "outputid": "0",
                "outputname": "My ALSA Device",
                "plugin": "alsa",
                "outputenabled": "0",
                "attribute": "dop=0",
            },
            {
                "outputid": "1",
                "outputname": "My FM transmitter",
                "plugin": "fmradio",
                "outputenabled": "1",
            },
        ]

        # Collect first and compare once: a surplus or missing item then shows
        # up as a readable list diff rather than as a StopIteration raised
        # inside this coroutine.
        received = []
        async for output in self.client.outputs():
            received.append(output)
        self.assertEqual(received, expected)

    def test_outputs(self):
        """``outputs`` yields one dict per output of the scripted reply."""
        self.init_client()
        self._await(self._test_outputs())

    async def _test_list(self):
        """Coroutine body of ``test_list``: iterate ``list("album")`` asynchronously."""
        self.mockserver.expect_exchange(
            [b'list "album"\n'],
            [b"Album: first\n", b"Album: second\n", b"OK\n"],
        )

        expected = [{"album": "first"}, {"album": "second"}]

        # Same collect-then-compare approach as in _test_outputs.
        received = []
        async for item in self.client.list("album"):
            received.append(item)
        self.assertEqual(received, expected)

    def test_list(self):
        """``list`` yields one dict per line of the scripted reply."""
        self.init_client()
        self._await(self._test_list())

    async def _test_albumart(self):
        """Coroutine body of ``test_albumart``: two 16-byte chunks are joined."""
        self.mockserver.expect_exchange(
            [b'albumart "x.mp3" "0"\n'],
            [
                b"size: 32\n",
                b"binary: 16\n",
                bytes(range(16)),
                b"\n",
                b"OK\n",
            ],
        )
        self.mockserver.expect_exchange(
            [b'albumart "x.mp3" "16"\n'],
            [
                b"size: 32\n",
                b"binary: 16\n",
                bytes(range(16)),
                b"\n",
                b"OK\n",
            ],
        )

        albumart = await self.client.albumart("x.mp3")

        expected = {"binary": bytes(range(16)) + bytes(range(16))}

        self.assertEqual(albumart, expected)

    async def _test_readpicture(self):
        """Coroutine body of ``test_readpicture``: chunks are joined, ``type`` is kept."""
        self.mockserver.expect_exchange(
            [b'readpicture "x.mp3" "0"\n'],
            [
                b"size: 32\n",
                b"type: image/jpeg\n",
                b"binary: 16\n",
                bytes(range(16)),
                b"\n",
                b"OK\n",
            ],
        )
        self.mockserver.expect_exchange(
            [b'readpicture "x.mp3" "16"\n'],
            [
                b"size: 32\n",
                b"type: image/jpeg\n",
                b"binary: 16\n",
                bytes(range(16)),
                b"\n",
                b"OK\n",
            ],
        )

        art = await self.client.readpicture("x.mp3")

        expected = {"binary": bytes(range(16)) + bytes(range(16)), "type": "image/jpeg"}

        self.assertEqual(art, expected)

    async def _test_readpicture_empty(self):
        """Coroutine body of ``test_readpicture_empty``: a bare ``OK`` gives ``{}``."""
        self.mockserver.expect_exchange(
            [b'readpicture "x.mp3" "0"\n'],
            [
                b"OK\n",
            ],
        )

        art = await self.client.readpicture("x.mp3")

        expected = {}

        self.assertEqual(art, expected)

    def test_albumart(self):
        """``albumart`` fetches the picture in two requests and joins the chunks."""
        self.init_client()
        self._await(self._test_albumart())

    def test_readpicture(self):
        """``readpicture`` joins the chunks and reports the picture's type."""
        self.init_client()
        self._await(self._test_readpicture())

    def test_readpicture_empty(self):
        """``readpicture`` returns an empty dict for a reply without data."""
        self.init_client()
        self._await(self._test_readpicture_empty())

    def test_mocker(self):
        """Does the mock server refuse unexpected writes?"""
        self.init_client()

        self.mockserver.expect_exchange([b"expecting odd things\n"], [b""])
        self.assertRaises(AssertionError, self._await, self.client.status())
1491
1492
1493if __name__ == "__main__":
1494    unittest.main()
1495