1# This program is free software; you can redistribute it and/or modify
2# it under the terms of the GNU General Public License as published by
3# the Free Software Foundation; either version 2 of the License, or
4# (at your option) any later version.
5
6from gi.repository import Gtk
7
8import os
9import shutil
10from senf import fsnative
11
12from quodlibet.formats import AudioFileError
13from quodlibet import config
14from quodlibet.util import connect_obj, is_windows
15from quodlibet.formats import AudioFile
16
17from tests import TestCase, get_data_path, mkstemp, mkdtemp, skipIf
18from .helper import capture_output, get_temp_copy
19
20from quodlibet.library.libraries import Library, PicklingMixin, SongLibrary, \
21    FileLibrary, AlbumLibrary, SongFileLibrary, iter_paths
22
23
24class Fake(int):
25    def __init__(self, _):
26        self.key = int(self)
27
28
29def Frange(*args):
30    return list(map(Fake, range(*args)))
31
32
33class FakeSong(Fake):
34    def list(self, tag):
35        # Turn tag_values into a less-than query, for testing.
36        if tag <= self:
37            return []
38        else:
39            return [int(self)]
40
41    def rename(self, newname):
42        self.key = newname
43
44
45class AlbumSong(AudioFile):
46    """A mock AudioFile belong to one of three albums,
47    based on a single number"""
48    def __init__(self, num, album=None):
49        super(AlbumSong, self).__init__()
50        self["~filename"] = fsnative(u"file_%d.mp3" % (num + 1))
51        self["title"] = "Song %d" % (num + 1)
52        self["artist"] = "Fakeman"
53        if album is None:
54            self["album"] = "Album %d" % (num % 3 + 1)
55        else:
56            self["album"] = album
57        self["labelid"] = self["album"]
58
59
60class FakeSongFile(FakeSong):
61    _valid = True
62    _exists = True
63    _mounted = True
64
65    @property
66    def mountpoint(self):
67        return "/" if self._mounted else "/FAKE"
68
69    def valid(self):
70        return self._valid
71
72    def exists(self):
73        return self._exists
74
75    def reload(self):
76        if self._exists:
77            self._valid = True
78        else:
79            raise IOError("doesn't exist")
80
81    def mounted(self):
82        return self._mounted
83
84
85# Custom range functions, to generate lists of song-like objects
86def FSFrange(*args):
87    return list(map(FakeSongFile, range(*args)))
88
89
90def FSrange(*args):
91    return list(map(FakeSong, range(*args)))
92
93
94def ASrange(*args):
95    return list(map(AlbumSong, range(*args)))
96
97
98class TLibrary(TestCase):
99    Fake = Fake
100    Frange = staticmethod(Frange)
101    Library = Library
102
103    def setUp(self):
104        self.library = self.Library()
105        self.added = []
106        self.changed = []
107        self.removed = []
108
109        connect_obj(self.library, 'added', list.extend, self.added)
110        connect_obj(self.library, 'changed', list.extend, self.changed)
111        connect_obj(self.library, 'removed', list.extend, self.removed)
112
113    def test_add(self):
114        self.library.add(self.Frange(12))
115        self.failUnlessEqual(self.added, self.Frange(12))
116        del(self.added[:])
117        self.library.add(self.Frange(12, 24))
118        self.failUnlessEqual(self.added, self.Frange(12, 24))
119
120    def test_remove(self):
121        self.library.add(self.Frange(10))
122        self.assertTrue(self.library.remove(self.Frange(3, 6)))
123        self.failUnlessEqual(self.removed, self.Frange(3, 6))
124
125        # Neither the objects nor their keys should be present.
126        self.failIf(self.Fake(3) in self.library)
127        self.failUnless(self.Fake(6) in self.library)
128        self.failIf(3 in self.library)
129        self.failUnless(6 in self.library)
130
131    def test_remove_when_not_present(self):
132        self.assertFalse(self.library.remove([self.Fake(12)]))
133
134    def test_changed(self):
135        self.library.add(self.Frange(10))
136        self.library.changed(self.Frange(5))
137        while Gtk.events_pending():
138            Gtk.main_iteration()
139        self.failUnlessEqual(self.changed, self.Frange(5))
140
141    def test_changed_not_present(self):
142        self.library.add(self.Frange(10))
143        self.library.changed(self.Frange(2, 20, 3))
144        while Gtk.events_pending():
145            Gtk.main_iteration()
146        self.failUnlessEqual(set(self.changed), {2, 5, 8})
147
148    def test_changed_none_present(self):
149        self.library.changed(self.Frange(5))
150        while Gtk.events_pending():
151            Gtk.main_iteration()
152
153    def test___iter__(self):
154        self.library.add(self.Frange(10))
155        self.failUnlessEqual(sorted(list(self.library)), self.Frange(10))
156
157    def test___iter___empty(self):
158        self.failIf(list(self.library))
159
160    def test___len__(self):
161        self.failUnlessEqual(len(self.library), 0)
162        self.library.add(self.Frange(10))
163        self.failUnlessEqual(len(self.library), 10)
164        self.library.remove(self.Frange(3))
165        self.failUnlessEqual(len(self.library), 7)
166
167    def test___getitem__(self):
168        self.library.add(self.Frange(10))
169        self.failUnlessEqual(self.library[5], 5)
170        new = self.Fake(12)
171        new.key = 100
172        self.library.add([new])
173        self.failUnlessEqual(self.library[100], 12)
174        self.failIf(12 in self.library)
175
176    def test___getitem___not_present(self):
177        self.library.add(self.Frange(10))
178        self.failUnlessRaises(KeyError, self.library.__getitem__, 12)
179
180    def test___contains__(self):
181        self.library.add(self.Frange(10))
182        new = self.Fake(12)
183        new.key = 100
184        self.library.add([new])
185        for value in [0, 1, 2, 6, 9, 100, new]:
186            # 0, 1, 2, 6, 9: all added by self.Frange
187            # 100: key for new
188            # new: is itself present
189            self.failUnless(value in self.library, "didn't find %d" % value)
190
191        for value in [-1, 10, 12, 101]:
192            # -1, 10, 101: boundry values
193            # 12: equal but non-key-equal to new
194            self.failIf(value in self.library, "found %d" % value)
195
196    def test_get(self):
197        self.failUnless(self.library.get(12) is None)
198        self.failUnless(self.library.get(12, "foo") == "foo")
199        new = self.Fake(12)
200        new.key = 100
201        self.library.add([new])
202        self.failUnless(self.library.get(12) is None)
203        self.failUnless(self.library.get(100) is new)
204
205    def test_keys(self):
206        items = []
207        for i in range(20):
208            items.append(self.Fake(i))
209            items[-1].key = i + 100
210        self.library.add(items)
211        self.failUnlessEqual(
212            sorted(self.library.keys()), list(range(100, 120)))
213
214    def test_values(self):
215        items = []
216        for i in range(20):
217            items.append(self.Fake(i))
218            items[-1].key = i + 100
219        self.library.add(items)
220        self.failUnlessEqual(sorted(self.library.values()), list(range(20)))
221
222    def test_items(self):
223        items = []
224        for i in range(20):
225            items.append(self.Fake(i))
226            items[-1].key = i + 100
227        self.library.add(items)
228        expected = list(zip(range(100, 120), range(20)))
229        self.failUnlessEqual(sorted(self.library.items()), expected)
230
231    def test_has_key(self):
232        self.failIf(self.library.has_key(10))
233        new = self.Fake(10)
234        new.key = 20
235        self.library.add([new])
236        self.failIf(self.library.has_key(10))
237        self.failUnless(self.library.has_key(20))
238
239    def tearDown(self):
240        self.library.destroy()
241
242
243class FakeAudioFile(AudioFile):
244
245    def __init__(self, key):
246        self["~filename"] = fsnative(str(key))
247
248
249def FakeAudioFileRange(*args):
250    return list(map(FakeAudioFile, range(*args)))
251
252
253class TPicklingMixin(TestCase):
254
255    class PicklingMockLibrary(PicklingMixin, Library):
256        """A library-like class that implements enough to test PicklingMixin"""
257        def __init__(self):
258            PicklingMixin.__init__(self)
259            self._contents = {}
260            # set up just enough of the library interface to work
261            self.values = self._contents.values
262            self.items = self._contents.items
263
264        def add(self, items):
265            for item in items:
266                self._contents[item.key] = item
267
268    Library = PicklingMockLibrary
269    Frange = staticmethod(FakeAudioFileRange)
270
271    def setUp(self):
272        self.library = self.Library()
273
274    def test_load_noexist(self):
275        fd, filename = mkstemp()
276        os.close(fd)
277        os.unlink(filename)
278        library = self.Library()
279        library.load(filename)
280        assert len(library) == 0
281
282    def test_load_invalid(self):
283        fd, filename = mkstemp()
284        os.write(fd, b"nope")
285        os.close(fd)
286        try:
287            library = self.Library()
288            library.load(filename)
289            assert len(library) == 0
290        finally:
291            os.unlink(filename)
292
293    def test_save_load(self):
294        fd, filename = mkstemp()
295        os.close(fd)
296        try:
297            self.library.add(self.Frange(30))
298            self.library.save(filename)
299
300            library = self.Library()
301            library.load(filename)
302            for (k, v), (k2, v2) in zip(
303                    sorted(self.library.items()), sorted(library.items())):
304                assert k == k2
305                assert v.key == v2.key
306        finally:
307            os.unlink(filename)
308
309
310class TSongLibrary(TLibrary):
311    Fake = FakeSong
312    Frange = staticmethod(FSrange)
313    Library = SongLibrary
314
315    def test_rename_dirty(self):
316        self.library.dirty = False
317        song = self.Fake(10)
318        self.library.add([song])
319        self.failUnless(self.library.dirty)
320        self.library.dirty = False
321        self.library.rename(song, 20)
322        self.failUnless(self.library.dirty)
323
324    def test_rename(self):
325        song = self.Fake(10)
326        self.library.add([song])
327        self.library.rename(song, 20)
328        while Gtk.events_pending():
329            Gtk.main_iteration()
330        self.failUnless(song in self.changed)
331        self.failUnless(song in self.library)
332        self.failUnless(song.key in self.library)
333        self.failUnlessEqual(song.key, 20)
334
335    def test_rename_changed(self):
336        song = self.Fake(10)
337        self.library.add([song])
338        changed = set()
339        self.library.rename(song, 20, changed=changed)
340        self.assertEqual(len(changed), 1)
341        self.assertTrue(song in changed)
342
343    def test_tag_values(self):
344        self.library.add(self.Frange(30))
345        del(self.added[:])
346        self.failUnlessEqual(
347            sorted(self.library.tag_values(10)), list(range(10)))
348        self.failUnlessEqual(sorted(self.library.tag_values(0)), [])
349        self.failIf(self.changed or self.added or self.removed)
350
351
352class TFileLibrary(TLibrary):
353    Fake = FakeSongFile
354    Library = FileLibrary
355
356    def test_mask_invalid_mount_point(self):
357        new = self.Fake(1)
358        self.library.add([new])
359        self.failIf(self.library.masked_mount_points)
360        self.failUnless(len(self.library))
361        self.library.mask("/adsadsafaf")
362        self.failIf(self.library.masked_mount_points)
363        self.library.unmask("/adsadsafaf")
364        self.failIf(self.library.masked_mount_points)
365        self.failUnless(len(self.library))
366
367    def test_mask_basic(self):
368        new = self.Fake(1)
369        self.library.add([new])
370        self.failIf(self.library.masked_mount_points)
371        self.library.mask(new.mountpoint)
372        self.failUnlessEqual(self.library.masked_mount_points,
373                             [new.mountpoint])
374        self.failIf(len(self.library))
375        self.failUnlessEqual(self.library.get_masked(new.mountpoint), [new])
376        self.failUnless(self.library.masked(new))
377        self.library.unmask(new.mountpoint)
378        self.failUnless(len(self.library))
379        self.failUnlessEqual(self.library.get_masked(new.mountpoint), [])
380
381    def test_remove_masked(self):
382        new = self.Fake(1)
383        self.library.add([new])
384        self.library.mask(new.mountpoint)
385        self.failUnless(self.library.masked_mount_points)
386        self.library.remove_masked(new.mountpoint)
387        self.failIf(self.library.masked_mount_points)
388
389    def test_content_masked(self):
390        new = self.Fake(100)
391        new._mounted = False
392        self.failIf(self.library.get_content())
393        self.library._load_init([new])
394        self.failUnless(self.library.masked(new))
395        self.failUnless(self.library.get_content())
396
397    def test_init_masked(self):
398        new = self.Fake(100)
399        new._mounted = False
400        self.library._load_init([new])
401        self.failIf(self.library.items())
402        self.failUnless(self.library.masked(new))
403
404    def test_load_init_nonmasked(self):
405        new = self.Fake(200)
406        new._mounted = True
407        self.library._load_init([new])
408        self.failUnlessEqual(list(self.library.values()), [new])
409
410    def test_reload(self):
411        new = self.Fake(200)
412        self.library.add([new])
413        changed = set()
414        removed = set()
415        self.library.reload(new, changed=changed, removed=removed)
416        self.assertTrue(new in changed)
417        self.assertFalse(removed)
418
419
420class TSongFileLibrary(TSongLibrary):
421    Fake = FakeSongFile
422    Frange = staticmethod(FSFrange)
423    Library = SongFileLibrary
424
425    def test__load_exists_invalid(self):
426        new = self.Fake(100)
427        new._valid = False
428        changed, removed = self.library._load_item(new)
429        self.failIf(removed)
430        self.failUnless(changed)
431        self.failUnless(new._valid)
432        self.failUnless(new in self.library)
433
434    def test__load_not_exists(self):
435        new = self.Fake(100)
436        new._valid = False
437        new._exists = False
438        changed, removed = self.library._load_item(new)
439        self.failIf(removed)
440        self.failIf(changed)
441        self.failIf(new._valid)
442        self.failIf(new in self.library)
443
444    def test__load_error_during_reload(self):
445        try:
446            from quodlibet import util
447            print_exc = util.print_exc
448            util.print_exc = lambda *args, **kwargs: None
449            new = self.Fake(100)
450
451            def error():
452                raise AudioFileError
453            new.reload = error
454            new._valid = False
455            changed, removed = self.library._load_item(new)
456            self.failUnless(removed)
457            self.failIf(changed)
458            self.failIf(new._valid)
459            self.failIf(new in self.library)
460        finally:
461            util.print_exc = print_exc
462
463    def test__load_not_mounted(self):
464        new = self.Fake(100)
465        new._valid = False
466        new._exists = False
467        new._mounted = False
468        changed, removed = self.library._load_item(new)
469        self.failIf(removed)
470        self.failIf(changed)
471        self.failIf(new._valid)
472        self.failIf(new in self.library)
473        self.failUnless(self.library.masked(new))
474
475    def __get_file(self):
476        return get_temp_copy(get_data_path('empty.flac'))
477
478    def test_add_filename(self):
479        config.init()
480        try:
481            filename = self.__get_file()
482            ret = self.library.add_filename(filename)
483            self.failUnless(ret)
484            self.failUnlessEqual(len(self.library), 1)
485            self.failUnlessEqual(len(self.added), 1)
486            ret = self.library.add_filename(filename)
487            self.failUnless(ret)
488            self.failUnlessEqual(len(self.added), 1)
489            os.unlink(filename)
490
491            filename = self.__get_file()
492            ret = self.library.add_filename(filename, add=False)
493            self.failUnless(ret)
494            self.failIf(ret in self.library)
495            self.failUnlessEqual(len(self.added), 1)
496            self.library.add([ret])
497            self.failUnless(ret in self.library)
498            self.failUnlessEqual(len(self.added), 2)
499            self.failUnlessEqual(2, len(self.library))
500            os.unlink(filename)
501
502            with capture_output():
503                ret = self.library.add_filename("")
504            self.failIf(ret)
505            self.failUnlessEqual(len(self.added), 2)
506            self.failUnlessEqual(len(self.library), 2)
507
508        finally:
509            config.quit()
510
511    def test_contains_filename(self):
512        filename = self.__get_file()
513        try:
514            assert not self.library.contains_filename(filename)
515            assert self.library.add_filename(filename, add=False)
516            assert not self.library.contains_filename(filename)
517            assert self.library.add_filename(filename)
518            assert self.library.contains_filename(filename)
519        finally:
520            os.unlink(filename)
521
522    def test_add_filename_normalize_path(self):
523        if not os.name == "nt":
524            return
525
526        config.init()
527        filename = self.__get_file()
528
529        # create a equivalent path different from the original one
530        if filename.upper() == filename:
531            other = filename.lower()
532        else:
533            other = filename.upper()
534
535        song = self.library.add_filename(filename)
536        other_song = self.library.add_filename(other)
537        self.assertTrue(song is other_song)
538        os.unlink(filename)
539        config.quit()
540
541
542class TAlbumLibrary(TestCase):
543    Fake = FakeSong
544    Frange = staticmethod(ASrange)
545    UnderlyingLibrary = Library
546
547    def setUp(self):
548        self.underlying = self.UnderlyingLibrary()
549        self.added = []
550        self.changed = []
551        self.removed = []
552
553        self._sigs = [
554            connect_obj(self.underlying, 'added', list.extend, self.added),
555            connect_obj(self.underlying,
556                'changed', list.extend, self.changed),
557            connect_obj(self.underlying,
558                'removed', list.extend, self.removed),
559        ]
560
561        self.library = AlbumLibrary(self.underlying)
562
563        # Populate for every test
564        self.underlying.add(self.Frange(12))
565
566    def tearDown(self):
567        for s in self._sigs:
568            self.underlying.disconnect(s)
569        self.underlying.destroy()
570        self.library.destroy()
571
572    def test_get(self):
573        key = self.underlying.get("file_1.mp3").album_key
574        self.failUnlessEqual(self.library.get(key).title, "Album 1")
575        album = self.library.get(key)
576        self.failUnlessEqual(album.key, key)
577        self.failUnlessEqual(len(album.songs), 4)
578
579        key = self.underlying.get("file_2.mp3").album_key
580        self.failUnlessEqual(self.library.get(key).title, "Album 2")
581
582    def test_getitem(self):
583        key = self.underlying.get("file_4.mp3").album_key
584        self.failUnlessEqual(self.library[key].key, key)
585
586    def test_keys(self):
587        self.failUnless(len(self.library.keys()), 3)
588
589    def test_has_key(self):
590        key = self.underlying.get("file_1.mp3").album_key
591        self.failUnless(self.library.has_key(key))
592
593    def test_misc_collection(self):
594        self.failUnless(self.library.values())
595
596    def test_items(self):
597        self.failUnlessEqual(len(self.library.items()), 3)
598
599    def test_items_2(self):
600        albums = self.library.values()
601        self.failUnlessEqual(len(albums), 3)
602        songs = self.underlying._contents.values()
603        # Make sure "all the songs' albums" == "all the albums", roughly
604        self.failUnlessEqual({a.key for a in albums},
605                             {s.album_key for s in songs})
606
607    def test_remove(self):
608        key = self.underlying.get("file_1.mp3").album_key
609        songs = self.underlying._contents
610
611        # Remove all songs in Album 1
612        for i in range(1, 12, 3):
613            song = songs["file_%d.mp3" % i]
614            self.underlying.remove([song])
615
616        # Album 1 is all gone...
617        self.failUnlessEqual(self.library.get(key), None)
618
619        # ...but Album 2 is fine
620        key = self.underlying.get("file_2.mp3").album_key
621        album2 = self.library[key]
622        self.failUnlessEqual(album2.key, key)
623        self.failUnlessEqual(len(album2.songs), 4)
624
625    def test_misc(self):
626        # It shouldn't implement FileLibrary etc
627        self.failIf(getattr(self.library, "filename", None))
628
629
630class TAlbumLibrarySignals(TestCase):
631    def setUp(self):
632        lib = SongLibrary()
633        received = []
634
635        def listen(name, items):
636            received.append(name)
637
638        self._sigs = [
639            connect_obj(lib, 'added', listen, 'added'),
640            connect_obj(lib, 'changed', listen, 'changed'),
641            connect_obj(lib, 'removed', listen, 'removed'),
642        ]
643
644        albums = lib.albums
645        self._asigs = [
646            connect_obj(albums, 'added', listen, 'a_added'),
647            connect_obj(albums, 'changed', listen, 'a_changed'),
648            connect_obj(albums, 'removed', listen, 'a_removed'),
649        ]
650
651        self.lib = lib
652        self.albums = albums
653        self.received = received
654
655    def test_add_one(self):
656        self.lib.add([AlbumSong(1)])
657        self.failUnlessEqual(self.received, ["added", "a_added"])
658
659    def test_add_two_same(self):
660        self.lib.add([AlbumSong(1, "a1")])
661        self.lib.add([AlbumSong(5, "a1")])
662        self.failUnlessEqual(self.received,
663            ["added", "a_added", "added", "a_changed"])
664
665    def test_remove(self):
666        songs = [AlbumSong(1, "a1"), AlbumSong(2, "a1"), AlbumSong(4, "a2")]
667        self.lib.add(songs)
668        self.lib.remove(songs[:2])
669        self.failUnlessEqual(self.received,
670            ["added", "a_added", "removed", "a_removed"])
671
672    def test_change(self):
673        songs = [AlbumSong(1, "a1"), AlbumSong(2, "a1"), AlbumSong(4, "a2")]
674        self.lib.add(songs)
675        self.lib.changed(songs)
676        self.failUnlessEqual(self.received,
677            ["added", "a_added", "changed", "a_changed"])
678
679    def tearDown(self):
680        for s in self._asigs:
681            self.albums.disconnect(s)
682        for s in self._sigs:
683            self.lib.disconnect(s)
684        self.lib.destroy()
685
686
687class Titer_paths(TestCase):
688
689    def setUp(self):
690        # on osx the temp folder returned is a symlink
691        self.root = os.path.realpath(mkdtemp())
692
693    def tearDown(self):
694        shutil.rmtree(self.root)
695
696    def test_empty(self):
697        assert list(iter_paths(self.root)) == []
698
699    def test_one_file(self):
700        fd, name = mkstemp(dir=self.root)
701        os.close(fd)
702        assert list(iter_paths(self.root)) == [name]
703
704    def test_one_file_exclude(self):
705        fd, name = mkstemp(dir=self.root)
706        os.close(fd)
707        assert list(iter_paths(self.root, exclude=[self.root])) == []
708        assert list(iter_paths(self.root,
709                               exclude=[os.path.dirname(self.root)])) == []
710        assert list(iter_paths(self.root, exclude=[name])) == []
711        assert list(iter_paths(self.root, exclude=[name + "a"])) == [name]
712
713    @skipIf(is_windows(), "no symlink")
714    def test_with_dir_symlink(self):
715        child = mkdtemp(dir=self.root)
716        link = os.path.join(self.root, "foo")
717        os.symlink(child, link)
718        fd, name = mkstemp(dir=link)
719        os.close(fd)
720
721        assert name not in list(iter_paths(self.root))
722        assert list(iter_paths(link)) == list(iter_paths(child))
723
724        assert list(iter_paths(link, exclude=[link])) == []
725        assert list(iter_paths(child, exclude=[child])) == []
726        assert list(iter_paths(link, exclude=[child])) == []
727
728    @skipIf(is_windows(), "no symlink")
729    def test_with_file(self):
730        fd, name = mkstemp(dir=self.root)
731        os.close(fd)
732        link = os.path.join(self.root, "foo")
733        os.symlink(name, link)
734
735        assert list(iter_paths(self.root)) == [name, name]
736        assert list(iter_paths(self.root, exclude=[link])) == [name]
737        assert list(iter_paths(self.root, exclude=[name])) == []
738
739    def test_hidden_dir(self):
740        child = mkdtemp(dir=self.root, prefix=".")
741        fd, name = mkstemp(dir=child)
742        os.close(fd)
743        assert list(iter_paths(child)) == []
744        assert list(iter_paths(child, skip_hidden=False)) == [name]
745        assert list(iter_paths(self.root)) == []
746        assert list(iter_paths(self.root, skip_hidden=False)) == [name]
747
748    def test_hidden_file(self):
749        fd, name = mkstemp(dir=self.root, prefix=".")
750        os.close(fd)
751
752        assert list(iter_paths(self.root)) == []
753