1#!/usr/bin/env python 2# -*- coding: utf-8 -*- 3 4from __future__ import absolute_import 5import itertools 6import mpd 7import mpd.asyncio 8import os 9import socket 10import sys 11import types 12import warnings 13 14import unittest 15 16try: 17 from twisted.python.failure import Failure 18 19 TWISTED_MISSING = False 20except ImportError: 21 warnings.warn( 22 "No twisted installed: skip twisted related tests! " 23 + "(twisted is not available for python >= 3.0 && python < 3.3)" 24 ) 25 TWISTED_MISSING = True 26 27import asyncio 28 29try: 30 import mock 31except ImportError: 32 print("Please install mock from PyPI to run tests!") 33 sys.exit(1) 34 35# show deprecation warnings 36warnings.simplefilter("default") 37 38 39TEST_MPD_HOST, TEST_MPD_PORT = ("example.com", 10000) 40TEST_MPD_UNIXHOST = "/example/test/host" 41TEST_MPD_UNIXTIMEOUT = 0.5 42 43 44class TestMPDClient(unittest.TestCase): 45 46 longMessage = True 47 48 def setUp(self): 49 self.socket_patch = mock.patch("mpd.base.socket") 50 self.socket_mock = self.socket_patch.start() 51 self.socket_mock.getaddrinfo.return_value = [range(5)] 52 53 self.socket_mock.socket.side_effect = ( 54 lambda *a, **kw: 55 # Create a new socket.socket() mock with default attributes, 56 # each time we are calling it back (otherwise, it keeps set 57 # attributes across calls). 58 # That's probablyy what we want, since reconnecting is like 59 # reinitializing the entire connection, and so, the mock. 60 mock.MagicMock(name="socket.socket") 61 ) 62 63 self.client = mpd.MPDClient() 64 self.client.connect(TEST_MPD_HOST, TEST_MPD_PORT) 65 self.client._sock.reset_mock() 66 self.MPDWillReturn("ACK don't forget to setup your mock\n") 67 68 def tearDown(self): 69 self.socket_patch.stop() 70 71 def MPDWillReturn(self, *lines): 72 # Return what the caller wants first, then do as if the socket was 73 # disconnected. 74 innerIter = itertools.chain(lines, itertools.repeat("")) 75 if sys.version_info >= (3, 0): 76 self.client._rbfile.readline.side_effect = ( 77 x.encode("utf-8") for x in innerIter 78 ) 79 else: 80 self.client._rbfile.readline.side_effect = innerIter 81 82 def assertMPDReceived(self, *lines): 83 self.client._wfile.write.assert_called_with(*lines) 84 85 def test_abstract_functions(self): 86 MPDClientBase = mpd.base.MPDClientBase 87 self.assertRaises( 88 NotImplementedError, 89 lambda: MPDClientBase.add_command("command_name", lambda x: x), 90 ) 91 client = MPDClientBase() 92 self.assertRaises(NotImplementedError, lambda: client.noidle()) 93 self.assertRaises(NotImplementedError, lambda: client.command_list_ok_begin()) 94 self.assertRaises(NotImplementedError, lambda: client.command_list_end()) 95 96 def test_metaclass_commands(self): 97 # just some random functions 98 self.assertTrue(hasattr(self.client, "commands")) 99 self.assertTrue(hasattr(self.client, "save")) 100 self.assertTrue(hasattr(self.client, "random")) 101 # space should be replaced 102 self.assertFalse(hasattr(self.client, "sticker get")) 103 self.assertTrue(hasattr(self.client, "sticker_get")) 104 105 def test_duplicate_tags(self): 106 self.MPDWillReturn("Track: file1\n", "Track: file2\n", "OK\n") 107 song = self.client.currentsong() 108 self.assertIsInstance(song, dict) 109 self.assertIsInstance(song["track"], list) 110 self.assertMPDReceived("currentsong\n") 111 112 def test_parse_nothing(self): 113 self.MPDWillReturn("OK\n", "OK\n") 114 115 self.assertIsNone(self.client.ping()) 116 self.assertMPDReceived("ping\n") 117 118 self.assertIsNone(self.client.clearerror()) 119 self.assertMPDReceived("clearerror\n") 120 121 def test_parse_list(self): 122 self.MPDWillReturn( 123 "tagtype: Artist\n", "tagtype: ArtistSort\n", "tagtype: Album\n", "OK\n" 124 ) 125 126 result = self.client.tagtypes() 127 self.assertMPDReceived("tagtypes\n") 128 self.assertIsInstance(result, list) 129 self.assertEqual(result, ["Artist", "ArtistSort", "Album",]) 130 131 def test_parse_list_groups(self): 132 self.MPDWillReturn( 133 "Album: \n", 134 "Album: 20th_Century_Masters_The_Millenium_Collection\n", 135 "Album: Aerosmith's Greatest Hits\n", 136 "OK\n", 137 ) 138 139 result = self.client.list("album") 140 self.assertMPDReceived('list "album"\n') 141 self.assertIsInstance(result, list) 142 self.assertEqual( 143 result, 144 [ 145 {"album": ""}, 146 {"album": "20th_Century_Masters_The_Millenium_Collection"}, 147 {"album": "Aerosmith's Greatest Hits"}, 148 ], 149 ) 150 151 self.MPDWillReturn( 152 "Album: \n", 153 "Album: 20th_Century_Masters_The_Millenium_Collection\n", 154 "Artist: Eric Clapton\n", 155 "Album: Aerosmith's Greatest Hits\n", 156 "Artist: Aerosmith\n", 157 "OK\n", 158 ) 159 160 result = self.client.list("album", "group", "artist") 161 self.assertMPDReceived('list "album" "group" "artist"\n') 162 self.assertIsInstance(result, list) 163 self.assertEqual( 164 result, 165 [ 166 {"album": ""}, 167 { 168 "album": "20th_Century_Masters_The_Millenium_Collection", 169 "artist": "Eric Clapton", 170 }, 171 {"album": "Aerosmith's Greatest Hits", "artist": "Aerosmith"}, 172 ], 173 ) 174 175 def test_parse_item(self): 176 self.MPDWillReturn("updating_db: 42\n", "OK\n") 177 self.assertIsNotNone(self.client.update()) 178 179 def test_parse_object(self): 180 # XXX: _read_objects() doesn't wait for the final OK 181 self.MPDWillReturn("volume: 63\n", "OK\n") 182 status = self.client.status() 183 self.assertMPDReceived("status\n") 184 self.assertIsInstance(status, dict) 185 186 # XXX: _read_objects() doesn't wait for the final OK 187 self.MPDWillReturn("OK\n") 188 stats = self.client.stats() 189 self.assertMPDReceived("stats\n") 190 self.assertIsInstance(stats, dict) 191 192 def test_parse_songs(self): 193 self.MPDWillReturn("file: my-song.ogg\n", "Pos: 0\n", "Id: 66\n", "OK\n") 194 playlist = self.client.playlistinfo() 195 196 self.assertMPDReceived("playlistinfo\n") 197 self.assertIsInstance(playlist, list) 198 self.assertEqual(1, len(playlist)) 199 e = playlist[0] 200 self.assertIsInstance(e, dict) 201 self.assertEqual("my-song.ogg", e["file"]) 202 self.assertEqual("0", e["pos"]) 203 self.assertEqual("66", e["id"]) 204 205 def test_readcomments(self): 206 self.MPDWillReturn( 207 "major_brand: M4V\n", "minor_version: 1\n", "lyrics: Lalala\n", "OK\n" 208 ) 209 comments = self.client.readcomments() 210 self.assertMPDReceived("readcomments\n") 211 self.assertEqual(comments["major_brand"], "M4V") 212 self.assertEqual(comments["minor_version"], "1") 213 self.assertEqual(comments["lyrics"], "Lalala") 214 215 def test_iterating(self): 216 self.MPDWillReturn("file: my-song.ogg\n", "Pos: 0\n", "Id: 66\n", "OK\n") 217 self.client.iterate = True 218 playlist = self.client.playlistinfo() 219 self.assertMPDReceived("playlistinfo\n") 220 self.assertIsInstance(playlist, types.GeneratorType) 221 for song in playlist: 222 self.assertIsInstance(song, dict) 223 self.assertEqual("my-song.ogg", song["file"]) 224 self.assertEqual("0", song["pos"]) 225 self.assertEqual("66", song["id"]) 226 227 def test_add_and_remove_command(self): 228 self.MPDWillReturn("ACK awesome command\n") 229 230 self.client.add_command("awesome command", mpd.MPDClient._parse_nothing) 231 self.assertTrue(hasattr(self.client, "awesome_command")) 232 # should be unknown by mpd 233 self.assertRaises(mpd.CommandError, self.client.awesome_command) 234 235 self.client.remove_command("awesome_command") 236 self.assertFalse(hasattr(self.client, "awesome_command")) 237 238 # remove non existing command 239 self.assertRaises(ValueError, self.client.remove_command, "awesome_command") 240 241 def test_partitions(self): 242 self.MPDWillReturn("partition: default\n", "partition: partition2\n", "OK\n") 243 partitions = self.client.listpartitions() 244 self.assertMPDReceived("listpartitions\n") 245 self.assertEqual( 246 [ 247 {"partition": "default"}, 248 {"partition": "partition2"}, 249 ], 250 partitions 251 ) 252 253 self.MPDWillReturn("OK\n") 254 self.assertIsNone(self.client.newpartition("Another Partition")) 255 self.assertMPDReceived('newpartition "Another Partition"\n') 256 257 self.MPDWillReturn("OK\n") 258 self.assertIsNone(self.client.partition("Another Partition")) 259 self.assertMPDReceived('partition "Another Partition"\n') 260 261 self.MPDWillReturn("OK\n") 262 self.assertIsNone(self.client.delpartition("Another Partition")) 263 self.assertMPDReceived('delpartition "Another Partition"\n') 264 265 self.MPDWillReturn("OK\n") 266 self.assertIsNone(self.client.moveoutput("My ALSA Device")) 267 self.assertMPDReceived('moveoutput "My ALSA Device"\n') 268 269 def test_client_to_client(self): 270 # client to client is at this time in beta! 271 272 self.MPDWillReturn("OK\n") 273 self.assertIsNone(self.client.subscribe("monty")) 274 self.assertMPDReceived('subscribe "monty"\n') 275 276 self.MPDWillReturn("channel: monty\n", "OK\n") 277 channels = self.client.channels() 278 self.assertMPDReceived("channels\n") 279 self.assertEqual(["monty"], channels) 280 281 self.MPDWillReturn("OK\n") 282 self.assertIsNone(self.client.sendmessage("monty", "SPAM")) 283 self.assertMPDReceived('sendmessage "monty" "SPAM"\n') 284 285 self.MPDWillReturn("channel: monty\n", "message: SPAM\n", "OK\n") 286 msg = self.client.readmessages() 287 self.assertMPDReceived("readmessages\n") 288 self.assertEqual(msg, [{"channel": "monty", "message": "SPAM"}]) 289 290 self.MPDWillReturn("OK\n") 291 self.assertIsNone(self.client.unsubscribe("monty")) 292 self.assertMPDReceived('unsubscribe "monty"\n') 293 294 self.MPDWillReturn("OK\n") 295 channels = self.client.channels() 296 self.assertMPDReceived("channels\n") 297 self.assertEqual([], channels) 298 299 def test_unicode_as_command_args(self): 300 self.MPDWillReturn("OK\n") 301 res = self.client.find("file", "☯☾☝♖✽") 302 self.assertIsInstance(res, list) 303 self.assertMPDReceived('find "file" "☯☾☝♖✽"\n') 304 305 def test_numbers_as_command_args(self): 306 self.MPDWillReturn("OK\n") 307 self.client.find("file", 1) 308 self.assertMPDReceived('find "file" "1"\n') 309 310 def test_commands_without_callbacks(self): 311 self.MPDWillReturn("\n") 312 self.client.close() 313 self.assertMPDReceived("close\n") 314 315 # XXX: what are we testing here? 316 # looks like reconnection test? 317 self.client._reset() 318 self.client.connect(TEST_MPD_HOST, TEST_MPD_PORT) 319 320 def test_set_timeout_on_client(self): 321 self.client.timeout = 1 322 self.client._sock.settimeout.assert_called_with(1) 323 self.assertEqual(self.client.timeout, 1) 324 325 self.client.timeout = None 326 self.client._sock.settimeout.assert_called_with(None) 327 self.assertEqual(self.client.timeout, None) 328 329 def test_set_timeout_from_connect(self): 330 self.client.disconnect() 331 with warnings.catch_warnings(record=True) as w: 332 self.client.connect("example.com", 10000, timeout=5) 333 self.client._sock.settimeout.assert_called_with(5) 334 self.assertEqual(len(w), 1) 335 self.assertIn("Use MPDClient.timeout", str(w[0].message)) 336 337 @unittest.skipIf( 338 sys.version_info < (3, 3), "BrokenPipeError was introduced in python 3.3" 339 ) 340 def test_broken_pipe_error(self): 341 self.MPDWillReturn("volume: 63\n", "OK\n") 342 self.client._wfile.write.side_effect = BrokenPipeError 343 self.socket_mock.error = Exception 344 345 with self.assertRaises(mpd.ConnectionError): 346 self.client.status() 347 348 def test_connection_lost(self): 349 # Simulate a connection lost: the socket returns empty strings 350 self.MPDWillReturn("") 351 self.socket_mock.error = Exception 352 353 with self.assertRaises(mpd.ConnectionError): 354 self.client.status() 355 self.socket_mock.unpack.assert_called() 356 357 # consistent behaviour, solves bug #11 (github) 358 with self.assertRaises(mpd.ConnectionError): 359 self.client.status() 360 self.socket_mock.unpack.assert_called() 361 362 self.assertIs(self.client._sock, None) 363 364 @unittest.skipIf( 365 sys.version_info < (3, 0), 366 "Automatic decoding/encoding from the socket is only " "available in Python 3", 367 ) 368 def test_force_socket_encoding_and_nonbuffering(self): 369 # Force the reconnection to refill the mock 370 self.client.disconnect() 371 self.client.connect(TEST_MPD_HOST, TEST_MPD_PORT) 372 self.assertEqual( 373 [ 374 mock.call("rb", newline="\n"), 375 mock.call("w", encoding="utf-8", newline="\n"), 376 ], 377 # We are only interested into the 2 first entries, 378 # otherwise we get all the readline() & co... 379 self.client._sock.makefile.call_args_list[0:2], 380 ) 381 382 def test_ranges_as_argument(self): 383 self.MPDWillReturn("OK\n") 384 self.client.move((1, 2), 2) 385 self.assertMPDReceived('move "1:2" "2"\n') 386 387 self.MPDWillReturn("OK\n") 388 self.client.move((1,), 2) 389 self.assertMPDReceived('move "1:" "2"\n') 390 391 # old code still works! 392 self.MPDWillReturn("OK\n") 393 self.client.move("1:2", 2) 394 self.assertMPDReceived('move "1:2" "2"\n') 395 396 # empty ranges 397 self.MPDWillReturn("OK\n") 398 self.client.rangeid(1, ()) 399 self.assertMPDReceived('rangeid "1" ":"\n') 400 401 with self.assertRaises(ValueError): 402 self.MPDWillReturn("OK\n") 403 self.client.move((1, "garbage"), 2) 404 self.assertMPDReceived('move "1:" "2"\n') 405 406 def test_parse_changes(self): 407 self.MPDWillReturn( 408 "cpos: 0\n", 409 "Id: 66\n", 410 "cpos: 1\n", 411 "Id: 67\n", 412 "cpos: 2\n", 413 "Id: 68\n", 414 "cpos: 3\n", 415 "Id: 69\n", 416 "cpos: 4\n", 417 "Id: 70\n", 418 "OK\n", 419 ) 420 res = self.client.plchangesposid(0) 421 self.assertEqual( 422 [ 423 {"cpos": "0", "id": "66"}, 424 {"cpos": "1", "id": "67"}, 425 {"cpos": "2", "id": "68"}, 426 {"cpos": "3", "id": "69"}, 427 {"cpos": "4", "id": "70"}, 428 ], 429 res, 430 ) 431 432 def test_parse_database(self): 433 self.MPDWillReturn( 434 "directory: foo\n", 435 "Last-Modified: 2014-01-23T16:42:46Z\n", 436 "file: bar.mp3\n", 437 "size: 59618802\n", 438 "Last-Modified: 2014-11-02T19:57:00Z\n", 439 "OK\n", 440 ) 441 self.client.listfiles("/") 442 443 def test_parse_mounts(self): 444 self.MPDWillReturn( 445 "mount: \n", 446 "storage: /home/foo/music\n", 447 "mount: foo\n", 448 "storage: nfs://192.168.1.4/export/mp3\n", 449 "OK\n", 450 ) 451 res = self.client.listmounts() 452 self.assertEqual( 453 [ 454 {"mount": "", "storage": "/home/foo/music"}, 455 {"mount": "foo", "storage": "nfs://192.168.1.4/export/mp3"}, 456 ], 457 res, 458 ) 459 460 def test_parse_neighbors(self): 461 self.MPDWillReturn( 462 "neighbor: smb://FOO\n", "name: FOO (Samba 4.1.11-Debian)\n", "OK\n" 463 ) 464 res = self.client.listneighbors() 465 self.assertEqual( 466 [{"name": "FOO (Samba 4.1.11-Debian)", "neighbor": "smb://FOO"}], res 467 ) 468 469 def test_parse_outputs(self): 470 self.MPDWillReturn( 471 "outputid: 0\n", 472 "outputname: My ALSA Device\n", 473 "outputenabled: 0\n", 474 "OK\n", 475 ) 476 res = self.client.outputs() 477 self.assertEqual( 478 [{"outputenabled": "0", "outputid": "0", "outputname": "My ALSA Device"}], 479 res, 480 ) 481 482 def test_parse_playlist(self): 483 self.MPDWillReturn( 484 "0:file: Weezer - Say It Ain't So.mp3\n", 485 "1:file: Dire Straits - Walk of Life.mp3\n", 486 "2:file: 01 - Love Delicatessen.mp3\n", 487 "3:file: Guns N' Roses - Paradise City.mp3\n", 488 "4:file: Nirvana - Lithium.mp3\n", 489 "OK\n", 490 ) 491 res = self.client.playlist() 492 self.assertEqual( 493 [ 494 "file: Weezer - Say It Ain't So.mp3", 495 "file: Dire Straits - Walk of Life.mp3", 496 "file: 01 - Love Delicatessen.mp3", 497 "file: Guns N' Roses - Paradise City.mp3", 498 "file: Nirvana - Lithium.mp3", 499 ], 500 res, 501 ) 502 503 def test_parse_playlists(self): 504 self.MPDWillReturn( 505 "playlist: Playlist\n", "Last-Modified: 2016-08-13T10:55:56Z\n", "OK\n" 506 ) 507 res = self.client.listplaylists() 508 self.assertEqual( 509 [{"last-modified": "2016-08-13T10:55:56Z", "playlist": "Playlist"}], res 510 ) 511 512 def test_parse_plugins(self): 513 self.MPDWillReturn( 514 "plugin: vorbis\n", 515 "suffix: ogg\n", 516 "suffix: oga\n", 517 "mime_type: application/ogg\n", 518 "mime_type: application/x-ogg\n", 519 "mime_type: audio/ogg\n", 520 "mime_type: audio/vorbis\n", 521 "mime_type: audio/vorbis+ogg\n", 522 "mime_type: audio/x-ogg\n", 523 "mime_type: audio/x-vorbis\n", 524 "mime_type: audio/x-vorbis+ogg\n", 525 "OK\n", 526 ) 527 res = self.client.decoders() 528 self.assertEqual( 529 [ 530 { 531 "mime_type": [ 532 "application/ogg", 533 "application/x-ogg", 534 "audio/ogg", 535 "audio/vorbis", 536 "audio/vorbis+ogg", 537 "audio/x-ogg", 538 "audio/x-vorbis", 539 "audio/x-vorbis+ogg", 540 ], 541 "plugin": "vorbis", 542 "suffix": ["ogg", "oga"], 543 } 544 ], 545 list(res), 546 ) 547 548 def test_parse_raw_stickers(self): 549 self.MPDWillReturn("sticker: foo=bar\n", "OK\n") 550 res = self.client._parse_raw_stickers(self.client._read_lines()) 551 self.assertEqual([("foo", "bar")], list(res)) 552 553 self.MPDWillReturn("sticker: foo=bar\n", "sticker: l=b\n", "OK\n") 554 res = self.client._parse_raw_stickers(self.client._read_lines()) 555 self.assertEqual([("foo", "bar"), ("l", "b")], list(res)) 556 557 def test_parse_raw_sticker_with_special_value(self): 558 self.MPDWillReturn("sticker: foo==uv=vu\n", "OK\n") 559 res = self.client._parse_raw_stickers(self.client._read_lines()) 560 self.assertEqual([("foo", "=uv=vu")], list(res)) 561 562 def test_parse_sticket_get_one(self): 563 self.MPDWillReturn("sticker: foo=bar\n", "OK\n") 564 res = self.client.sticker_get("song", "baz", "foo") 565 self.assertEqual("bar", res) 566 567 def test_parse_sticket_get_no_sticker(self): 568 self.MPDWillReturn("ACK [50@0] {sticker} no such sticker\n") 569 self.assertRaises( 570 mpd.CommandError, self.client.sticker_get, "song", "baz", "foo" 571 ) 572 573 def test_parse_sticker_list(self): 574 self.MPDWillReturn("sticker: foo=bar\n", "sticker: lom=bok\n", "OK\n") 575 res = self.client.sticker_list("song", "baz") 576 self.assertEqual({"foo": "bar", "lom": "bok"}, res) 577 578 # Even with only one sticker, we get a dict 579 self.MPDWillReturn("sticker: foo=bar\n", "OK\n") 580 res = self.client.sticker_list("song", "baz") 581 self.assertEqual({"foo": "bar"}, res) 582 583 def test_command_list(self): 584 self.MPDWillReturn( 585 "list_OK\n", 586 "list_OK\n", 587 "list_OK\n", 588 "list_OK\n", 589 "list_OK\n", 590 "volume: 100\n", 591 "repeat: 1\n", 592 "random: 1\n", 593 "single: 0\n", 594 "consume: 0\n", 595 "playlist: 68\n", 596 "playlistlength: 5\n", 597 "mixrampdb: 0.000000\n", 598 "state: play\n", 599 "xfade: 5\n", 600 "song: 0\n", 601 "songid: 56\n", 602 "time: 0:259\n", 603 "elapsed: 0.000\n", 604 "bitrate: 0\n", 605 "nextsong: 2\n", 606 "nextsongid: 58\n", 607 "list_OK\n", 608 "OK\n", 609 ) 610 self.client.command_list_ok_begin() 611 self.client.clear() 612 self.client.load("Playlist") 613 self.client.random(1) 614 self.client.repeat(1) 615 self.client.play(0) 616 self.client.status() 617 res = self.client.command_list_end() 618 self.assertEqual(None, res[0]) 619 self.assertEqual(None, res[1]) 620 self.assertEqual(None, res[2]) 621 self.assertEqual(None, res[3]) 622 self.assertEqual(None, res[4]) 623 self.assertEqual( 624 [ 625 ("bitrate", "0"), 626 ("consume", "0"), 627 ("elapsed", "0.000"), 628 ("mixrampdb", "0.000000"), 629 ("nextsong", "2"), 630 ("nextsongid", "58"), 631 ("playlist", "68"), 632 ("playlistlength", "5"), 633 ("random", "1"), 634 ("repeat", "1"), 635 ("single", "0"), 636 ("song", "0"), 637 ("songid", "56"), 638 ("state", "play"), 639 ("time", "0:259"), 640 ("volume", "100"), 641 ("xfade", "5"), 642 ], 643 sorted(res[5].items()), 644 ) 645 646 647# MPD client tests which do not mock the socket, but rather replace it 648# with a real socket from a socket 649@unittest.skipIf( 650 not hasattr(socket, "socketpair"), "Socketpair is not supported on this platform" 651) 652class TestMPDClientSocket(unittest.TestCase): 653 654 longMessage = True 655 656 def setUp(self): 657 self.connect_patch = mock.patch("mpd.MPDClient._connect_unix") 658 self.connect_mock = self.connect_patch.start() 659 660 test_socketpair = socket.socketpair() 661 662 self.connect_mock.return_value = test_socketpair[0] 663 self.server_socket = test_socketpair[1] 664 self.server_socket_reader = self.server_socket.makefile("rb") 665 self.server_socket_writer = self.server_socket.makefile("wb") 666 667 self.MPDWillReturnBinary(b"OK MPD 0.21.24\n") 668 669 self.client = mpd.MPDClient() 670 self.client.connect(TEST_MPD_UNIXHOST) 671 self.client.timeout = TEST_MPD_UNIXTIMEOUT 672 673 self.connect_mock.assert_called_once() 674 675 def tearDown(self): 676 self.close_server_socket() 677 self.connect_patch.stop() 678 679 def close_server_socket(self): 680 self.server_socket_reader.close() 681 self.server_socket_writer.close() 682 self.server_socket.close() 683 684 def MPDWillReturnBinary(self, byteStr): 685 self.server_socket_writer.write(byteStr) 686 self.server_socket_writer.flush() 687 688 def assertMPDReceived(self, byteStr): 689 """ 690 Assert MPD received the given bytestring. 691 Note: this disconnects the client. 692 """ 693 # to ensure we don't block, close the socket on client side 694 self.client.disconnect() 695 696 # read one extra to ensure nothing extraneous was written 697 received = self.server_socket_reader.read(len(byteStr) + 1) 698 699 self.assertEqual(received, byteStr) 700 701 def test_readbinary_error(self): 702 self.MPDWillReturnBinary(b"ACK [50@0] {albumart} No file exists\n") 703 704 self.assertRaises( 705 mpd.CommandError, lambda: self.client.albumart("a/full/path.mp3") 706 ) 707 708 self.assertMPDReceived(b'albumart "a/full/path.mp3" "0"\n') 709 710 def test_binary_albumart_disconnect_afterchunk(self): 711 self.MPDWillReturnBinary(b"size: 17\nbinary: 3\n" b"\x00\x00\x00\nOK\n") 712 713 # we're expecting a timeout 714 self.assertRaises( 715 socket.timeout, lambda: self.client.albumart("a/full/path.mp3") 716 ) 717 718 self.assertMPDReceived( 719 b'albumart "a/full/path.mp3" "0"\nalbumart "a/full/path.mp3" "3"\n' 720 ) 721 self.assertIs(self.client._sock, None) 722 723 def test_binary_albumart_disconnect_midchunk(self): 724 self.MPDWillReturnBinary(b"size: 8\nbinary: 8\n\x00\x01\x02\x03") 725 726 # we're expecting a timeout or error of some form 727 self.assertRaises( 728 socket.timeout, lambda: self.client.albumart("a/full/path.mp3") 729 ) 730 731 self.assertMPDReceived(b'albumart "a/full/path.mp3" "0"\n') 732 self.assertIs(self.client._sock, None) 733 734 def test_binary_albumart_singlechunk_networkmultiwrite(self): 735 # length 16 736 expected_binary = ( 737 b"\xA0\xA1\xA2\xA3\xA4\xA5\xA6\xA7\xA8\xA9\xAA\xAB\xAC\xAD\xAE\xAF" 738 ) 739 740 self.MPDWillReturnBinary(b"binary: 16\n") 741 self.MPDWillReturnBinary(expected_binary[0:4]) 742 self.MPDWillReturnBinary(expected_binary[4:9]) 743 self.MPDWillReturnBinary(expected_binary[9:14]) 744 self.MPDWillReturnBinary(expected_binary[14:16]) 745 self.MPDWillReturnBinary(b"\nOK\n") 746 747 real_binary = self.client.albumart("a/full/path.mp3") 748 self.assertMPDReceived(b'albumart "a/full/path.mp3" "0"\n') 749 self.assertEqual(real_binary, {"binary": expected_binary}) 750 751 def test_binary_albumart_singlechunk_nosize(self): 752 # length: 16 753 expected_binary = ( 754 b"\x01\x02\x00\x03\x04\x00\xFF\x05\x07\x08\x0A\x0F\xF0\xA5\x00\x01" 755 ) 756 757 self.MPDWillReturnBinary(b"binary: 16\n" + expected_binary + b"\nOK\n") 758 759 real_binary = self.client.albumart("a/full/path.mp3") 760 self.assertMPDReceived(b'albumart "a/full/path.mp3" "0"\n') 761 self.assertEqual(real_binary, {"binary": expected_binary}) 762 763 def test_binary_albumart_singlechunk_sizeheader(self): 764 # length: 16 765 expected_binary = ( 766 b"\x01\x02\x00\x03\x04\x00\xFF\x05\x07\x08\x0A\x0F\xF0\xA5\x00\x01" 767 ) 768 769 self.MPDWillReturnBinary( 770 b"size: 16\nbinary: 16\n" + expected_binary + b"\nOK\n" 771 ) 772 773 real_binary = self.client.albumart("a/full/path.mp3") 774 self.assertMPDReceived(b'albumart "a/full/path.mp3" "0"\n') 775 self.assertEqual(real_binary, {"binary": expected_binary}) 776 777 def test_binary_albumart_even_multichunk(self): 778 # length: 16 each 779 expected_chunk1 = ( 780 b"\x01\x02\x00\x03\x04\x00\xFF\x05\x07\x08\x0A\x0F\xF0\xA5\x00\x01" 781 ) 782 expected_chunk2 = ( 783 b"\x0A\x0B\x0C\x0D\x0E\x0F\x10\x1F\x2F\x2D\x33\x0D\x00\x00\x11\x13" 784 ) 785 expected_chunk3 = ( 786 b"\x99\x88\x77\xDD\xD0\xF0\x20\x70\x71\x17\x13\x31\xFF\xFF\xDD\xFF" 787 ) 788 expected_binary = expected_chunk1 + expected_chunk2 + expected_chunk3 789 790 # 3 distinct commands expected 791 self.MPDWillReturnBinary( 792 b"size: 48\nbinary: 16\n" 793 + expected_chunk1 794 + b"\nOK\nsize: 48\nbinary: 16\n" 795 + expected_chunk2 796 + b"\nOK\nsize: 48\nbinary: 16\n" 797 + expected_chunk3 798 + b"\nOK\n" 799 ) 800 801 real_binary = self.client.albumart("a/full/path.mp3") 802 803 self.assertMPDReceived( 804 b'albumart "a/full/path.mp3" "0"\nalbumart "a/full/path.mp3" "16"' 805 b'\nalbumart "a/full/path.mp3" "32"\n' 806 ) 807 self.assertEqual(real_binary, {"binary": expected_binary}) 808 809 def test_binary_albumart_odd_multichunk(self): 810 # lengths: 17, 15, 1 811 expected_chunk1 = ( 812 b"\x01\x02\x00\x03\x04\x00\xFF\x05\x07\x08\x0A\x0F\xF0\xA5\x00\x01\x13" 813 ) 814 expected_chunk2 = ( 815 b"\x0A\x0B\x0C\x0D\x0E\x0F\x10\x1F\x2F\x2D\x33\x0D\x00\x00\x11" 816 ) 817 expected_chunk3 = b"\x99" 818 expected_binary = expected_chunk1 + expected_chunk2 + expected_chunk3 819 820 # 3 distinct commands expected 821 self.MPDWillReturnBinary( 822 b"size: 33\nbinary: 17\n" 823 + expected_chunk1 824 + b"\nOK\nsize: 33\nbinary: 15\n" 825 + expected_chunk2 826 + b"\nOK\nsize: 33\nbinary: 1\n" 827 + expected_chunk3 828 + b"\nOK\n" 829 ) 830 831 real_binary = self.client.albumart("a/full/path.mp3") 832 self.assertMPDReceived( 833 b'albumart "a/full/path.mp3" "0"\nalbumart "a/full/path.mp3" "17"\n' 834 b'albumart "a/full/path.mp3" "32"\n' 835 ) 836 self.assertEqual(real_binary, {"binary": expected_binary}) 837 838 # MPD server can return empty response if a file exists but is empty 839 def test_binary_albumart_emptyresponse(self): 840 self.MPDWillReturnBinary(b"size: 0\nbinary: 0\n\nOK\n") 841 842 real_binary = self.client.albumart("a/full/path.mp3") 843 self.assertMPDReceived(b'albumart "a/full/path.mp3" "0"\n') 844 self.assertEqual(real_binary, {"binary": b""}) 845 846 # readpicture returns empty object if the song exists but has no picture 847 def test_binary_readpicture_emptyresponse(self): 848 self.MPDWillReturnBinary(b"OK\n") 849 850 real_binary = self.client.readpicture("plainsong.mp3") 851 self.assertMPDReceived(b'readpicture "plainsong.mp3" "0"\n') 852 self.assertEqual(real_binary, {}) 853 854 def test_binary_readpicture_untyped(self): 855 # length: 16 each 856 expected_chunk1 = ( 857 b"\x01\x02\x00\x03\x04\x00\xFF\x05\x07\x08\x0A\x0F\xF0\xA5\x00\x01" 858 ) 859 expected_chunk2 = ( 860 b"\x0A\x0B\x0C\x0D\x0E\x0F\x10\x1F\x2F\x2D\x33\x0D\x00\x00\x11\x13" 861 ) 862 expected_chunk3 = ( 863 b"\x99\x88\x77\xDD\xD0\xF0\x20\x70\x71\x17\x13\x31\xFF\xFF\xDD\xFF" 864 ) 865 expected_binary = expected_chunk1 + expected_chunk2 + expected_chunk3 866 867 # 3 distinct commands expected 868 self.MPDWillReturnBinary( 869 b"size: 48\nbinary: 16\n" 870 + expected_chunk1 871 + b"\nOK\nsize: 48\nbinary: 16\n" 872 + expected_chunk2 873 + b"\nOK\nsize: 48\nbinary: 16\n" 874 + expected_chunk3 875 + b"\nOK\n" 876 ) 877 878 real_binary = self.client.readpicture("a/full/path.mp3") 879 880 self.assertMPDReceived( 881 b'readpicture "a/full/path.mp3" "0"\nreadpicture "a/full/path.mp3" "16"' 882 b'\nreadpicture "a/full/path.mp3" "32"\n' 883 ) 884 self.assertEqual(real_binary, {"binary": expected_binary}) 885 886 def test_binary_readpicture_typed(self): 887 # length: 16 each 888 expected_binary = bytes(range(48)) 889 890 # 3 distinct commands expected 891 self.MPDWillReturnBinary( 892 b"size: 48\ntype: image/png\nbinary: 16\n" 893 + expected_binary[0:16] 894 + b"\nOK\nsize: 48\ntype: image/png\nbinary: 16\n" 895 + expected_binary[16:32] 896 + b"\nOK\nsize: 48\ntype: image/png\nbinary: 16\n" 897 + expected_binary[32:48] 898 + b"\nOK\n" 899 ) 900 901 real_binary = self.client.readpicture("a/full/path.mp3") 902 903 self.assertMPDReceived( 904 b'readpicture "a/full/path.mp3" "0"\nreadpicture "a/full/path.mp3" "16"' 905 b'\nreadpicture "a/full/path.mp3" "32"\n' 906 ) 907 self.assertEqual(real_binary, {"binary": expected_binary, "type": "image/png"}) 908 909 def test_binary_readpicture_badheaders(self): 910 expected_binary = bytes(range(32)) 911 912 # inconsistent type header from response 1 to response 2 913 # exception is expected 914 self.MPDWillReturnBinary( 915 b"size: 32\ntype: image/jpeg\nbinary: 16\n" 916 + expected_binary[0:16] 917 + b"\nOK\nsize: 32\ntype: image/png\nbinary: 16\n" 918 + expected_binary[16:32] 919 + b"\nOK\n" 920 ) 921 922 self.assertRaises( 923 mpd.CommandError, lambda: self.client.readpicture("song.mp3") 924 ) 925 926 self.assertMPDReceived( 927 b'readpicture "song.mp3" "0"\nreadpicture "song.mp3" "16"\n' 928 ) 929 930 931class MockTransport(object): 932 def __init__(self): 933 self.written = list() 934 935 def clear(self): 936 self.written = list() 937 938 def write(self, data): 939 self.written.append(data) 940 941 942@unittest.skipIf(TWISTED_MISSING, "requires twisted to be installed") 943class TestMPDProtocol(unittest.TestCase): 944 def init_protocol(self, default_idle=True, idle_result=None): 945 self.protocol = mpd.MPDProtocol( 946 default_idle=default_idle, idle_result=idle_result 947 ) 948 self.protocol.transport = MockTransport() 949 950 def test_create_command(self): 951 self.init_protocol(default_idle=False) 952 self.assertEqual(self.protocol._create_command("play"), b"play") 953 self.assertEqual( 954 self.protocol._create_command("rangeid", args=["1", ()]), b'rangeid "1" ":"' 955 ) 956 self.assertEqual( 957 self.protocol._create_command("rangeid", args=["1", (1,)]), 958 b'rangeid "1" "1:"', 959 ) 960 self.assertEqual( 961 self.protocol._create_command("rangeid", args=["1", (1, 2)]), 962 b'rangeid "1" "1:2"', 963 ) 964 965 def test_success(self): 966 self.init_protocol(default_idle=False) 967 968 def success(result): 969 expected = { 970 "file": "Dire Straits - Walk of Life.mp3", 971 "artist": "Dire Straits", 972 "title": "Walk of Life", 973 "genre": "Rock/Pop", 974 "track": "3", 975 "album": "Brothers in Arms", 976 "id": "13", 977 "last-modified": "2016-08-11T10:57:03Z", 978 "pos": "4", 979 "time": "253", 980 } 981 self.assertEqual(expected, result) 982 983 self.protocol.currentsong().addCallback(success) 984 self.assertEqual([b"currentsong\n"], self.protocol.transport.written) 985 986 for line in [ 987 b"file: Dire Straits - Walk of Life.mp3", 988 b"Last-Modified: 2016-08-11T10:57:03Z", 989 b"Time: 253", 990 b"Artist: Dire Straits", 991 b"Title: Walk of Life", 992 b"Album: Brothers in Arms", 993 b"Track: 3", 994 b"Genre: Rock/Pop", 995 b"Pos: 4", 996 b"Id: 13", 997 b"OK", 998 ]: 999 self.protocol.lineReceived(line) 1000 1001 def test_failure(self): 1002 self.init_protocol(default_idle=False) 1003 1004 def error(result): 1005 self.assertIsInstance(result, Failure) 1006 self.assertEqual(result.getErrorMessage(), "[50@0] {load} No such playlist") 1007 1008 self.protocol.load("Foo").addErrback(error) 1009 self.assertEqual([b'load "Foo"\n'], self.protocol.transport.written) 1010 self.protocol.lineReceived(b"ACK [50@0] {load} No such playlist") 1011 1012 def test_default_idle(self): 1013 def idle_result(result): 1014 self.assertEqual(list(result), ["player"]) 1015 1016 self.init_protocol(idle_result=idle_result) 1017 self.protocol.lineReceived(b"OK MPD 0.18.0") 1018 self.assertEqual([b"idle\n"], self.protocol.transport.written) 1019 self.protocol.transport.clear() 1020 self.protocol.lineReceived(b"changed: player") 1021 self.protocol.lineReceived(b"OK") 1022 self.assertEqual([b"idle\n"], self.protocol.transport.written) 1023 1024 def test_noidle_when_default_idle(self): 1025 self.init_protocol() 1026 self.protocol.lineReceived(b"OK MPD 0.18.0") 1027 self.protocol.pause() 1028 self.protocol.lineReceived(b"OK") 1029 self.protocol.lineReceived(b"OK") 1030 self.assertEqual( 1031 [b"idle\n", b"noidle\n", b"pause\n", b"idle\n"], 1032 self.protocol.transport.written, 1033 ) 1034 1035 def test_already_idle(self): 1036 self.init_protocol(default_idle=False) 1037 self.protocol.idle() 1038 self.assertRaises(mpd.CommandError, lambda: self.protocol.idle()) 1039 1040 def test_already_noidle(self): 1041 self.init_protocol(default_idle=False) 1042 self.assertRaises(mpd.CommandError, lambda: self.protocol.noidle()) 1043 1044 def test_command_list(self): 1045 self.init_protocol(default_idle=False) 1046 1047 def success(result): 1048 self.assertEqual([None, None], result) 1049 1050 self.protocol.command_list_ok_begin() 1051 self.protocol.play() 1052 self.protocol.stop() 1053 self.protocol.command_list_end().addCallback(success) 1054 self.assertEqual( 1055 [b"command_list_ok_begin\n", b"play\n", b"stop\n", b"command_list_end\n",], 1056 self.protocol.transport.written, 1057 ) 1058 self.protocol.transport.clear() 1059 self.protocol.lineReceived(b"list_OK") 1060 self.protocol.lineReceived(b"list_OK") 1061 self.protocol.lineReceived(b"OK") 1062 1063 def test_command_list_failure(self): 1064 self.init_protocol(default_idle=False) 1065 1066 def load_command_error(result): 1067 self.assertIsInstance(result, Failure) 1068 self.assertEqual(result.getErrorMessage(), "[50@0] {load} No such playlist") 1069 1070 def command_list_general_error(result): 1071 self.assertIsInstance(result, Failure) 1072 self.assertEqual(result.getErrorMessage(), "An earlier command failed.") 1073 1074 self.protocol.command_list_ok_begin() 1075 self.protocol.load("Foo").addErrback(load_command_error) 1076 self.protocol.play().addErrback(command_list_general_error) 1077 self.protocol.command_list_end().addErrback(load_command_error) 1078 self.assertEqual( 1079 [ 1080 b"command_list_ok_begin\n", 1081 b'load "Foo"\n', 1082 b"play\n", 1083 b"command_list_end\n", 1084 ], 1085 self.protocol.transport.written, 1086 ) 1087 self.protocol.lineReceived(b"ACK [50@0] {load} No such playlist") 1088 1089 def test_command_list_when_default_idle(self): 1090 self.init_protocol() 1091 self.protocol.lineReceived(b"OK MPD 0.18.0") 1092 1093 def success(result): 1094 self.assertEqual([None, None], result) 1095 1096 self.protocol.command_list_ok_begin() 1097 self.protocol.play() 1098 self.protocol.stop() 1099 self.protocol.command_list_end().addCallback(success) 1100 self.assertEqual( 1101 [ 1102 b"idle\n", 1103 b"noidle\n", 1104 b"command_list_ok_begin\n", 1105 b"play\n", 1106 b"stop\n", 1107 b"command_list_end\n", 1108 ], 1109 self.protocol.transport.written, 1110 ) 1111 self.protocol.transport.clear() 1112 self.protocol.lineReceived(b"OK") 1113 self.protocol.lineReceived(b"list_OK") 1114 self.protocol.lineReceived(b"list_OK") 1115 self.protocol.lineReceived(b"OK") 1116 self.assertEqual([b"idle\n"], self.protocol.transport.written) 1117 1118 def test_command_list_failure_when_default_idle(self): 1119 self.init_protocol() 1120 self.protocol.lineReceived(b"OK MPD 0.18.0") 1121 1122 def load_command_error(result): 1123 self.assertIsInstance(result, Failure) 1124 self.assertEqual(result.getErrorMessage(), "[50@0] {load} No such playlist") 1125 1126 def command_list_general_error(result): 1127 self.assertIsInstance(result, Failure) 1128 self.assertEqual(result.getErrorMessage(), "An earlier command failed.") 1129 1130 self.protocol.command_list_ok_begin() 1131 self.protocol.load("Foo").addErrback(load_command_error) 1132 self.protocol.play().addErrback(command_list_general_error) 1133 self.protocol.command_list_end().addErrback(load_command_error) 1134 self.assertEqual( 1135 [ 1136 b"idle\n", 1137 b"noidle\n", 1138 b"command_list_ok_begin\n", 1139 b'load "Foo"\n', 1140 b"play\n", 1141 b"command_list_end\n", 1142 ], 1143 self.protocol.transport.written, 1144 ) 1145 self.protocol.transport.clear() 1146 self.protocol.lineReceived(b"OK") 1147 self.protocol.lineReceived(b"ACK [50@0] {load} No such playlist") 1148 self.assertEqual([b"idle\n"], self.protocol.transport.written) 1149 1150 def test_command_list_item_is_generator(self): 1151 self.init_protocol(default_idle=False) 1152 1153 def success(result): 1154 self.assertEqual( 1155 result, 1156 [ 1157 [ 1158 "Weezer - Say It Ain't So.mp3", 1159 "Dire Straits - Walk of Life.mp3", 1160 "01 - Love Delicatessen.mp3", 1161 "Guns N' Roses - Paradise City.mp3", 1162 ] 1163 ], 1164 ) 1165 1166 self.protocol.command_list_ok_begin() 1167 self.protocol.listplaylist("Foo") 1168 self.protocol.command_list_end().addCallback(success) 1169 self.protocol.lineReceived(b"file: Weezer - Say It Ain't So.mp3") 1170 self.protocol.lineReceived(b"file: Dire Straits - Walk of Life.mp3") 1171 self.protocol.lineReceived(b"file: 01 - Love Delicatessen.mp3") 1172 self.protocol.lineReceived(b"file: Guns N' Roses - Paradise City.mp3") 1173 self.protocol.lineReceived(b"list_OK") 1174 self.protocol.lineReceived(b"OK") 1175 1176 def test_already_in_command_list(self): 1177 self.init_protocol(default_idle=False) 1178 self.protocol.command_list_ok_begin() 1179 self.assertRaises( 1180 mpd.CommandListError, lambda: self.protocol.command_list_ok_begin() 1181 ) 1182 1183 def test_not_in_command_list(self): 1184 self.init_protocol(default_idle=False) 1185 self.assertRaises( 1186 mpd.CommandListError, lambda: self.protocol.command_list_end() 1187 ) 1188 1189 def test_invalid_command_in_command_list(self): 1190 self.init_protocol(default_idle=False) 1191 self.protocol.command_list_ok_begin() 1192 self.assertRaises(mpd.CommandListError, lambda: self.protocol.kill()) 1193 1194 def test_close(self): 1195 self.init_protocol(default_idle=False) 1196 1197 def success(result): 1198 self.assertEqual(result, None) 1199 1200 self.protocol.close().addCallback(success) 1201 1202 1203class AsyncMockServer: 1204 def __init__(self): 1205 self._output = asyncio.Queue() 1206 self._expectations = [] 1207 1208 def get_streams(self): 1209 result = asyncio.Future() 1210 result.set_result((self, self)) 1211 return result 1212 1213 def readline(self): 1214 # directly passing around the awaitable 1215 return self._output.get() 1216 1217 async def readexactly(self, length): 1218 ret = await self._output.get() 1219 if len(ret) != length: 1220 self.error("Mock data is not chuncked in the way the client expects to read it") 1221 return ret 1222 1223 def write(self, data): 1224 try: 1225 next_write = self._expectations[0][0][0] 1226 except IndexError: 1227 self.error("Data written to mock even though none expected: %r" % data) 1228 if next_write == data: 1229 self._expectations[0][0].pop(0) 1230 self._feed() 1231 else: 1232 self.error("Mock got %r, expected %r" % (data, next_write)) 1233 1234 def close(self): 1235 # todo: make sure calls to self.write fail after calling close 1236 pass 1237 1238 def error(self, message): 1239 raise AssertionError(message) 1240 1241 def _feed(self): 1242 if len(self._expectations[0][0]) == 0: 1243 _, response_lines = self._expectations.pop(0) 1244 for l in response_lines: 1245 self._output.put_nowait(l) 1246 1247 def expect_exchange(self, request_lines, response_lines): 1248 self._expectations.append((request_lines, response_lines)) 1249 self._feed() 1250 1251 1252class TestAsyncioMPD(unittest.TestCase): 1253 def init_client(self, odd_hello=None): 1254 self.loop = asyncio.get_event_loop() 1255 1256 self.mockserver = AsyncMockServer() 1257 asyncio.open_connection = mock.MagicMock( 1258 return_value=self.mockserver.get_streams() 1259 ) 1260 1261 if odd_hello is None: 1262 hello_lines = [b"OK MPD mocker\n"] 1263 else: 1264 hello_lines = odd_hello 1265 1266 self.mockserver.expect_exchange([], hello_lines) 1267 1268 self.client = mpd.asyncio.MPDClient() 1269 self._await(self.client.connect(TEST_MPD_HOST, TEST_MPD_PORT, loop=self.loop)) 1270 1271 asyncio.open_connection.assert_called_with( 1272 TEST_MPD_HOST, TEST_MPD_PORT, loop=self.loop 1273 ) 1274 1275 def _await(self, future): 1276 return self.loop.run_until_complete(future) 1277 1278 def test_oddhello(self): 1279 self.assertRaises( 1280 mpd.base.ProtocolError, self.init_client, odd_hello=[b"NOT OK\n"] 1281 ) 1282 1283 @unittest.skipIf( 1284 os.getenv("RUN_SLOW_TESTS") is None, 1285 "This test would add 5 seconds of idling to the run (export RUN_SLOW_TESTS=1 to run anyway)", 1286 ) 1287 def test_noresponse(self): 1288 self.assertRaises(mpd.base.ConnectionError, self.init_client, odd_hello=[]) 1289 1290 def test_status(self): 1291 self.init_client() 1292 1293 self.mockserver.expect_exchange( 1294 [b"status\n"], 1295 [ 1296 b"volume: 70\n", 1297 b"repeat: 0\n", 1298 b"random: 1\n", 1299 b"single: 0\n", 1300 b"consume: 0\n", 1301 b"playlist: 416\n", 1302 b"playlistlength: 7\n", 1303 b"mixrampdb: 0.000000\n", 1304 b"state: play\n", 1305 b"song: 4\n", 1306 b"songid: 19\n", 1307 b"time: 28:403\n", 1308 b"elapsed: 28.003\n", 1309 b"bitrate: 465\n", 1310 b"duration: 403.066\n", 1311 b"audio: 44100:16:2\n", 1312 b"OK\n", 1313 ], 1314 ) 1315 1316 status = self._await(self.client.status()) 1317 self.assertEqual( 1318 status, 1319 { 1320 "audio": "44100:16:2", 1321 "bitrate": "465", 1322 "consume": "0", 1323 "duration": "403.066", 1324 "elapsed": "28.003", 1325 "mixrampdb": "0.000000", 1326 "playlist": "416", 1327 "playlistlength": "7", 1328 "random": "1", 1329 "repeat": "0", 1330 "single": "0", 1331 "song": "4", 1332 "songid": "19", 1333 "state": "play", 1334 "time": "28:403", 1335 "volume": "70", 1336 }, 1337 ) 1338 1339 async def _test_outputs(self): 1340 self.mockserver.expect_exchange( 1341 [b"outputs\n"], 1342 [ 1343 b"outputid: 0\n", 1344 b"outputname: My ALSA Device\n", 1345 b"plugin: alsa\n", 1346 b"outputenabled: 0\n", 1347 b"attribute: dop=0\n", 1348 b"outputid: 1\n", 1349 b"outputname: My FM transmitter\n", 1350 b"plugin: fmradio\n", 1351 b"outputenabled: 1\n", 1352 b"OK\n", 1353 ], 1354 ) 1355 1356 outputs = self.client.outputs() 1357 1358 expected = iter( 1359 [ 1360 { 1361 "outputid": "0", 1362 "outputname": "My ALSA Device", 1363 "plugin": "alsa", 1364 "outputenabled": "0", 1365 "attribute": "dop=0", 1366 }, 1367 { 1368 "outputid": "1", 1369 "outputname": "My FM transmitter", 1370 "plugin": "fmradio", 1371 "outputenabled": "1", 1372 }, 1373 ] 1374 ) 1375 1376 async for o in outputs: 1377 self.assertEqual(o, next(expected)) 1378 self.assertRaises(StopIteration, next, expected) 1379 1380 def test_outputs(self): 1381 self.init_client() 1382 self._await(self._test_outputs()) 1383 1384 async def _test_list(self): 1385 self.mockserver.expect_exchange( 1386 [b'list "album"\n'], [b"Album: first\n", b"Album: second\n", b"OK\n",] 1387 ) 1388 1389 list_ = self.client.list("album") 1390 1391 expected = iter([{"album": "first"}, {"album": "second"},]) 1392 1393 async for o in list_: 1394 self.assertEqual(o, next(expected)) 1395 self.assertRaises(StopIteration, next, expected) 1396 1397 def test_list(self): 1398 self.init_client() 1399 self._await(self._test_list()) 1400 1401 async def _test_albumart(self): 1402 self.mockserver.expect_exchange( 1403 [b'albumart "x.mp3" "0"\n'], 1404 [ 1405 b"size: 32\n", 1406 b"binary: 16\n", 1407 bytes(range(16)), 1408 b"\n", 1409 b"OK\n", 1410 ] 1411 ) 1412 self.mockserver.expect_exchange( 1413 [b'albumart "x.mp3" "16"\n'], 1414 [ 1415 b"size: 32\n", 1416 b"binary: 16\n", 1417 bytes(range(16)), 1418 b"\n", 1419 b"OK\n", 1420 ], 1421 ) 1422 1423 albumart = await self.client.albumart("x.mp3") 1424 1425 expected = {"binary": bytes(range(16)) + bytes(range(16))} 1426 1427 self.assertEqual(albumart, expected) 1428 1429 async def _test_readpicture(self): 1430 self.mockserver.expect_exchange( 1431 [b'readpicture "x.mp3" "0"\n'], 1432 [ 1433 b"size: 32\n", 1434 b"type: image/jpeg\n", 1435 b"binary: 16\n", 1436 bytes(range(16)), 1437 b"\n", 1438 b"OK\n", 1439 ] 1440 ) 1441 self.mockserver.expect_exchange( 1442 [b'readpicture "x.mp3" "16"\n'], 1443 [ 1444 b"size: 32\n", 1445 b"type: image/jpeg\n", 1446 b"binary: 16\n", 1447 bytes(range(16)), 1448 b"\n", 1449 b"OK\n", 1450 ], 1451 ) 1452 1453 art = await self.client.readpicture("x.mp3") 1454 1455 expected = {"binary": bytes(range(16)) + bytes(range(16)), "type": "image/jpeg"} 1456 1457 self.assertEqual(art, expected) 1458 1459 async def _test_readpicture_empty(self): 1460 self.mockserver.expect_exchange( 1461 [b'readpicture "x.mp3" "0"\n'], 1462 [ 1463 b"OK\n", 1464 ] 1465 ) 1466 1467 art = await self.client.readpicture("x.mp3") 1468 1469 expected = {} 1470 1471 self.assertEqual(art, expected) 1472 1473 def test_albumart(self): 1474 self.init_client() 1475 self._await(self._test_albumart()) 1476 1477 def test_readpicture(self): 1478 self.init_client() 1479 self._await(self._test_readpicture()) 1480 1481 def test_readpicture_empty(self): 1482 self.init_client() 1483 self._await(self._test_readpicture_empty()) 1484 1485 def test_mocker(self): 1486 """Does the mock server refuse unexpected writes?""" 1487 self.init_client() 1488 1489 self.mockserver.expect_exchange([b"expecting odd things\n"], [b""]) 1490 self.assertRaises(AssertionError, self._await, self.client.status()) 1491 1492 1493if __name__ == "__main__": 1494 unittest.main() 1495