# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

"""Tests for the cover manager and its built-in / plugin cover sources."""

import io
import os
import shutil

from gi.repository import Gtk
from gi.repository import GdkPixbuf

from quodlibet.util.cover.http import ApiCoverSourcePlugin
from quodlibet.util.thread import Cancellable
from tests import TestCase, mkdtemp, mkstemp, get_data_path

from quodlibet import config
from quodlibet.plugins import Plugin
from quodlibet.formats.mp3 import MP3File
from quodlibet.formats import AudioFile, EmbeddedImage
from quodlibet.plugins.cover import CoverSourcePlugin
from quodlibet.util.cover.manager import CoverPluginHandler, CoverManager
from quodlibet.util.path import path_equal

from .helper import get_temp_copy


# Shared sentinel "cover" object; tests compare against it by identity.
DUMMY_COVER = io.StringIO()


def run_loop():
    """Iterate the GTK main loop until no events are pending."""
    while Gtk.events_pending():
        Gtk.main_iteration()


class DummyCoverSource1(CoverSourcePlugin):
    """Source with priority 0.95 whose ``cover`` is always ``None``."""

    @staticmethod
    def priority():
        return 0.95

    @property
    def cover(self):
        # Record on the class that the synchronous lookup was attempted.
        DummyCoverSource1.cover_call = True
        return None


class DummyCoverSource2(CoverSourcePlugin):
    """Source with priority 0.5 that provides ``DUMMY_COVER`` directly."""

    @staticmethod
    def priority():
        return 0.5

    @property
    def cover(self):
        DummyCoverSource2.cover_call = True
        return DUMMY_COVER

    def fetch_cover(self):
        DummyCoverSource2.fetch_call = True
        return self.emit('fetch-success', self.cover)


class DummyCoverSource3(ApiCoverSourcePlugin):
    """Source with priority 0.3: no ``cover``, but search/fetch succeed."""

    @staticmethod
    def priority():
        return 0.3

    @property
    def cover(self):
        DummyCoverSource3.cover_call = True
        return None

    def search(self):
        return self.emit('search-complete', [{'cover': DUMMY_COVER}])

    def fetch_cover(self):
        DummyCoverSource3.fetch_call = True
        return self.emit('fetch-success', DUMMY_COVER)


# The dummy source classes wrapped as plugins, in descending priority order.
dummy_sources = [
    Plugin(source_cls)
    for source_cls in (DummyCoverSource1,
                       DummyCoverSource2,
                       DummyCoverSource3)
]


class TCoverManager(TestCase):
    """Exercise ``CoverManager`` with the dummy cover sources above."""

    # Number of sources a default ``CoverManager()`` is expected to expose.
    built_in_count = 2

    def setUp(self):
        self.manager = CoverManager()

    def test_has_builtin_covers(self):
        """Built-ins are present by default and absent when disabled."""
        self.assertEqual(len(list(self.manager.sources)), self.built_in_count)
        manager = CoverPluginHandler(use_built_in=False)
        self.assertEqual(len(list(manager.sources)), 0)

    def test_only_enabled(self):
        """Only enabled plugins show up in ``manager.sources``."""
        handler = self.manager.plugin_handler
        total = self.built_in_count + len(dummy_sources)

        # Handling a plugin alone must not add it to the sources.
        for source in dummy_sources:
            handler.plugin_handle(source)
        self.assertEqual(len(list(self.manager.sources)), self.built_in_count)

        for source in dummy_sources:
            handler.plugin_enable(source)
        self.assertEqual(len(list(self.manager.sources)), total)

        # Each disable should remove exactly one source.
        for k, source in enumerate(dummy_sources):
            handler.plugin_disable(source)
            self.assertEqual(len(list(self.manager.sources)), total - k - 1)

    def test_sources_sorted(self):
        """Sources are yielded in descending priority order."""
        handler = self.manager.plugin_handler
        for source in dummy_sources:
            handler.plugin_handle(source)
            handler.plugin_enable(source)
        priorities = [p.priority() for p in self.manager.sources]
        self.assertSequenceEqual(priorities, sorted(priorities, reverse=True))

        # Test that sources are sorted even after removing some of the sources
        for source in dummy_sources:
            handler.plugin_disable(source)
            ps = [p.priority() for p in self.manager.sources]
            self.assertSequenceEqual(ps, sorted(ps, reverse=True))

    def test_acquire_cover_sync(self):
        """``acquire_cover_sync`` returns a cover only if a source has one."""
        song = AudioFile({"~filename": "/dev/null"})

        manager = CoverManager(use_built_in=False)
        handler = manager.plugin_handler
        for source in dummy_sources:
            handler.plugin_handle(source)

        handler.plugin_enable(dummy_sources[0])
        self.assertIsNone(manager.acquire_cover_sync(song))

        handler.plugin_enable(dummy_sources[1])
        self.assertIs(manager.acquire_cover_sync(song), DUMMY_COVER)

        handler.plugin_enable(dummy_sources[2])
        self.assertIs(manager.acquire_cover_sync(song), DUMMY_COVER)

        # Without source 2 no remaining source has a ``cover`` value.
        handler.plugin_disable(dummy_sources[1])
        self.assertIsNone(manager.acquire_cover_sync(song))

    def test_acquire_cover(self):
        """``acquire_cover`` reports success and the cover via callback."""
        manager = CoverManager(use_built_in=False)
        handler = manager.plugin_handler
        for source in dummy_sources:
            handler.plugin_handle(source)
        handler.plugin_enable(dummy_sources[0])
        found = []
        result = []

        def done(_found, _result):
            found.append(_found)
            result.append(_result)

        # Only source 1 enabled: nothing can be found.
        manager.acquire_cover(done, None, None)
        run_loop()
        self.assertFalse(found[0])

        # Source 2 provides the cover through its ``cover`` property.
        handler.plugin_enable(dummy_sources[1])
        manager.acquire_cover(done, None, None)
        run_loop()
        self.assertTrue(found[1])
        self.assertIs(result[1], DUMMY_COVER)

        # Source 3 has no ``cover`` but emits it from ``fetch_cover``.
        handler.plugin_disable(dummy_sources[1])
        handler.plugin_enable(dummy_sources[2])
        manager.acquire_cover(done, None, None)
        run_loop()
        self.assertTrue(found[2])
        self.assertIs(result[2], DUMMY_COVER)

    def test_acquire_cover_calls(self):
        # * fetch_cover shouldn't get called if source provides the cover
        #   synchronously
        # * First cover source should fail providing the cover both
        #   synchronously and asynchronously and only then the next source
        #   should be used
        manager = CoverManager(use_built_in=False)
        handler = manager.plugin_handler
        found = []
        result = []
        for source in dummy_sources:
            handler.plugin_handle(source)
            handler.plugin_enable(source)
            source.cls.cover_call = False
            source.cls.fetch_call = False

        def done(_found, _result):
            found.append(_found)
            result.append(_result)

        manager.acquire_cover(done, None, None)
        run_loop()
        self.assertTrue(found[0])
        self.assertIs(result[0], DUMMY_COVER)
        self.assertTrue(dummy_sources[0].cls.cover_call)
        self.assertTrue(dummy_sources[1].cls.cover_call)
        self.assertFalse(dummy_sources[2].cls.cover_call)
        self.assertFalse(dummy_sources[0].cls.fetch_call)
        self.assertFalse(dummy_sources[1].cls.fetch_call)
        self.assertFalse(dummy_sources[2].cls.fetch_call)

        # Reset the call flags before the second round.
        for source in dummy_sources:
            source.cls.cover_call = False
            source.cls.fetch_call = False
        handler.plugin_disable(dummy_sources[1])
        manager.acquire_cover(done, None, None)
        run_loop()
        self.assertTrue(found[1])
        self.assertIs(result[1], DUMMY_COVER)
        self.assertTrue(dummy_sources[0].cls.cover_call)
        self.assertFalse(dummy_sources[1].cls.cover_call)
        self.assertTrue(dummy_sources[2].cls.cover_call)
        self.assertFalse(dummy_sources[0].cls.fetch_call)
        self.assertFalse(dummy_sources[1].cls.fetch_call)
        self.assertTrue(dummy_sources[2].cls.fetch_call)

    def test_search(self):
        """``search_cover`` emits ``covers-found`` exactly once here."""
        manager = CoverManager(use_built_in=False)
        handler = manager.plugin_handler
        for source in dummy_sources:
            handler.plugin_handle(source)
            handler.plugin_enable(source)
            source.cls.cover_call = False
            source.cls.fetch_call = False

        song = AudioFile({
            "~filename": "/tmp/asong.ogg",
            "album": "Abbey Road",
            "artist": "The Beatles"
        })
        songs = [song]
        results = []

        def done(_manager, _provider, result):
            self.assertTrue(result, msg="Shouldn't succeed with no results")
            results.append(result)

        def finished(_manager, _songs):
            print("Finished!")

        manager.connect('covers-found', done)
        manager.search_cover(Cancellable(), songs)
        manager.connect('searches-complete', finished)
        run_loop()

        self.assertEqual(len(results), 1)

    def tearDown(self):
        pass


class TCoverManagerBuiltin(TestCase):
    """Exercise ``CoverManager`` with its built-in sources on real files."""

    def setUp(self):
        config.init()

        self.main = mkdtemp()

        self.dir1 = mkdtemp(dir=self.main)
        self.dir2 = mkdtemp(dir=self.main)

        # Two PNG covers of different dimensions (10x10 and 20x20).
        self.cover1 = self._make_cover(10)
        self.cover2 = self._make_cover(20)

        self.file1 = get_temp_copy(get_data_path('silence-44-s.mp3'))
        self.file2 = get_temp_copy(get_data_path('silence-44-s.mp3'))

        self.manager = CoverManager()

    def _make_cover(self, size):
        """Save a new ``size`` x ``size`` pixbuf as a PNG in ``self.main``.

        Returns the path of the written file.
        """
        handle, path = mkstemp(".png", dir=self.main)
        os.close(handle)
        pixbuf = GdkPixbuf.Pixbuf.new(
            GdkPixbuf.Colorspace.RGB, True, 8, size, size)
        pixbuf.savev(path, "png", [], [])
        return path

    def tearDown(self):
        shutil.rmtree(self.main)
        config.quit()

    def test_connect_cover_changed(self):
        """``cover_changed`` emits ``cover-changed`` with the given songs."""
        called_with = []

        def sig_handler(*args):
            called_with.extend(args)

        obj = object()
        self.manager.connect("cover-changed", sig_handler)
        self.manager.cover_changed([obj])

        self.assertEqual(called_with, [self.manager, [obj]])

    def test_get_primary_image(self):
        """Neither fixture copy starts out with embedded images."""
        self.assertFalse(MP3File(self.file1).has_images)
        self.assertFalse(MP3File(self.file2).has_images)

    def test_manager(self):
        self.assertEqual(len(list(self.manager.sources)), 2)

    def test_get_cover_many_prefer_embedded(self):
        """``get_cover_many`` honours the ``prefer_embedded`` setting."""
        # embed one cover, move one to the other dir
        MP3File(self.file1).set_image(EmbeddedImage.from_path(self.cover1))
        os.unlink(self.cover1)
        self.external_cover = os.path.join(self.dir2, "cover.png")
        shutil.move(self.cover2, self.external_cover)

        # move one audio file in each dir
        shutil.move(self.file1, self.dir1)
        self.file1 = os.path.join(self.dir1, os.path.basename(self.file1))
        shutil.move(self.file2, self.dir2)
        self.file2 = os.path.join(self.dir2, os.path.basename(self.file2))

        song1 = MP3File(self.file1)
        song2 = MP3File(self.file2)

        # each should find a cover
        self.assertTrue(self.is_embedded(self.manager.get_cover(song1)))
        self.assertFalse(self.is_embedded(self.manager.get_cover(song2)))

        cover_for = self.manager.get_cover_many
        # both settings should search both songs before giving up
        config.set("albumart", "prefer_embedded", True)
        self.assertTrue(self.is_embedded(cover_for([song1, song2])))
        self.assertTrue(self.is_embedded(cover_for([song2, song1])))

        config.set("albumart", "prefer_embedded", False)
        self.assertFalse(self.is_embedded(cover_for([song1, song2])))
        self.assertFalse(self.is_embedded(cover_for([song2, song1])))

    def is_embedded(self, fileobj):
        """Return True unless ``fileobj.name`` is ``self.external_cover``."""
        return not path_equal(fileobj.name, self.external_cover, True)

    def test_acquire_prefer_embedded(self):
        """``acquire_cover`` honours the ``prefer_embedded`` setting."""
        # embed one cover...
        MP3File(self.file1).set_image(EmbeddedImage.from_path(self.cover1))
        os.unlink(self.cover1)
        self.external_cover = os.path.join(self.dir1, "cover.png")
        # ...and save a different cover externally
        shutil.copy(self.cover2, self.external_cover)

        shutil.move(self.file1, self.dir1)
        self.file1 = os.path.join(self.dir1, os.path.basename(self.file1))
        both_song = MP3File(self.file1)

        results = []

        def acquire(song):
            def cb(source, result):
                results.append(result)

            self.manager.acquire_cover(cb, None, song)

        def result_was_embedded():
            return self.is_embedded(results.pop())

        config.set("albumart", "prefer_embedded", True)
        acquire(both_song)
        self.assertTrue(result_was_embedded(),
                        "Embedded image expected due to prefs")

        config.set("albumart", "prefer_embedded", False)
        acquire(both_song)
        self.assertFalse(result_was_embedded(),
                         "Got an embedded image despite prefs")