1# This program is free software; you can redistribute it and/or modify
2# it under the terms of the GNU General Public License as published by
3# the Free Software Foundation; either version 2 of the License, or
4# (at your option) any later version.
5
6import io
7import os
8import shutil
9
10from gi.repository import Gtk
11from gi.repository import GdkPixbuf
12
13from quodlibet.util.cover.http import ApiCoverSourcePlugin
14from quodlibet.util.thread import Cancellable
15from tests import TestCase, mkdtemp, mkstemp, get_data_path
16
17from quodlibet import config
18from quodlibet.plugins import Plugin
19from quodlibet.formats.mp3 import MP3File
20from quodlibet.formats import AudioFile, EmbeddedImage
21from quodlibet.plugins.cover import CoverSourcePlugin
22from quodlibet.util.cover.manager import CoverPluginHandler, CoverManager
23from quodlibet.util.path import path_equal
24
25from .helper import get_temp_copy
26
27
# Shared stand-in for a cover file object.  The dummy sources below hand out
# this exact instance and the tests compare against it by identity
# (``assertIs``), so its contents are never read.
DUMMY_COVER = io.StringIO()
29
30
def run_loop():
    """Run GTK main-loop iterations until no events are pending."""
    while True:
        if not Gtk.events_pending():
            break
        Gtk.main_iteration()
34
35
class DummyCoverSource1(CoverSourcePlugin):
    """Dummy source with the highest dummy priority and no cover to offer."""

    @property
    def cover(self):
        # Leave a class-level trace so tests can tell this lookup happened.
        DummyCoverSource1.cover_call = True
        return None

    @staticmethod
    def priority():
        return 0.95
45
46
class DummyCoverSource2(CoverSourcePlugin):
    """Dummy source with middle priority that always yields DUMMY_COVER."""

    @property
    def cover(self):
        # Leave a class-level trace so tests can tell this lookup happened.
        DummyCoverSource2.cover_call = True
        return DUMMY_COVER

    def fetch_cover(self):
        # Flag the fetch first, then read the cover (which flags cover_call).
        DummyCoverSource2.fetch_call = True
        found_cover = self.cover
        return self.emit('fetch-success', found_cover)

    @staticmethod
    def priority():
        return 0.5
60
61
class DummyCoverSource3(ApiCoverSourcePlugin):
    """Lowest-priority dummy API source.

    It has no synchronous cover, but both searching and fetching report
    DUMMY_COVER through the corresponding signals.
    """

    @property
    def cover(self):
        # Leave a class-level trace so tests can tell this lookup happened.
        DummyCoverSource3.cover_call = True
        return None

    def search(self):
        hits = [{'cover': DUMMY_COVER}]
        return self.emit('search-complete', hits)

    def fetch_cover(self):
        DummyCoverSource3.fetch_call = True
        return self.emit('fetch-success', DUMMY_COVER)

    @staticmethod
    def priority():
        return 0.3
78
79
# Plugin wrappers around the dummy sources, in descending priority order.
dummy_sources = list(map(Plugin, (DummyCoverSource1,
                                  DummyCoverSource2,
                                  DummyCoverSource3)))
82
83
class TCoverManager(TestCase):
    """Tests for CoverManager / CoverPluginHandler driven by dummy sources."""

    # Number of sources a default-constructed manager is expected to expose
    # before any of the dummy plugins are enabled.
    built_in_count = 2

    def setUp(self):
        self.manager = CoverManager()

    def test_has_builtin_covers(self):
        """Default manager has the built-ins; a bare handler has none."""
        self.assertEqual(len(list(self.manager.sources)), self.built_in_count)
        manager = CoverPluginHandler(use_built_in=False)
        self.assertEqual(len(list(manager.sources)), 0)

    def test_only_enabled(self):
        """Handled plugins only count as sources while they are enabled."""
        handler = self.manager.plugin_handler
        for source in dummy_sources:
            handler.plugin_handle(source)
        # Merely handling a plugin must not add it to the sources.
        self.assertEqual(len(list(self.manager.sources)), self.built_in_count)
        for source in dummy_sources:
            handler.plugin_enable(source)
        total = self.built_in_count + len(dummy_sources)
        self.assertEqual(len(list(self.manager.sources)), total)
        # Each disable must drop exactly one source.
        for disabled, source in enumerate(dummy_sources, start=1):
            handler.plugin_disable(source)
            self.assertEqual(len(list(self.manager.sources)),
                             total - disabled)

    def test_sources_sorted(self):
        """Sources are always yielded in descending priority order."""
        handler = self.manager.plugin_handler
        for source in dummy_sources:
            handler.plugin_handle(source)
            handler.plugin_enable(source)
        priorities = [p.priority() for p in self.manager.sources]
        self.assertSequenceEqual(priorities, sorted(priorities, reverse=True))
        # Test that sources are sorted even after removing some of the sources
        for source in dummy_sources:
            handler.plugin_disable(source)
            ps = [p.priority() for p in self.manager.sources]
            self.assertSequenceEqual(ps, sorted(ps, reverse=True))

    def test_acquire_cover_sync(self):
        """acquire_cover_sync only yields covers from synchronous sources."""
        song = AudioFile({"~filename": "/dev/null"})

        manager = CoverManager(use_built_in=False)
        handler = manager.plugin_handler
        for source in dummy_sources:
            handler.plugin_handle(source)
        # Source 1 alone never has a cover.
        handler.plugin_enable(dummy_sources[0])
        self.assertIs(manager.acquire_cover_sync(song), None)
        # Source 2 supplies DUMMY_COVER through its `cover` property.
        handler.plugin_enable(dummy_sources[1])
        self.assertIs(manager.acquire_cover_sync(song), DUMMY_COVER)
        handler.plugin_enable(dummy_sources[2])
        self.assertIs(manager.acquire_cover_sync(song), DUMMY_COVER)
        # Without source 2 there is no synchronous cover left.
        handler.plugin_disable(dummy_sources[1])
        self.assertIs(manager.acquire_cover_sync(song), None)

    def test_acquire_cover(self):
        """acquire_cover reports (found, cover) through the callback."""
        manager = CoverManager(use_built_in=False)
        handler = manager.plugin_handler
        for source in dummy_sources:
            handler.plugin_handle(source)
        handler.plugin_enable(dummy_sources[0])
        found = []
        result = []

        def done(_found, _result):
            found.append(_found)
            result.append(_result)

        # Only the cover-less source is enabled: nothing can be found.
        manager.acquire_cover(done, None, None)
        run_loop()
        self.assertFalse(found[0])

        handler.plugin_enable(dummy_sources[1])
        manager.acquire_cover(done, None, None)
        run_loop()
        self.assertTrue(found[1])
        self.assertIs(result[1], DUMMY_COVER)

        # Swap source 2 for source 3; the cover must still be delivered.
        handler.plugin_disable(dummy_sources[1])
        handler.plugin_enable(dummy_sources[2])
        manager.acquire_cover(done, None, None)
        run_loop()
        self.assertTrue(found[2])
        self.assertIs(result[2], DUMMY_COVER)

    def test_acquire_cover_calls(self):
        # * fetch_cover shouldn't get called if source provides the cover
        #   synchronously
        # * First cover source should fail providing the cover both
        #   synchronously and asynchronously and only then the next source
        #   should be used
        manager = CoverManager(use_built_in=False)
        handler = manager.plugin_handler
        found = []
        result = []
        for source in dummy_sources:
            handler.plugin_handle(source)
            handler.plugin_enable(source)
            source.cls.cover_call = False
            source.cls.fetch_call = False

        def done(_found, _result):
            found.append(_found)
            result.append(_result)

        manager.acquire_cover(done, None, None)
        run_loop()
        self.assertTrue(found[0])
        self.assertIs(result[0], DUMMY_COVER)
        # Source 2 answers synchronously, so source 3 is never consulted and
        # no source has fetch_cover invoked.
        self.assertTrue(dummy_sources[0].cls.cover_call)
        self.assertTrue(dummy_sources[1].cls.cover_call)
        self.assertFalse(dummy_sources[2].cls.cover_call)
        self.assertFalse(dummy_sources[0].cls.fetch_call)
        self.assertFalse(dummy_sources[1].cls.fetch_call)
        self.assertFalse(dummy_sources[2].cls.fetch_call)

        # Reset the call markers before the second round.
        for source in dummy_sources:
            source.cls.cover_call = False
            source.cls.fetch_call = False
        handler.plugin_disable(dummy_sources[1])
        manager.acquire_cover(done, None, None)
        run_loop()
        self.assertTrue(found[1])
        self.assertIs(result[1], DUMMY_COVER)
        # With source 2 disabled, source 3 is asked and has to fetch.
        self.assertTrue(dummy_sources[0].cls.cover_call)
        self.assertFalse(dummy_sources[1].cls.cover_call)
        self.assertTrue(dummy_sources[2].cls.cover_call)
        self.assertFalse(dummy_sources[0].cls.fetch_call)
        self.assertFalse(dummy_sources[1].cls.fetch_call)
        self.assertTrue(dummy_sources[2].cls.fetch_call)

    def test_search(self):
        """search_cover emits 'covers-found' exactly once for the dummies."""
        manager = CoverManager(use_built_in=False)
        handler = manager.plugin_handler
        for source in dummy_sources:
            handler.plugin_handle(source)
            handler.plugin_enable(source)
            source.cls.cover_call = False
            source.cls.fetch_call = False

        song = AudioFile({
            "~filename": "/tmp/asong.ogg",
            "album": "Abbey Road",
            "artist": "The Beatles"
        })
        songs = [song]
        results = []

        def done(_manager, provider, result):
            self.assertTrue(result, msg="Shouldn't succeed with no results")
            results.append(result)

        def finished(_manager, songs):
            print("Finished!")

        # Connect both handlers before starting the search so that a signal
        # emitted during search_cover() itself cannot be missed.
        manager.connect('covers-found', done)
        manager.connect('searches-complete', finished)
        manager.search_cover(Cancellable(), songs)
        run_loop()

        self.assertEqual(len(results), 1)

    def tearDown(self):
        pass
240
241
class TCoverManagerBuiltin(TestCase):
    """Tests for a default CoverManager against real MP3 and PNG files."""

    def setUp(self):
        config.init()

        self.main = mkdtemp()

        self.dir1 = mkdtemp(dir=self.main)
        self.dir2 = mkdtemp(dir=self.main)

        # Two covers of different pixel size.
        self.cover1 = self._make_png(10)
        self.cover2 = self._make_png(20)

        self.file1 = get_temp_copy(get_data_path('silence-44-s.mp3'))
        self.file2 = get_temp_copy(get_data_path('silence-44-s.mp3'))

        self.manager = CoverManager()

    def _make_png(self, size):
        """Write a size x size RGBA PNG into self.main and return its path."""
        h, path = mkstemp(".png", dir=self.main)
        os.close(h)
        pb = GdkPixbuf.Pixbuf.new(
            GdkPixbuf.Colorspace.RGB, True, 8, size, size)
        pb.savev(path, "png", [], [])
        return path

    def tearDown(self):
        # Always shut the config down, even if removing the tree fails.
        try:
            shutil.rmtree(self.main)
        finally:
            config.quit()

    def test_connect_cover_changed(self):
        """cover_changed() emits 'cover-changed' with the given songs."""
        called_with = []

        def sig_handler(*args):
            called_with.extend(args)

        obj = object()
        self.manager.connect("cover-changed", sig_handler)
        self.manager.cover_changed([obj])

        self.assertEqual(called_with, [self.manager, [obj]])

    def test_get_primary_image(self):
        """Neither fixture file starts out with an embedded image."""
        self.assertFalse(MP3File(self.file1).has_images)
        self.assertFalse(MP3File(self.file2).has_images)

    def test_manager(self):
        self.assertEqual(len(list(self.manager.sources)), 2)

    def test_get_cover_many_prefer_embedded(self):
        # embed one cover, move one to the other dir
        MP3File(self.file1).set_image(EmbeddedImage.from_path(self.cover1))
        os.unlink(self.cover1)
        self.external_cover = os.path.join(self.dir2, "cover.png")
        shutil.move(self.cover2, self.external_cover)

        # move one audio file in each dir
        shutil.move(self.file1, self.dir1)
        self.file1 = os.path.join(self.dir1, os.path.basename(self.file1))
        shutil.move(self.file2, self.dir2)
        self.file2 = os.path.join(self.dir2, os.path.basename(self.file2))

        song1 = MP3File(self.file1)
        song2 = MP3File(self.file2)

        # each should find a cover
        self.assertTrue(self.is_embedded(self.manager.get_cover(song1)))
        self.assertFalse(self.is_embedded(self.manager.get_cover(song2)))

        cover_for = self.manager.get_cover_many
        # both settings should search both songs before giving up
        config.set("albumart", "prefer_embedded", True)
        self.assertTrue(self.is_embedded(cover_for([song1, song2])))
        self.assertTrue(self.is_embedded(cover_for([song2, song1])))

        config.set("albumart", "prefer_embedded", False)
        self.assertFalse(self.is_embedded(cover_for([song1, song2])))
        self.assertFalse(self.is_embedded(cover_for([song2, song1])))

    def is_embedded(self, fileobj):
        """Return True unless fileobj.name matches self.external_cover.

        The calling test must have set ``self.external_cover`` beforehand.
        """
        return not path_equal(fileobj.name, self.external_cover, True)

    def test_acquire_prefer_embedded(self):
        # embed one cover...
        MP3File(self.file1).set_image(EmbeddedImage.from_path(self.cover1))
        os.unlink(self.cover1)
        self.external_cover = os.path.join(self.dir1, "cover.png")
        # ...and save a different cover externally
        shutil.copy(self.cover2, self.external_cover)

        shutil.move(self.file1, self.dir1)
        self.file1 = os.path.join(self.dir1, os.path.basename(self.file1))
        both_song = MP3File(self.file1)

        results = []

        def acquire(song):
            # The first callback argument is named `found` here on the
            # assumption that it matches the (found, result) callbacks used
            # by the other acquire_cover tests in this file; only the
            # result is kept.
            def cb(found, result):
                results.append(result)

            self.manager.acquire_cover(cb, None, song)

        def result_was_embedded():
            return self.is_embedded(results.pop())

        config.set("albumart", "prefer_embedded", True)
        acquire(both_song)
        self.assertTrue(result_was_embedded(),
                        "Embedded image expected due to prefs")

        config.set("albumart", "prefer_embedded", False)
        acquire(both_song)
        self.assertFalse(result_was_embedded(),
                         "Got an embedded image despite prefs")
356