# -*- coding: utf-8 -*-

"""Tests for the generic Metadata / FileType / File interfaces.

The format-independent checks in TAbstractFileType are instantiated once
per (file type, sample file) pair listed in _FILETYPES.
"""

import os
import sys
from tempfile import mkstemp
import shutil
import warnings

from hypothesis.strategies import composite, integers, one_of
from hypothesis import given

from tests import TestCase, DATA_DIR, get_temp_copy
from mutagen._compat import cBytesIO, text_type
from mutagen import File, Metadata, FileType, MutagenError, PaddingInfo
from mutagen._util import loadfile, get_size
from mutagen.oggvorbis import OggVorbis
from mutagen.oggflac import OggFLAC
from mutagen.oggspeex import OggSpeex
from mutagen.oggtheora import OggTheora
from mutagen.oggopus import OggOpus
from mutagen.mp3 import MP3, EasyMP3
from mutagen.id3 import ID3FileType
from mutagen.apev2 import APEv2File
from mutagen.flac import FLAC
from mutagen.wavpack import WavPack
from mutagen.trueaudio import TrueAudio, EasyTrueAudio
from mutagen.mp4 import MP4
from mutagen.musepack import Musepack
from mutagen.monkeysaudio import MonkeysAudio
from mutagen.optimfrog import OptimFROG
from mutagen.asf import ASF
from mutagen.aiff import AIFF
from mutagen.aac import AAC
from mutagen.smf import SMF
from mutagen.dsf import DSF
from os import devnull


class TMetadata(TestCase):
    """The Metadata base class must refuse load/save/delete."""

    class FakeMeta(Metadata):
        # Override __init__ so that constructing an instance never
        # triggers a load.
        def __init__(self):
            pass

    def test_virtual_constructor(self):
        self.assertRaises(NotImplementedError, Metadata, cBytesIO())

    def test_load(self):
        m = Metadata()
        self.assertRaises(NotImplementedError, m.load, cBytesIO())

    def test_virtual_save(self):
        self.assertRaises(
            NotImplementedError, self.FakeMeta().save, cBytesIO())

    def test_virtual_delete(self):
        self.assertRaises(
            NotImplementedError, self.FakeMeta().delete, cBytesIO())


class TPaddingInfo(TestCase):
    """Attribute access, default padding strategy and repr of PaddingInfo."""

    def test_props(self):
        info = PaddingInfo(10, 100)
        self.assertEqual(info.size, 100)
        self.assertEqual(info.padding, 10)

        info = PaddingInfo(-10, 100)
        self.assertEqual(info.size, 100)
        self.assertEqual(info.padding, -10)

    def test_default_strategy(self):
        s = 100000
        self.assertEqual(PaddingInfo(10, s).get_default_padding(), 10)
        self.assertEqual(PaddingInfo(-10, s).get_default_padding(), 1124)
        self.assertEqual(PaddingInfo(0, s).get_default_padding(), 0)
        self.assertEqual(PaddingInfo(20000, s).get_default_padding(), 1124)

        self.assertEqual(PaddingInfo(10, 0).get_default_padding(), 10)
        self.assertEqual(PaddingInfo(-10, 0).get_default_padding(), 1024)
        self.assertEqual(PaddingInfo(1050, 0).get_default_padding(), 1050)
        self.assertEqual(PaddingInfo(20000, 0).get_default_padding(), 1024)

    def test_repr(self):
        info = PaddingInfo(10, 100)
        self.assertEqual(repr(info), "<PaddingInfo size=100 padding=10>")


class MyFileType(FileType):
    """Minimal FileType which only records what load() was called with."""

    @loadfile()
    def load(self, filething, arg=1):
        self.filename = filething.filename
        self.fileobj = filething.fileobj
        self.arg = arg


class TFileTypeLoad(TestCase):
    """Argument handling of the FileType constructor / loadfile()."""

    filename = os.path.join(DATA_DIR, "empty.ogg")

    def test_old_argument_handling(self):
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            f = MyFileType()
        # NOTE(review): "a" is not an attribute MyFileType.load() ever
        # sets; possibly "arg" was meant -- confirm before changing.
        self.assertFalse(hasattr(f, "a"))

        f = MyFileType(self.filename)
        self.assertEqual(f.arg, 1)

        f = MyFileType(self.filename, 42)
        self.assertEqual(f.arg, 42)
        self.assertEqual(f.filename, self.filename)

        f = MyFileType(self.filename, arg=42)
        self.assertEqual(f.arg, 42)

        f = MyFileType(filename=self.filename, arg=42)
        self.assertEqual(f.arg, 42)

        self.assertRaises(TypeError, MyFileType, self.filename, nope=42)
        self.assertRaises(TypeError, MyFileType, nope=42)
        self.assertRaises(TypeError, MyFileType, self.filename, 42, 24)

    def test_both_args(self):
        # fileobj wins, but filename is saved
        x = cBytesIO()
        f = MyFileType(filename="foo", fileobj=x)
        self.assertTrue(f.fileobj is x)
        self.assertEqual(f.filename, "foo")

    def test_fileobj(self):
        x = cBytesIO()
        f = MyFileType(fileobj=x)
        self.assertTrue(f.fileobj is x)
        self.assertTrue(f.filename is None)

    def test_magic(self):
        x = cBytesIO()
        f = MyFileType(x)
        self.assertTrue(f.fileobj is x)
        self.assertTrue(f.filename is None)

    def test_filething(self):
        # while load() has that arg, we don't allow it as kwarg, either
        # pass per arg, or be explicit about the type.
        x = cBytesIO()
        self.assertRaises(TypeError, MyFileType, filething=x)

    def test_filename_explicit(self):
        x = cBytesIO()
        self.assertRaises(ValueError, MyFileType, filename=x)


class TFileType(TestCase):
    """Dict-like access and saving on FileType instances."""

    def setUp(self):
        self.vorbis = File(os.path.join(DATA_DIR, "empty.ogg"))

        filename = get_temp_copy(os.path.join(DATA_DIR, "xing.mp3"))
        self.mp3_notags = File(filename)
        self.mp3_filename = filename

    def tearDown(self):
        os.remove(self.mp3_filename)

    def test_delitem_not_there(self):
        self.assertRaises(KeyError, self.vorbis.__delitem__, "foobar")

    def test_add_tags(self):
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            self.assertRaises(NotImplementedError, FileType().add_tags)

    def test_delitem(self):
        self.vorbis["foobar"] = "quux"
        del self.vorbis["foobar"]
        # Check the key that was deleted; checking the value ("quux")
        # as a key could never fail.
        self.assertFalse("foobar" in self.vorbis)

    def test_save_no_tags(self):
        self.assertTrue(self.mp3_notags.tags is None)
        self.assertTrue(self.mp3_notags.filename)
        self.mp3_notags.save()
        self.assertTrue(self.mp3_notags.tags is None)


class _TestFileObj(object):
    """A file-like object which fails in various ways"""

    def __init__(self, fileobj, stop_after=-1, fail_after=-1):
        """
        Args:
            fileobj: the real file object all calls are forwarded to;
                it gets rewound to the start
            stop_after (int): size of data to return on read in total
                (-1 disables the limit)
            fail_after (int): after this number of operations every method
                will raise IOError (-1 disables failing)
        """

        self._fileobj = fileobj
        self._stop_after = stop_after
        self._fail_after = fail_after

        # Counters, also used to measure what a successful run needs
        self.dataread = 0
        self.operations = 0

        fileobj.seek(0, 0)

    def _check_fail(self):
        """Count one operation and raise IOError once over the limit."""

        self.operations += 1
        if self._fail_after != -1:
            if self.operations > self._fail_after:
                raise IOError("fail")

    def tell(self):
        self._check_fail()
        return self._fileobj.tell()

    def write(self, data):
        try:
            self._check_fail()
        except IOError:
            # we use write(b"") to check if the fileobj is writable
            if len(data):
                raise
        self._fileobj.write(data)

    def truncate(self, *args, **kwargs):
        self._check_fail()
        self._fileobj.truncate(*args, **kwargs)

    def flush(self):
        # Not counted as an operation and never fails artificially
        self._fileobj.flush()

    def read(self, size=-1):
        try:
            self._check_fail()
        except IOError:
            # we use read(0) to test for the file object type, so don't error
            # out in that case
            if size != 0:
                raise

        data = self._fileobj.read(size)
        self.dataread += len(data)
        if self._stop_after != -1 and self.dataread > self._stop_after:
            # The (negative) difference trims the excess off the end
            data = data[:self._stop_after - self.dataread]
        return data

    def seek(self, offset, whence=0):
        """Seek in the wrapped file.

        Raises:
            IOError: if the operation limit is exceeded
            ValueError: if `whence` is not 0, 1 or 2
            AssertionError: if the resulting position would be negative
        """

        self._check_fail()

        # make sure we don't go negative
        if whence == 0:
            final_position = offset
        elif whence == 1:
            final_position = self._fileobj.tell() + offset
        elif whence == 2:
            final_position = get_size(self._fileobj) + offset
        else:
            # Previously this fell through to an UnboundLocalError
            raise ValueError("invalid whence: %r" % (whence,))
        assert final_position >= 0, final_position

        return self._fileobj.seek(offset, whence)


def generate_test_file_objects(fileobj, func):
    """Given a file object yields the same file object which fails differently
    each time

    Args:
        fileobj: the file object to wrap
        func: callable taking a _TestFileObj; it is called once up front to
            measure how much gets read and how many operations are used
    Returns:
        a hypothesis strategy producing _TestFileObj instances
    """

    t = _TestFileObj(fileobj)
    # first figure out how much a successful attempt reads and how many
    # file object operations it executes.
    func(t)

    @composite
    def strategy(draw):

        # Wrappers which return at most `i` bytes in total
        stop_strat = integers(
            min_value=0, max_value=t.dataread).map(
                lambda i: _TestFileObj(fileobj, stop_after=i))

        # Wrappers which raise IOError after `i` operations
        fail_strat = integers(
            min_value=0, max_value=t.operations).map(
                lambda i: _TestFileObj(fileobj, fail_after=i))

        return draw(one_of(stop_strat, fail_strat))

    return strategy()


class TAbstractFileType(object):
    """Format-independent tests; PATH and KIND are filled in by
    create_filetype_tests().
    """

    PATH = None
    KIND = None

    def setUp(self):
        self.filename = get_temp_copy(self.PATH)
        self.audio = self.KIND(self.filename)

    def tearDown(self):
        # Some tests remove the file themselves
        try:
            os.remove(self.filename)
        except OSError:
            pass

    def test_fileobj_load(self):
        with open(self.filename, "rb") as h:
            self.KIND(h)

    def test_fileobj_save(self):
        with open(self.filename, "rb+") as h:
            f = self.KIND(h)
            h.seek(0)
            f.save(h)
            h.seek(0)
            f.delete(h)

    def test_module_delete_fileobj(self):
        mod = sys.modules[self.KIND.__module__]
        if hasattr(mod, "delete"):
            with open(self.filename, "rb+") as h:
                mod.delete(fileobj=h)

    def test_stringio(self):
        with open(self.filename, "rb") as h:
            fileobj = cBytesIO(h.read())
            self.KIND(fileobj)
            # make sure it's not closed
            fileobj.read(0)

    def test_testfileobj(self):
        with open(self.filename, "rb") as h:
            self.KIND(_TestFileObj(h))

    def test_test_fileobj_load(self):
        with open(self.filename, "rb") as h:

            @given(generate_test_file_objects(h, self.KIND))
            def run(t):
                # Only MutagenError is an acceptable failure
                try:
                    self.KIND(t)
                except MutagenError:
                    pass

            run()

    def test_test_fileobj_save(self):
        with open(self.filename, "rb+") as h:
            o = self.KIND(_TestFileObj(h))

            @given(generate_test_file_objects(h, lambda t: o.save(fileobj=t)))
            def run(t):
                try:
                    o.save(fileobj=t)
                except MutagenError:
                    pass

            run()

    def test_test_fileobj_delete(self):
        with open(self.filename, "rb+") as h:
            o = self.KIND(_TestFileObj(h))

            @given(generate_test_file_objects(
                h, lambda t: o.delete(fileobj=t)))
            def run(t):
                try:
                    o.delete(fileobj=t)
                except MutagenError:
                    pass

            run()

    def test_filename(self):
        self.assertEqual(self.audio.filename, self.filename)

    def test_file(self):
        self.assertTrue(isinstance(File(self.PATH), self.KIND))

    def test_not_file(self):
        self.assertRaises(MutagenError, self.KIND, "/dev/doesnotexist")

    def test_pprint(self):
        res = self.audio.pprint()
        self.assertTrue(res)
        self.assertTrue(isinstance(res, text_type))

    def test_info(self):
        self.assertTrue(self.audio.info)

    def test_info_pprint(self):
        res = self.audio.info.pprint()
        self.assertTrue(res)
        self.assertTrue(isinstance(res, text_type))

    def test_mime(self):
        self.assertTrue(self.audio.mime)
        self.assertTrue(isinstance(self.audio.mime, list))

    def test_load(self):
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            x = self.KIND()
        x.load(self.filename)
        x.save()

    def test_delete(self):
        self.audio.delete(self.filename)
        self.audio.delete()

    def test_delete_nonexisting(self):
        # if there are none, add them first
        if not self.audio.tags:
            try:
                self.audio.add_tags()
            except MutagenError:
                pass
            else:
                self.audio.save()

        os.remove(self.filename)
        try:
            self.audio.delete()
        except MutagenError:
            pass

    def test_save_nonexisting(self):
        os.remove(self.filename)
        tags = self.audio.tags
        # Metadata creates a new file
        if not isinstance(tags, Metadata):
            try:
                self.audio.save()
            except MutagenError:
                pass

    def test_save(self):
        self.audio.save(self.filename)
        self.audio.save()

    def test_add_tags(self):
        had_tags = self.audio.tags is not None
        try:
            self.audio.add_tags()
        except MutagenError:
            pass
        else:
            self.assertFalse(had_tags)
            self.assertTrue(self.audio.tags is not None)
        self.assertRaises(MutagenError, self.audio.add_tags)

    def test_score(self):
        with open(self.filename, "rb") as fileobj:
            header = fileobj.read(128)
            self.KIND.score(self.filename, fileobj, header)

    def test_dict(self):
        self.audio.keys()
        self.assertRaises(KeyError, self.audio.__delitem__, "nopenopenopenope")
        # Iterate over a snapshot since the mapping is mutated in the loop
        for key, value in list(self.audio.items()):
            del self.audio[key]
            self.audio[key] = value


# Maps each file type to the sample files it gets tested against
_FILETYPES = {
    OggVorbis: [os.path.join(DATA_DIR, "empty.ogg")],
    OggFLAC: [os.path.join(DATA_DIR, "empty.oggflac")],
    OggSpeex: [os.path.join(DATA_DIR, "empty.spx")],
    OggTheora: [os.path.join(DATA_DIR, "sample.oggtheora")],
    OggOpus: [os.path.join(DATA_DIR, "example.opus")],
    FLAC: [os.path.join(DATA_DIR, "silence-44-s.flac")],
    TrueAudio: [os.path.join(DATA_DIR, "empty.tta")],
    WavPack: [os.path.join(DATA_DIR, "silence-44-s.wv")],
    MP3: [
        os.path.join(DATA_DIR, "bad-xing.mp3"),
        os.path.join(DATA_DIR, "xing.mp3"),
        os.path.join(DATA_DIR, "silence-44-s.mp3"),
    ],
    Musepack: [
        os.path.join(DATA_DIR, "click.mpc"),
        os.path.join(DATA_DIR, "sv4_header.mpc"),
        os.path.join(DATA_DIR, "sv5_header.mpc"),
        os.path.join(DATA_DIR, "sv8_header.mpc"),
    ],
    OptimFROG: [
        os.path.join(DATA_DIR, "empty.ofr"),
        os.path.join(DATA_DIR, "empty.ofs"),
    ],
    AAC: [
        os.path.join(DATA_DIR, "empty.aac"),
        os.path.join(DATA_DIR, "adif.aac"),
    ],
    ASF: [
        os.path.join(DATA_DIR, "silence-1.wma"),
        os.path.join(DATA_DIR, "silence-2.wma"),
        os.path.join(DATA_DIR, "silence-3.wma"),
    ],
    AIFF: [
        os.path.join(DATA_DIR, "with-id3.aif"),
        os.path.join(DATA_DIR, "11k-1ch-2s-silence.aif"),
        os.path.join(DATA_DIR, "48k-2ch-s16-silence.aif"),
        os.path.join(DATA_DIR, "8k-1ch-1s-silence.aif"),
        os.path.join(DATA_DIR, "8k-1ch-3.5s-silence.aif"),
        os.path.join(DATA_DIR, "8k-4ch-1s-silence.aif")
    ],
    MonkeysAudio: [
        os.path.join(DATA_DIR, "mac-399.ape"),
        os.path.join(DATA_DIR, "mac-396.ape"),
    ],
    MP4: [
        os.path.join(DATA_DIR, "has-tags.m4a"),
        os.path.join(DATA_DIR, "no-tags.m4a"),
        os.path.join(DATA_DIR, "no-tags.3g2"),
        os.path.join(DATA_DIR, "truncated-64bit.mp4"),
    ],
    SMF: [
        os.path.join(DATA_DIR, "sample.mid"),
    ],
    DSF: [
        os.path.join(DATA_DIR, '2822400-1ch-0s-silence.dsf'),
        os.path.join(DATA_DIR, '5644800-2ch-s01-silence.dsf'),
        os.path.join(DATA_DIR, 'with-id3.dsf'),
        os.path.join(DATA_DIR, 'without-id3.dsf'),
    ]
}

# The generic ID3/APEv2 types get tested against the same sample files
_FILETYPES[ID3FileType] = _FILETYPES[MP3]
_FILETYPES[APEv2File] = _FILETYPES[MonkeysAudio]


def create_filetype_tests():
    """Create one TAbstractFileType based test case class per sample file
    and publish it in the module namespace.
    """

    for kind, paths in _FILETYPES.items():
        for i, path in enumerate(paths):
            # The first file gets no suffix, the others "_2", "_3", ...
            suffix = "_" + str(i + 1) if i else ""
            new_type = type("TFileType" + kind.__name__ + suffix,
                            (TAbstractFileType, TestCase),
                            {"PATH": path, "KIND": kind})
            globals()[new_type.__name__] = new_type


create_filetype_tests()


class TFile(TestCase):
    """Tests for the type guessing File() function."""

    @property
    def filenames(self):
        """Yields the paths of all sample files in _FILETYPES"""

        for paths in _FILETYPES.values():
            for path in paths:
                yield path

    def test_bad(self):
        try:
            self.assertTrue(File(devnull) is None)
        except (OSError, IOError):
            print("WARNING: Unable to open %s." % devnull)
        self.assertTrue(File(__file__) is None)

    def test_empty(self):
        filename = os.path.join(DATA_DIR, "empty")
        open(filename, "wb").close()
        try:
            self.assertTrue(File(filename) is None)
        finally:
            os.unlink(filename)

    def test_not_file(self):
        self.assertRaises(MutagenError, File, "/dev/doesnotexist")

    def test_no_options(self):
        # The paths already include DATA_DIR, no need to join it again
        for filename in self.filenames:
            self.assertFalse(File(filename, options=[]))

    def test_fileobj(self):
        for filename in self.filenames:
            with open(filename, "rb") as h:
                self.assertTrue(File(h) is not None)
            with open(filename, "rb") as h:
                fileobj = cBytesIO(h.read())
                self.assertTrue(File(fileobj, filename=filename) is not None)

    def test_mock_fileobj(self):
        for filename in self.filenames:
            with open(filename, "rb") as h:

                @given(generate_test_file_objects(h, File))
                def run(t):
                    # Only MutagenError is an acceptable failure
                    try:
                        File(t)
                    except MutagenError:
                        pass

                run()

    def test_easy_mp3(self):
        self.assertTrue(isinstance(
            File(os.path.join(DATA_DIR, "silence-44-s.mp3"), easy=True),
            EasyMP3))

    def test_apev2(self):
        self.assertTrue(isinstance(
            File(os.path.join(DATA_DIR, "oldtag.apev2")), APEv2File))

    def test_easy_tta(self):
        self.assertTrue(isinstance(
            File(os.path.join(DATA_DIR, "empty.tta"), easy=True),
            EasyTrueAudio))

    def test_id3_indicates_mp3_not_tta(self):
        header = b"ID3 the rest of this is garbage"
        fileobj = cBytesIO(header)
        filename = "not-identifiable.ext"
        self.assertTrue(TrueAudio.score(filename, fileobj, header) <
                        MP3.score(filename, fileobj, header))

    def test_prefer_theora_over_vorbis(self):
        header = (
            b"OggS\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\xe1x\x06\x0f"
            b"\x00\x00\x00\x00)S'\xf4\x01*\x80theora\x03\x02\x01\x006\x00\x1e"
            b"\x00\x03V\x00\x01\xe0\x00\x00\x00\x00\x00\x18\x00\x00\x00\x01"
            b"\x00\x00\x00\x00\x00\x00\x00&%\xa0\x00\xc0OggS\x00\x02\x00\x00"
            b"\x00\x00\x00\x00\x00\x00d#\xa8\x1f\x00\x00\x00\x00]Y\xc0\xc0"
            b"\x01\x1e\x01vorbis\x00\x00\x00\x00\x02\x80\xbb\x00\x00\x00\x00"
            b"\x00\x00\x00\xee\x02\x00\x00\x00\x00\x00\xb8\x01")
        fileobj = cBytesIO(header)
        filename = "not-identifiable.ext"
        self.assertTrue(OggVorbis.score(filename, fileobj, header) <
                        OggTheora.score(filename, fileobj, header))


class TFileUpperExt(TestCase):
    """File() must also work for upper case file extensions."""

    FILES = [
        (os.path.join(DATA_DIR, "empty.ofr"), OptimFROG),
        (os.path.join(DATA_DIR, "sv5_header.mpc"), Musepack),
        (os.path.join(DATA_DIR, "silence-3.wma"), ASF),
        (os.path.join(DATA_DIR, "truncated-64bit.mp4"), MP4),
        (os.path.join(DATA_DIR, "silence-44-s.flac"), FLAC),
    ]

    def setUp(self):
        # Copy each sample to a temp file with an upper cased extension
        checks = []
        for (original, instance) in self.FILES:
            ext = os.path.splitext(original)[1]
            fd, filename = mkstemp(suffix=ext.upper())
            os.close(fd)
            shutil.copy(original, filename)
            checks.append((filename, instance))
        self.checks = checks

    def test_case_insensitive_ext(self):
        for (path, instance) in self.checks:
            # Try both the text and the bytes form of the path
            if isinstance(path, bytes):
                path = path.decode("ascii")
            self.assertTrue(
                isinstance(File(path, options=[instance]), instance))
            path = path.encode("ascii")
            self.assertTrue(
                isinstance(File(path, options=[instance]), instance))

    def tearDown(self):
        for (path, instance) in self.checks:
            os.unlink(path)


class TModuleImportAll(TestCase):
    """Sanity checks run on every public mutagen submodule."""

    def setUp(self):
        import mutagen
        files = os.listdir(mutagen.__path__[0])
        # Strip extensions and de-duplicate the names, then skip private
        # entries
        modules = set(os.path.splitext(f)[0] for f in files)
        modules = [f for f in modules if not f.startswith("_")]

        self.modules = []
        for module in modules:
            # __import__ returns the top level package, so fetch the
            # submodule from it
            mod = getattr(__import__("mutagen." + module), module)
            self.modules.append(mod)

    def tearDown(self):
        del self.modules[:]

    def test_all(self):
        # Everything listed in __all__ has to exist
        for mod in self.modules:
            for attr in getattr(mod, "__all__", []):
                getattr(mod, attr)

    def test_errors(self):
        for mod in self.modules:
            self.assertTrue(issubclass(mod.error, MutagenError), msg=mod.error)