# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

"""Tests for the library classes in ``quodlibet.library.libraries``."""

from gi.repository import Gtk

import os
import shutil
from senf import fsnative

from quodlibet.formats import AudioFileError
from quodlibet import config
from quodlibet.util import connect_obj, is_windows
from quodlibet.formats import AudioFile

from tests import TestCase, get_data_path, mkstemp, mkdtemp, skipIf
from .helper import capture_output, get_temp_copy

from quodlibet.library.libraries import Library, PicklingMixin, SongLibrary, \
    FileLibrary, AlbumLibrary, SongFileLibrary, iter_paths


class Fake(int):
    """An int usable as a library item.

    It carries a ``key`` attribute which starts out equal to its own
    integer value but can be reassigned by tests.
    """

    def __init__(self, _):
        # The numeric value is already set by int.__new__; the argument is
        # accepted only so that ``Fake(n)`` can be called with a value.
        self.key = int(self)


def Frange(*args):
    """Return a list of Fake items; arguments are those of ``range``."""
    return list(map(Fake, range(*args)))


class FakeSong(Fake):
    """A Fake that additionally offers ``list`` and ``rename``."""

    def list(self, tag):
        # Turn tag_values into a less-than query, for testing.
        if tag <= self:
            return []
        else:
            return [int(self)]

    def rename(self, newname):
        # Renaming only replaces the key; the integer value is unchanged.
        self.key = newname


class AlbumSong(AudioFile):
    """A mock AudioFile belonging to one of three albums,
    based on a single number"""

    def __init__(self, num, album=None):
        super(AlbumSong, self).__init__()
        self["~filename"] = fsnative(u"file_%d.mp3" % (num + 1))
        self["title"] = "Song %d" % (num + 1)
        self["artist"] = "Fakeman"
        if album is None:
            # No explicit album: cycle through "Album 1".."Album 3"
            self["album"] = "Album %d" % (num % 3 + 1)
        else:
            self["album"] = album
        self["labelid"] = self["album"]


class FakeSongFile(FakeSong):
    """A FakeSong with switchable valid / exists / mounted state."""

    _valid = True
    _exists = True
    _mounted = True

    @property
    def mountpoint(self):
        return "/" if self._mounted else "/FAKE"

    def valid(self):
        return self._valid

    def exists(self):
        return self._exists

    def reload(self):
        # Reloading an existing file makes it valid again; a missing one
        # fails with IOError.
        if self._exists:
            self._valid = True
        else:
            raise IOError("doesn't exist")

    def mounted(self):
        return self._mounted


# Custom range functions, to generate lists of song-like objects
def FSFrange(*args):
    """Return a list of FakeSongFile items for ``range(*args)``."""
    return list(map(FakeSongFile, range(*args)))


def FSrange(*args):
    """Return a list of FakeSong items for ``range(*args)``."""
    return list(map(FakeSong, range(*args)))


def ASrange(*args):
    """Return a list of AlbumSong items for ``range(*args)``."""
    return list(map(AlbumSong, range(*args)))


class TLibrary(TestCase):
    """Tests for the base Library.

    The item type, the item-list factory and the library class under test
    are class attributes so that subclasses can re-run the same tests
    against other library / item combinations.
    """

    Fake = Fake
    Frange = staticmethod(Frange)
    Library = Library

    def setUp(self):
        self.library = self.Library()
        # Items delivered by each library signal are collected here
        self.added = []
        self.changed = []
        self.removed = []

        connect_obj(self.library, 'added', list.extend, self.added)
        connect_obj(self.library, 'changed', list.extend, self.changed)
        connect_obj(self.library, 'removed', list.extend, self.removed)

    def test_add(self):
        self.library.add(self.Frange(12))
        self.failUnlessEqual(self.added, self.Frange(12))
        del self.added[:]
        self.library.add(self.Frange(12, 24))
        self.failUnlessEqual(self.added, self.Frange(12, 24))

    def test_remove(self):
        self.library.add(self.Frange(10))
        self.assertTrue(self.library.remove(self.Frange(3, 6)))
        self.failUnlessEqual(self.removed, self.Frange(3, 6))

        # Neither the objects nor their keys should be present.
        self.failIf(self.Fake(3) in self.library)
        self.failUnless(self.Fake(6) in self.library)
        self.failIf(3 in self.library)
        self.failUnless(6 in self.library)

    def test_remove_when_not_present(self):
        self.assertFalse(self.library.remove([self.Fake(12)]))

    def test_changed(self):
        self.library.add(self.Frange(10))
        self.library.changed(self.Frange(5))
        # Run pending main loop events before checking the signal result
        while Gtk.events_pending():
            Gtk.main_iteration()
        self.failUnlessEqual(self.changed, self.Frange(5))

    def test_changed_not_present(self):
        self.library.add(self.Frange(10))
        # 2, 5, 8 are in the library; 11, 14, 17 are not
        self.library.changed(self.Frange(2, 20, 3))
        while Gtk.events_pending():
            Gtk.main_iteration()
        self.failUnlessEqual(set(self.changed), {2, 5, 8})

    def test_changed_none_present(self):
        # Only checks that this does not raise
        self.library.changed(self.Frange(5))
        while Gtk.events_pending():
            Gtk.main_iteration()

    def test___iter__(self):
        self.library.add(self.Frange(10))
        self.failUnlessEqual(sorted(self.library), self.Frange(10))

    def test___iter___empty(self):
        self.failIf(list(self.library))

    def test___len__(self):
        self.failUnlessEqual(len(self.library), 0)
        self.library.add(self.Frange(10))
        self.failUnlessEqual(len(self.library), 10)
        self.library.remove(self.Frange(3))
        self.failUnlessEqual(len(self.library), 7)

    def test___getitem__(self):
        self.library.add(self.Frange(10))
        self.failUnlessEqual(self.library[5], 5)
        # An item whose key differs from its value is found by key only
        new = self.Fake(12)
        new.key = 100
        self.library.add([new])
        self.failUnlessEqual(self.library[100], 12)
        self.failIf(12 in self.library)

    def test___getitem___not_present(self):
        self.library.add(self.Frange(10))
        self.failUnlessRaises(KeyError, self.library.__getitem__, 12)

    def test___contains__(self):
        self.library.add(self.Frange(10))
        new = self.Fake(12)
        new.key = 100
        self.library.add([new])
        for value in [0, 1, 2, 6, 9, 100, new]:
            # 0, 1, 2, 6, 9: all added by self.Frange
            # 100: key for new
            # new: is itself present
            self.failUnless(value in self.library, "didn't find %d" % value)

        for value in [-1, 10, 12, 101]:
            # -1, 10, 101: boundary values
            # 12: equal but non-key-equal to new
            self.failIf(value in self.library, "found %d" % value)

    def test_get(self):
        self.failUnless(self.library.get(12) is None)
        self.failUnless(self.library.get(12, "foo") == "foo")
        new = self.Fake(12)
        new.key = 100
        self.library.add([new])
        self.failUnless(self.library.get(12) is None)
        self.failUnless(self.library.get(100) is new)

    def test_keys(self):
        # Items 0..19 are stored under keys 100..119
        items = []
        for i in range(20):
            items.append(self.Fake(i))
            items[-1].key = i + 100
        self.library.add(items)
        self.failUnlessEqual(
            sorted(self.library.keys()), list(range(100, 120)))

    def test_values(self):
        items = []
        for i in range(20):
            items.append(self.Fake(i))
            items[-1].key = i + 100
        self.library.add(items)
        self.failUnlessEqual(sorted(self.library.values()), list(range(20)))

    def test_items(self):
        items = []
        for i in range(20):
            items.append(self.Fake(i))
            items[-1].key = i + 100
        self.library.add(items)
        expected = list(zip(range(100, 120), range(20)))
        self.failUnlessEqual(sorted(self.library.items()), expected)

    def test_has_key(self):
        self.failIf(self.library.has_key(10))
        new = self.Fake(10)
        new.key = 20
        self.library.add([new])
        self.failIf(self.library.has_key(10))
        self.failUnless(self.library.has_key(20))

    def tearDown(self):
        self.library.destroy()


class FakeAudioFile(AudioFile):
    """An AudioFile whose only content is a ``~filename`` derived from
    the given key."""

    def __init__(self, key):
        self["~filename"] = fsnative(str(key))


def FakeAudioFileRange(*args):
    """Return a list of FakeAudioFile items for ``range(*args)``."""
    return list(map(FakeAudioFile, range(*args)))


class TPicklingMixin(TestCase):
    """Tests for loading and saving through PicklingMixin."""

    class PicklingMockLibrary(PicklingMixin, Library):
        """A library-like class that implements enough to test PicklingMixin"""
        def __init__(self):
            PicklingMixin.__init__(self)
            self._contents = {}
            # set up just enough of the library interface to work
            self.values = self._contents.values
            self.items = self._contents.items

        def add(self, items):
            for item in items:
                self._contents[item.key] = item

    Library = PicklingMockLibrary
    Frange = staticmethod(FakeAudioFileRange)

    def setUp(self):
        self.library = self.Library()

    def test_load_noexist(self):
        # Create a temp file just to get a path that is known not to exist
        fd, filename = mkstemp()
        os.close(fd)
        os.unlink(filename)
        library = self.Library()
        library.load(filename)
        assert len(library) == 0

    def test_load_invalid(self):
        # A file holding garbage must load as an empty library
        fd, filename = mkstemp()
        os.write(fd, b"nope")
        os.close(fd)
        try:
            library = self.Library()
            library.load(filename)
            assert len(library) == 0
        finally:
            os.unlink(filename)

    def test_save_load(self):
        fd, filename = mkstemp()
        os.close(fd)
        try:
            self.library.add(self.Frange(30))
            self.library.save(filename)

            # A fresh library loaded from the file must hold the same
            # keys and items as the one that was saved
            library = self.Library()
            library.load(filename)
            for (k, v), (k2, v2) in zip(
                    sorted(self.library.items()), sorted(library.items())):
                assert k == k2
                assert v.key == v2.key
        finally:
            os.unlink(filename)


class TSongLibrary(TLibrary):
    """Runs the TLibrary tests against SongLibrary with FakeSong items and
    adds tests for renaming and tag values."""

    Fake = FakeSong
    Frange = staticmethod(FSrange)
    Library = SongLibrary

    def test_rename_dirty(self):
        # Both adding and renaming must mark the library dirty
        self.library.dirty = False
        song = self.Fake(10)
        self.library.add([song])
        self.failUnless(self.library.dirty)
        self.library.dirty = False
        self.library.rename(song, 20)
        self.failUnless(self.library.dirty)

    def test_rename(self):
        song = self.Fake(10)
        self.library.add([song])
        self.library.rename(song, 20)
        while Gtk.events_pending():
            Gtk.main_iteration()
        self.failUnless(song in self.changed)
        self.failUnless(song in self.library)
        self.failUnless(song.key in self.library)
        self.failUnlessEqual(song.key, 20)

    def test_rename_changed(self):
        # With a ``changed`` set passed in, the renamed song is put there
        song = self.Fake(10)
        self.library.add([song])
        changed = set()
        self.library.rename(song, 20, changed=changed)
        self.assertEqual(len(changed), 1)
        self.assertTrue(song in changed)

    def test_tag_values(self):
        self.library.add(self.Frange(30))
        del self.added[:]
        # FakeSong.list() makes tag_values(n) act as "values below n"
        self.failUnlessEqual(
            sorted(self.library.tag_values(10)), list(range(10)))
        self.failUnlessEqual(sorted(self.library.tag_values(0)), [])
        # Querying tag values must not emit any signal
        self.failIf(self.changed or self.added or self.removed)


class TFileLibrary(TLibrary):
    """Runs the TLibrary tests against FileLibrary and adds tests for
    masking by mount point and for reloading."""

    Fake = FakeSongFile
    Library = FileLibrary

    def test_mask_invalid_mount_point(self):
        # Masking / unmasking a mount point no item lives on is a no-op
        new = self.Fake(1)
        self.library.add([new])
        self.failIf(self.library.masked_mount_points)
        self.failUnless(len(self.library))
        self.library.mask("/adsadsafaf")
        self.failIf(self.library.masked_mount_points)
        self.library.unmask("/adsadsafaf")
        self.failIf(self.library.masked_mount_points)
        self.failUnless(len(self.library))

    def test_mask_basic(self):
        new = self.Fake(1)
        self.library.add([new])
        self.failIf(self.library.masked_mount_points)
        self.library.mask(new.mountpoint)
        self.failUnlessEqual(self.library.masked_mount_points,
                             [new.mountpoint])
        # While masked the item is hidden from the library itself
        self.failIf(len(self.library))
        self.failUnlessEqual(self.library.get_masked(new.mountpoint), [new])
        self.failUnless(self.library.masked(new))
        self.library.unmask(new.mountpoint)
        self.failUnless(len(self.library))
        self.failUnlessEqual(self.library.get_masked(new.mountpoint), [])

    def test_remove_masked(self):
        new = self.Fake(1)
        self.library.add([new])
        self.library.mask(new.mountpoint)
        self.failUnless(self.library.masked_mount_points)
        self.library.remove_masked(new.mountpoint)
        self.failIf(self.library.masked_mount_points)

    def test_content_masked(self):
        new = self.Fake(100)
        new._mounted = False
        self.failIf(self.library.get_content())
        self.library._load_init([new])
        self.failUnless(self.library.masked(new))
        self.failUnless(self.library.get_content())

    def test_init_masked(self):
        # An unmounted item loaded at init ends up masked, not in items()
        new = self.Fake(100)
        new._mounted = False
        self.library._load_init([new])
        self.failIf(self.library.items())
        self.failUnless(self.library.masked(new))

    def test_load_init_nonmasked(self):
        new = self.Fake(200)
        new._mounted = True
        self.library._load_init([new])
        self.failUnlessEqual(list(self.library.values()), [new])

    def test_reload(self):
        new = self.Fake(200)
        self.library.add([new])
        changed = set()
        removed = set()
        self.library.reload(new, changed=changed, removed=removed)
        self.assertTrue(new in changed)
        self.assertFalse(removed)


class TSongFileLibrary(TSongLibrary):
    """Runs the TSongLibrary tests against SongFileLibrary with
    FakeSongFile items and adds tests for loading items and filenames."""

    Fake = FakeSongFile
    Frange = staticmethod(FSFrange)
    Library = SongFileLibrary

    def test__load_exists_invalid(self):
        # An existing but invalid item gets reloaded and added
        new = self.Fake(100)
        new._valid = False
        changed, removed = self.library._load_item(new)
        self.failIf(removed)
        self.failUnless(changed)
        self.failUnless(new._valid)
        self.failUnless(new in self.library)

    def test__load_not_exists(self):
        new = self.Fake(100)
        new._valid = False
        new._exists = False
        changed, removed = self.library._load_item(new)
        self.failIf(removed)
        self.failIf(changed)
        self.failIf(new._valid)
        self.failIf(new in self.library)

    def test__load_error_during_reload(self):
        # Import and remember the original before entering the try block,
        # so the finally clause can never hit an unbound name.
        from quodlibet import util
        print_exc = util.print_exc
        try:
            # Silence the traceback output for the duration of the test
            util.print_exc = lambda *args, **kwargs: None
            new = self.Fake(100)

            def error():
                raise AudioFileError
            new.reload = error
            new._valid = False
            changed, removed = self.library._load_item(new)
            self.failUnless(removed)
            self.failIf(changed)
            self.failIf(new._valid)
            self.failIf(new in self.library)
        finally:
            util.print_exc = print_exc

    def test__load_not_mounted(self):
        new = self.Fake(100)
        new._valid = False
        new._exists = False
        new._mounted = False
        changed, removed = self.library._load_item(new)
        self.failIf(removed)
        self.failIf(changed)
        self.failIf(new._valid)
        self.failIf(new in self.library)
        self.failUnless(self.library.masked(new))

    def __get_file(self):
        # Returns the path of a temporary copy of the 'empty.flac' test
        # file; the caller is responsible for deleting it.
        return get_temp_copy(get_data_path('empty.flac'))

    def test_add_filename(self):
        config.init()
        # Every temp copy made below, so that a failing assertion cannot
        # leave files behind.
        created = []
        try:
            filename = self.__get_file()
            created.append(filename)
            ret = self.library.add_filename(filename)
            self.failUnless(ret)
            self.failUnlessEqual(len(self.library), 1)
            self.failUnlessEqual(len(self.added), 1)
            # Adding the same file again must not emit 'added' again
            ret = self.library.add_filename(filename)
            self.failUnless(ret)
            self.failUnlessEqual(len(self.added), 1)
            os.unlink(filename)

            filename = self.__get_file()
            created.append(filename)
            # add=False returns the song without putting it in the library
            ret = self.library.add_filename(filename, add=False)
            self.failUnless(ret)
            self.failIf(ret in self.library)
            self.failUnlessEqual(len(self.added), 1)
            self.library.add([ret])
            self.failUnless(ret in self.library)
            self.failUnlessEqual(len(self.added), 2)
            self.failUnlessEqual(2, len(self.library))
            os.unlink(filename)

            # An unusable filename yields a false result and adds nothing
            with capture_output():
                ret = self.library.add_filename("")
            self.failIf(ret)
            self.failUnlessEqual(len(self.added), 2)
            self.failUnlessEqual(len(self.library), 2)

        finally:
            try:
                for path in created:
                    if os.path.exists(path):
                        os.unlink(path)
            finally:
                config.quit()

    def test_contains_filename(self):
        filename = self.__get_file()
        try:
            assert not self.library.contains_filename(filename)
            assert self.library.add_filename(filename, add=False)
            assert not self.library.contains_filename(filename)
            assert self.library.add_filename(filename)
            assert self.library.contains_filename(filename)
        finally:
            os.unlink(filename)

    @skipIf(os.name != "nt", "case-insensitive paths are Windows only")
    def test_add_filename_normalize_path(self):
        config.init()
        try:
            filename = self.__get_file()
            try:
                # create an equivalent path different from the original one
                if filename.upper() == filename:
                    other = filename.lower()
                else:
                    other = filename.upper()

                song = self.library.add_filename(filename)
                other_song = self.library.add_filename(other)
                self.assertTrue(song is other_song)
            finally:
                os.unlink(filename)
        finally:
            config.quit()


class TAlbumLibrary(TestCase):
    """Tests for AlbumLibrary on top of a Library holding 12 AlbumSongs
    spread over three albums."""

    Fake = FakeSong
    Frange = staticmethod(ASrange)
    UnderlyingLibrary = Library

    def setUp(self):
        self.underlying = self.UnderlyingLibrary()
        self.added = []
        self.changed = []
        self.removed = []

        # Keep the handler ids so tearDown can disconnect them again
        self._sigs = [
            connect_obj(self.underlying, 'added', list.extend, self.added),
            connect_obj(self.underlying,
                        'changed', list.extend, self.changed),
            connect_obj(self.underlying,
                        'removed', list.extend, self.removed),
        ]

        self.library = AlbumLibrary(self.underlying)

        # Populate for every test
        self.underlying.add(self.Frange(12))

    def tearDown(self):
        for s in self._sigs:
            self.underlying.disconnect(s)
        self.underlying.destroy()
        self.library.destroy()

    def test_get(self):
        key = self.underlying.get("file_1.mp3").album_key
        self.failUnlessEqual(self.library.get(key).title, "Album 1")
        album = self.library.get(key)
        self.failUnlessEqual(album.key, key)
        # 12 songs over 3 albums: 4 songs each
        self.failUnlessEqual(len(album.songs), 4)

        key = self.underlying.get("file_2.mp3").album_key
        self.failUnlessEqual(self.library.get(key).title, "Album 2")

    def test_getitem(self):
        key = self.underlying.get("file_4.mp3").album_key
        self.failUnlessEqual(self.library[key].key, key)

    def test_keys(self):
        # Must compare for equality: with failUnless() the 3 would only be
        # used as the failure message and any non-empty result would pass.
        self.failUnlessEqual(len(self.library.keys()), 3)

    def test_has_key(self):
        key = self.underlying.get("file_1.mp3").album_key
        self.failUnless(self.library.has_key(key))

    def test_misc_collection(self):
        self.failUnless(self.library.values())

    def test_items(self):
        self.failUnlessEqual(len(self.library.items()), 3)

    def test_items_2(self):
        albums = self.library.values()
        self.failUnlessEqual(len(albums), 3)
        songs = self.underlying._contents.values()
        # Make sure "all the songs' albums" == "all the albums", roughly
        self.failUnlessEqual({a.key for a in albums},
                             {s.album_key for s in songs})

    def test_remove(self):
        key = self.underlying.get("file_1.mp3").album_key
        songs = self.underlying._contents

        # Remove all songs in Album 1
        for i in range(1, 12, 3):
            song = songs["file_%d.mp3" % i]
            self.underlying.remove([song])

        # Album 1 is all gone...
        self.failUnlessEqual(self.library.get(key), None)

        # ...but Album 2 is fine
        key = self.underlying.get("file_2.mp3").album_key
        album2 = self.library[key]
        self.failUnlessEqual(album2.key, key)
        self.failUnlessEqual(len(album2.songs), 4)

    def test_misc(self):
        # It shouldn't implement FileLibrary etc
        self.failIf(getattr(self.library, "filename", None))


class TAlbumLibrarySignals(TestCase):
    """Tests the order in which a SongLibrary and its ``albums`` library
    emit their added / changed / removed signals."""

    def setUp(self):
        lib = SongLibrary()
        received = []

        def listen(name, items):
            # Only the signal name is recorded; the items are ignored
            received.append(name)

        self._sigs = [
            connect_obj(lib, 'added', listen, 'added'),
            connect_obj(lib, 'changed', listen, 'changed'),
            connect_obj(lib, 'removed', listen, 'removed'),
        ]

        # Signals of the album library are recorded with an "a_" prefix
        albums = lib.albums
        self._asigs = [
            connect_obj(albums, 'added', listen, 'a_added'),
            connect_obj(albums, 'changed', listen, 'a_changed'),
            connect_obj(albums, 'removed', listen, 'a_removed'),
        ]

        self.lib = lib
        self.albums = albums
        self.received = received

    def test_add_one(self):
        self.lib.add([AlbumSong(1)])
        self.failUnlessEqual(self.received, ["added", "a_added"])

    def test_add_two_same(self):
        # A second song for an existing album changes that album
        self.lib.add([AlbumSong(1, "a1")])
        self.lib.add([AlbumSong(5, "a1")])
        self.failUnlessEqual(self.received,
            ["added", "a_added", "added", "a_changed"])

    def test_remove(self):
        songs = [AlbumSong(1, "a1"), AlbumSong(2, "a1"), AlbumSong(4, "a2")]
        self.lib.add(songs)
        self.lib.remove(songs[:2])
        self.failUnlessEqual(self.received,
            ["added", "a_added", "removed", "a_removed"])

    def test_change(self):
        songs = [AlbumSong(1, "a1"), AlbumSong(2, "a1"), AlbumSong(4, "a2")]
        self.lib.add(songs)
        self.lib.changed(songs)
        self.failUnlessEqual(self.received,
            ["added", "a_added", "changed", "a_changed"])

    def tearDown(self):
        for s in self._asigs:
            self.albums.disconnect(s)
        for s in self._sigs:
            self.lib.disconnect(s)
        self.lib.destroy()


class Titer_paths(TestCase):
    """Tests for iter_paths, run against a fresh temporary directory."""

    def setUp(self):
        # on osx the temp folder returned is a symlink
        self.root = os.path.realpath(mkdtemp())

    def tearDown(self):
        shutil.rmtree(self.root)

    def test_empty(self):
        assert list(iter_paths(self.root)) == []

    def test_one_file(self):
        fd, name = mkstemp(dir=self.root)
        os.close(fd)
        assert list(iter_paths(self.root)) == [name]

    def test_one_file_exclude(self):
        fd, name = mkstemp(dir=self.root)
        os.close(fd)
        # Excluding the root, its parent or the file itself hides the file;
        # excluding a different path does not.
        assert list(iter_paths(self.root, exclude=[self.root])) == []
        assert list(iter_paths(self.root,
                               exclude=[os.path.dirname(self.root)])) == []
        assert list(iter_paths(self.root, exclude=[name])) == []
        assert list(iter_paths(self.root, exclude=[name + "a"])) == [name]

    @skipIf(is_windows(), "no symlink")
    def test_with_dir_symlink(self):
        child = mkdtemp(dir=self.root)
        link = os.path.join(self.root, "foo")
        os.symlink(child, link)
        # The file is created through the link, so ``name`` is below it
        fd, name = mkstemp(dir=link)
        os.close(fd)

        assert name not in list(iter_paths(self.root))
        assert list(iter_paths(link)) == list(iter_paths(child))

        assert list(iter_paths(link, exclude=[link])) == []
        assert list(iter_paths(child, exclude=[child])) == []
        assert list(iter_paths(link, exclude=[child])) == []

    @skipIf(is_windows(), "no symlink")
    def test_with_file(self):
        fd, name = mkstemp(dir=self.root)
        os.close(fd)
        link = os.path.join(self.root, "foo")
        os.symlink(name, link)

        # The file is reported once directly and once through the link,
        # both times under its real path
        assert list(iter_paths(self.root)) == [name, name]
        assert list(iter_paths(self.root, exclude=[link])) == [name]
        assert list(iter_paths(self.root, exclude=[name])) == []

    def test_hidden_dir(self):
        child = mkdtemp(dir=self.root, prefix=".")
        fd, name = mkstemp(dir=child)
        os.close(fd)
        assert list(iter_paths(child)) == []
        assert list(iter_paths(child, skip_hidden=False)) == [name]
        assert list(iter_paths(self.root)) == []
        assert list(iter_paths(self.root, skip_hidden=False)) == [name]

    def test_hidden_file(self):
        fd, name = mkstemp(dir=self.root, prefix=".")
        os.close(fd)

        assert list(iter_paths(self.root)) == []