1# -*- coding: utf-8 -*- 2 3import os 4import random 5import subprocess 6 7from mutagen._compat import BytesIO, xrange 8from tests import TestCase, DATA_DIR, get_temp_copy 9from mutagen.ogg import OggPage, error as OggError 10from mutagen._util import cdata 11from mutagen import _util 12 13 14class TOggPage(TestCase): 15 16 def setUp(self): 17 self.fileobj = open(os.path.join(DATA_DIR, "empty.ogg"), "rb") 18 self.page = OggPage(self.fileobj) 19 20 pages = [OggPage(), OggPage(), OggPage()] 21 pages[0].packets = [b"foo"] 22 pages[1].packets = [b"bar"] 23 pages[2].packets = [b"baz"] 24 for i in xrange(len(pages)): 25 pages[i].sequence = i 26 for page in pages: 27 page.serial = 1 28 self.pages = pages 29 30 def test_flags(self): 31 self.failUnless(self.page.first) 32 self.failIf(self.page.continued) 33 self.failIf(self.page.last) 34 self.failUnless(self.page.complete) 35 36 for first in [True, False]: 37 self.page.first = first 38 for last in [True, False]: 39 self.page.last = last 40 for continued in [True, False]: 41 self.page.continued = continued 42 self.failUnlessEqual(self.page.first, first) 43 self.failUnlessEqual(self.page.last, last) 44 self.failUnlessEqual(self.page.continued, continued) 45 46 def test_flags_next_page(self): 47 page = OggPage(self.fileobj) 48 self.failIf(page.first) 49 self.failIf(page.continued) 50 self.failIf(page.last) 51 52 def test_length(self): 53 # Always true for Ogg Vorbis files 54 self.failUnlessEqual(self.page.size, 58) 55 self.failUnlessEqual(len(self.page.write()), 58) 56 57 def test_first_metadata_page_is_separate(self): 58 self.failIf(OggPage(self.fileobj).continued) 59 60 def test_single_page_roundtrip(self): 61 self.failUnlessEqual( 62 self.page, OggPage(BytesIO(self.page.write()))) 63 64 def test_at_least_one_audio_page(self): 65 page = OggPage(self.fileobj) 66 while not page.last: 67 page = OggPage(self.fileobj) 68 self.failUnless(page.last) 69 70 def test_crappy_fragmentation(self): 71 packets = [b"1" * 511, b"2" * 511, b"3" * 511] 72 pages = OggPage.from_packets(packets, default_size=510, wiggle_room=0) 73 self.failUnless(len(pages) > 3) 74 self.failUnlessEqual(OggPage.to_packets(pages), packets) 75 76 def test_wiggle_room(self): 77 packets = [b"1" * 511, b"2" * 511, b"3" * 511] 78 pages = OggPage.from_packets( 79 packets, default_size=510, wiggle_room=100) 80 self.failUnlessEqual(len(pages), 3) 81 self.failUnlessEqual(OggPage.to_packets(pages), packets) 82 83 def test_one_packet_per_wiggle(self): 84 packets = [b"1" * 511, b"2" * 511, b"3" * 511] 85 pages = OggPage.from_packets( 86 packets, default_size=1000, wiggle_room=1000000) 87 self.failUnlessEqual(len(pages), 2) 88 self.failUnlessEqual(OggPage.to_packets(pages), packets) 89 90 def test_replace(self): 91 # create interleaved pages 92 fileobj = BytesIO() 93 pages = [OggPage(), OggPage(), OggPage()] 94 pages[0].serial = 42 95 pages[0].sequence = 0 96 pages[0].packets = [b"foo"] 97 pages[1].serial = 24 98 pages[1].sequence = 0 99 pages[1].packets = [b"bar"] 100 pages[2].serial = 42 101 pages[2].sequence = 1 102 pages[2].packets = [b"baz"] 103 for page in pages: 104 fileobj.write(page.write()) 105 106 fileobj.seek(0, 0) 107 pages_from_file = [OggPage(fileobj), OggPage(fileobj), 108 OggPage(fileobj)] 109 110 old_pages = [pages_from_file[0], pages_from_file[2]] 111 packets = OggPage.to_packets(old_pages, strict=True) 112 self.assertEqual(packets, [b"foo", b"baz"]) 113 new_packets = [b"1111", b"2222"] 114 new_pages = OggPage.from_packets(new_packets, 115 sequence=old_pages[0].sequence) 116 self.assertEqual(len(new_pages), 1) 117 OggPage.replace(fileobj, old_pages, new_pages) 118 119 fileobj.seek(0, 0) 120 first = OggPage(fileobj) 121 self.assertEqual(first.serial, 42) 122 self.assertEqual(OggPage.to_packets([first], strict=True), 123 [b"1111", b"2222"]) 124 second = OggPage(fileobj) 125 self.assertEqual(second.serial, 24) 126 self.assertEqual(OggPage.to_packets([second], strict=True), [b"bar"]) 127 128 def test_replace_fast_path(self): 129 # create interleaved pages 130 fileobj = BytesIO() 131 pages = [OggPage(), OggPage(), OggPage()] 132 pages[0].serial = 42 133 pages[0].sequence = 0 134 pages[0].packets = [b"foo"] 135 pages[1].serial = 24 136 pages[1].sequence = 0 137 pages[1].packets = [b"bar"] 138 pages[2].serial = 42 139 pages[2].sequence = 1 140 pages[2].packets = [b"baz"] 141 for page in pages: 142 fileobj.write(page.write()) 143 144 fileobj.seek(0, 0) 145 pages_from_file = [OggPage(fileobj), OggPage(fileobj), 146 OggPage(fileobj)] 147 148 old_pages = [pages_from_file[0], pages_from_file[2]] 149 packets = OggPage.to_packets(old_pages, strict=True) 150 self.assertEqual(packets, [b"foo", b"baz"]) 151 new_packets = [b"111", b"222"] 152 new_pages = OggPage._from_packets_try_preserve(new_packets, old_pages) 153 self.assertEqual(len(new_pages), 2) 154 155 # remove insert_bytes, so we can be sure the fast path was taken 156 old_insert_bytes = _util.insert_bytes 157 _util.insert_bytes = None 158 try: 159 OggPage.replace(fileobj, old_pages, new_pages) 160 finally: 161 _util.insert_bytes = old_insert_bytes 162 163 # validate that the new data was written and the other pages 164 # are untouched 165 fileobj.seek(0, 0) 166 pages_from_file = [OggPage(fileobj), OggPage(fileobj), 167 OggPage(fileobj)] 168 packets = OggPage.to_packets( 169 [pages_from_file[0], pages_from_file[2]], strict=True) 170 self.assertEqual(packets, [b"111", b"222"]) 171 packets = OggPage.to_packets([pages_from_file[1]], strict=True) 172 self.assertEqual(packets, [b"bar"]) 173 174 def test_replace_continued(self): 175 # take a partial packet and replace it with a new page 176 # replace() should make it spanning again 177 fileobj = BytesIO() 178 pages = [OggPage(), OggPage()] 179 pages[0].serial = 1 180 pages[0].sequence = 0 181 pages[0].complete = False 182 pages[0].packets = [b"foo"] 183 pages[1].serial = 1 184 pages[1].sequence = 1 185 pages[1].continued = True 186 pages[1].packets = [b"bar"] 187 fileobj = BytesIO() 188 for page in pages: 189 fileobj.write(page.write()) 190 191 fileobj.seek(0, 0) 192 pages_from_file = [OggPage(fileobj), OggPage(fileobj)] 193 self.assertEqual(OggPage.to_packets(pages_from_file), [b"foobar"]) 194 packets_part = OggPage.to_packets([pages_from_file[0]]) 195 self.assertEqual(packets_part, [b"foo"]) 196 new_pages = OggPage.from_packets([b"quuux"]) 197 OggPage.replace(fileobj, [pages_from_file[0]], new_pages) 198 199 fileobj.seek(0, 0) 200 written = OggPage.to_packets([OggPage(fileobj), OggPage(fileobj)]) 201 self.assertEquals(written, [b"quuuxbar"]) 202 203 def test_renumber(self): 204 self.failUnlessEqual( 205 [page.sequence for page in self.pages], [0, 1, 2]) 206 fileobj = BytesIO() 207 for page in self.pages: 208 fileobj.write(page.write()) 209 fileobj.seek(0) 210 OggPage.renumber(fileobj, 1, 10) 211 fileobj.seek(0) 212 pages = [OggPage(fileobj) for i in xrange(3)] 213 self.failUnlessEqual([page.sequence for page in pages], [10, 11, 12]) 214 215 fileobj.seek(0) 216 OggPage.renumber(fileobj, 1, 20) 217 fileobj.seek(0) 218 pages = [OggPage(fileobj) for i in xrange(3)] 219 self.failUnlessEqual([page.sequence for page in pages], [20, 21, 22]) 220 221 def test_renumber_extradata(self): 222 fileobj = BytesIO() 223 for page in self.pages: 224 fileobj.write(page.write()) 225 fileobj.write(b"left over data") 226 fileobj.seek(0) 227 # Trying to rewrite should raise an error... 228 self.failUnlessRaises(Exception, OggPage.renumber, fileobj, 1, 10) 229 fileobj.seek(0) 230 # But the already written data should remain valid, 231 pages = [OggPage(fileobj) for i in xrange(3)] 232 self.failUnlessEqual([page.sequence for page in pages], [10, 11, 12]) 233 # And the garbage that caused the error should be okay too. 234 self.failUnlessEqual(fileobj.read(), b"left over data") 235 236 def test_renumber_reread(self): 237 try: 238 filename = get_temp_copy( 239 os.path.join(DATA_DIR, "multipagecomment.ogg")) 240 with open(filename, "rb+") as fileobj: 241 OggPage.renumber(fileobj, 1002429366, 20) 242 with open(filename, "rb+") as fileobj: 243 OggPage.renumber(fileobj, 1002429366, 0) 244 finally: 245 os.unlink(filename) 246 247 def test_renumber_muxed(self): 248 pages = [OggPage() for i in xrange(10)] 249 for seq, page in enumerate(pages[0:1] + pages[2:]): 250 page.serial = 0 251 page.sequence = seq 252 pages[1].serial = 2 253 pages[1].sequence = 100 254 data = BytesIO(b"".join([page.write() for page in pages])) 255 OggPage.renumber(data, 0, 20) 256 data.seek(0) 257 pages = [OggPage(data) for i in xrange(10)] 258 self.failUnlessEqual(pages[1].serial, 2) 259 self.failUnlessEqual(pages[1].sequence, 100) 260 pages.pop(1) 261 self.failUnlessEqual( 262 [page.sequence for page in pages], list(xrange(20, 29))) 263 264 def test_to_packets(self): 265 self.failUnlessEqual( 266 [b"foo", b"bar", b"baz"], OggPage.to_packets(self.pages)) 267 self.pages[0].complete = False 268 self.pages[1].continued = True 269 self.failUnlessEqual( 270 [b"foobar", b"baz"], OggPage.to_packets(self.pages)) 271 272 def test_to_packets_mixed_stream(self): 273 self.pages[0].serial = 3 274 self.failUnlessRaises(ValueError, OggPage.to_packets, self.pages) 275 276 def test_to_packets_missing_sequence(self): 277 self.pages[0].sequence = 3 278 self.failUnlessRaises(ValueError, OggPage.to_packets, self.pages) 279 280 def test_to_packets_continued(self): 281 self.pages[0].continued = True 282 self.failUnlessEqual( 283 OggPage.to_packets(self.pages), [b"foo", b"bar", b"baz"]) 284 285 def test_to_packets_continued_strict(self): 286 self.pages[0].continued = True 287 self.failUnlessRaises( 288 ValueError, OggPage.to_packets, self.pages, strict=True) 289 290 def test_to_packets_strict(self): 291 for page in self.pages: 292 page.complete = False 293 self.failUnlessRaises( 294 ValueError, OggPage.to_packets, self.pages, strict=True) 295 296 def test_from_packets_short_enough(self): 297 packets = [b"1" * 200, b"2" * 200, b"3" * 200] 298 pages = OggPage.from_packets(packets) 299 self.failUnlessEqual(OggPage.to_packets(pages), packets) 300 301 def test_from_packets_position(self): 302 packets = [b"1" * 100000] 303 pages = OggPage.from_packets(packets) 304 self.failUnless(len(pages) > 1) 305 for page in pages[:-1]: 306 self.failUnlessEqual(-1, page.position) 307 self.failUnlessEqual(0, pages[-1].position) 308 309 def test_from_packets_long(self): 310 packets = [b"1" * 100000, b"2" * 100000, b"3" * 100000] 311 pages = OggPage.from_packets(packets) 312 self.failIf(pages[0].complete) 313 self.failUnless(pages[1].continued) 314 self.failUnlessEqual(OggPage.to_packets(pages), packets) 315 316 def test__from_packets_try_preserve(self): 317 # if the packet layout matches, just create pages with 318 # the same layout and copy things over 319 packets = [b"1" * 100000, b"2" * 100000, b"3" * 100000] 320 pages = OggPage.from_packets(packets, sequence=42, default_size=977) 321 new_pages = OggPage._from_packets_try_preserve(packets, pages) 322 self.assertEqual(pages, new_pages) 323 324 # zero case 325 new_pages = OggPage._from_packets_try_preserve([], pages) 326 self.assertEqual(new_pages, []) 327 328 # if the layout doesn't match we should fall back to creating new 329 # pages starting with the sequence of the first given page 330 other_packets = list(packets) 331 other_packets[1] += b"\xff" 332 other_pages = OggPage.from_packets(other_packets, 42) 333 new_pages = OggPage._from_packets_try_preserve(other_packets, pages) 334 self.assertEqual(new_pages, other_pages) 335 336 def test_random_data_roundtrip(self): 337 try: 338 random_file = open("/dev/urandom", "rb") 339 except (IOError, OSError): 340 print("WARNING: Random data round trip test disabled.") 341 return 342 try: 343 for i in xrange(10): 344 num_packets = random.randrange(2, 100) 345 lengths = [random.randrange(10, 10000) 346 for i in xrange(num_packets)] 347 packets = list(map(random_file.read, lengths)) 348 self.failUnlessEqual( 349 packets, OggPage.to_packets(OggPage.from_packets(packets))) 350 finally: 351 random_file.close() 352 353 def test_packet_exactly_255(self): 354 page = OggPage() 355 page.packets = [b"1" * 255] 356 page.complete = False 357 page2 = OggPage() 358 page2.packets = [b""] 359 page2.sequence = 1 360 page2.continued = True 361 self.failUnlessEqual( 362 [b"1" * 255], OggPage.to_packets([page, page2])) 363 364 def test_page_max_size_alone_too_big(self): 365 page = OggPage() 366 page.packets = [b"1" * 255 * 255] 367 page.complete = True 368 self.failUnlessRaises(ValueError, page.write) 369 370 def test_page_max_size(self): 371 page = OggPage() 372 page.packets = [b"1" * 255 * 255] 373 page.complete = False 374 page2 = OggPage() 375 page2.packets = [b""] 376 page2.sequence = 1 377 page2.continued = True 378 self.failUnlessEqual( 379 [b"1" * 255 * 255], OggPage.to_packets([page, page2])) 380 381 def test_complete_zero_length(self): 382 packets = [b""] * 20 383 page = OggPage.from_packets(packets)[0] 384 new_page = OggPage(BytesIO(page.write())) 385 self.failUnlessEqual(new_page, page) 386 self.failUnlessEqual(OggPage.to_packets([new_page]), packets) 387 388 def test_too_many_packets(self): 389 packets = [b"1"] * 3000 390 pages = OggPage.from_packets(packets) 391 map(OggPage.write, pages) 392 self.failUnless(len(pages) > 3000 // 255) 393 394 def test_read_max_size(self): 395 page = OggPage() 396 page.packets = [b"1" * 255 * 255] 397 page.complete = False 398 page2 = OggPage() 399 page2.packets = [b"", b"foo"] 400 page2.sequence = 1 401 page2.continued = True 402 data = page.write() + page2.write() 403 fileobj = BytesIO(data) 404 self.failUnlessEqual(OggPage(fileobj), page) 405 self.failUnlessEqual(OggPage(fileobj), page2) 406 self.failUnlessRaises(EOFError, OggPage, fileobj) 407 408 def test_invalid_version(self): 409 page = OggPage() 410 OggPage(BytesIO(page.write())) 411 page.version = 1 412 self.failUnlessRaises(OggError, OggPage, BytesIO(page.write())) 413 414 def test_not_enough_lacing(self): 415 data = OggPage().write()[:-1] + b"\x10" 416 self.failUnlessRaises(OggError, OggPage, BytesIO(data)) 417 418 def test_not_enough_data(self): 419 data = OggPage().write()[:-1] + b"\x01\x10" 420 self.failUnlessRaises(OggError, OggPage, BytesIO(data)) 421 422 def test_not_equal(self): 423 self.failIfEqual(OggPage(), 12) 424 425 def test_find_last(self): 426 pages = [OggPage() for i in xrange(10)] 427 for i, page in enumerate(pages): 428 page.sequence = i 429 data = BytesIO(b"".join([page.write() for page in pages])) 430 self.failUnlessEqual( 431 OggPage.find_last(data, pages[0].serial), pages[-1]) 432 433 def test_find_last_none_finishing(self): 434 page = OggPage() 435 page.position = -1 436 data = BytesIO(page.write()) 437 assert OggPage.find_last(data, page.serial, finishing=True) is None 438 439 def test_find_last_none_finishing_mux(self): 440 page1 = OggPage() 441 page1.last = True 442 page1.position = -1 443 page2 = OggPage() 444 page2.serial = page1.serial + 1 445 pages = [page1, page2] 446 data = BytesIO(b"".join([page.write() for page in pages])) 447 448 assert OggPage.find_last(data, page1.serial, finishing=True) is None 449 assert OggPage.find_last(data, page2.serial, finishing=True) == page2 450 451 def test_find_last_last_empty(self): 452 # https://github.com/quodlibet/mutagen/issues/308 453 pages = [OggPage() for i in xrange(10)] 454 for i, page in enumerate(pages): 455 page.sequence = i 456 page.position = i 457 pages[-1].last = True 458 pages[-1].position = -1 459 data = BytesIO(b"".join([page.write() for page in pages])) 460 page = OggPage.find_last(data, pages[-1].serial, finishing=True) 461 assert page is not None 462 assert page.position == 8 463 page = OggPage.find_last(data, pages[-1].serial, finishing=False) 464 assert page is not None 465 assert page.position == -1 466 467 def test_find_last_single_muxed(self): 468 page1 = OggPage() 469 page1.last = True 470 page2 = OggPage() 471 page2.serial = page1.serial + 1 472 pages = [page1, page2] 473 data = BytesIO(b"".join([page.write() for page in pages])) 474 assert OggPage.find_last(data, page2.serial).serial == page2.serial 475 476 def test_find_last_really_last(self): 477 pages = [OggPage() for i in xrange(10)] 478 pages[-1].last = True 479 for i, page in enumerate(pages): 480 page.sequence = i 481 data = BytesIO(b"".join([page.write() for page in pages])) 482 self.failUnlessEqual( 483 OggPage.find_last(data, pages[0].serial), pages[-1]) 484 485 def test_find_last_muxed(self): 486 pages = [OggPage() for i in xrange(10)] 487 for i, page in enumerate(pages): 488 page.sequence = i 489 pages[-2].last = True 490 pages[-1].serial = pages[0].serial + 1 491 data = BytesIO(b"".join([page.write() for page in pages])) 492 self.failUnlessEqual( 493 OggPage.find_last(data, pages[0].serial), pages[-2]) 494 495 def test_find_last_no_serial(self): 496 pages = [OggPage() for i in xrange(10)] 497 for i, page in enumerate(pages): 498 page.sequence = i 499 data = BytesIO(b"".join([page.write() for page in pages])) 500 self.failUnless(OggPage.find_last(data, pages[0].serial + 1) is None) 501 502 def test_find_last_invalid(self): 503 data = BytesIO(b"if you think this is an Ogg, you're crazy") 504 self.failUnlessRaises(OggError, OggPage.find_last, data, 0) 505 506 # Disabled because GStreamer will write Oggs with bad data, 507 # which we need to make a best guess for. 508 # 509 # def test_find_last_invalid_sync(self): 510 # data = BytesIO("if you think this is an OggS, you're crazy") 511 # self.failUnlessRaises(OggError, OggPage.find_last, data, 0) 512 513 def test_find_last_invalid_sync(self): 514 data = BytesIO(b"if you think this is an OggS, you're crazy") 515 page = OggPage.find_last(data, 0) 516 self.failIf(page) 517 518 def test_crc_py25(self): 519 # Make sure page.write can handle both signed/unsigned int 520 # return values of crc32. 521 # https://github.com/quodlibet/mutagen/issues/63 522 # http://docs.python.org/library/zlib.html#zlib.crc32 523 524 import zlib 525 old_crc = zlib.crc32 526 527 def zlib_uint(*args): 528 return (old_crc(*args) & 0xffffffff) 529 530 def zlib_int(*args): 531 return cdata.int_be(cdata.to_uint_be(old_crc(*args) & 0xffffffff)) 532 533 try: 534 page = OggPage() 535 page.packets = [b"abc"] 536 zlib.crc32 = zlib_uint 537 uint_data = page.write() 538 zlib.crc32 = zlib_int 539 int_data = page.write() 540 finally: 541 zlib.crc32 = old_crc 542 543 self.failUnlessEqual(uint_data, int_data) 544 545 def tearDown(self): 546 self.fileobj.close() 547 548 549class TOggFileTypeMixin(object): 550 551 PADDING_SUPPORT = True 552 553 def scan_file(self): 554 with open(self.filename, "rb") as fileobj: 555 try: 556 while True: 557 OggPage(fileobj) 558 except EOFError: 559 pass 560 561 def test_pprint_empty(self): 562 self.audio.pprint() 563 564 def test_pprint_stuff(self): 565 self.test_set_two_tags() 566 self.audio.pprint() 567 568 def test_length(self): 569 self.failUnlessAlmostEqual(3.7, self.audio.info.length, 1) 570 571 def test_no_tags(self): 572 self.failIf(self.audio.tags) 573 self.failIf(self.audio.tags is None) 574 575 def test_vendor_safe(self): 576 self.audio["vendor"] = "a vendor" 577 self.audio.save() 578 audio = self.Kind(self.filename) 579 self.failUnlessEqual(audio["vendor"], ["a vendor"]) 580 581 def test_set_two_tags(self): 582 self.audio["foo"] = ["a"] 583 self.audio["bar"] = ["b"] 584 self.audio.save() 585 audio = self.Kind(self.filename) 586 self.failUnlessEqual(len(audio.tags.keys()), 2) 587 self.failUnlessEqual(audio["foo"], ["a"]) 588 self.failUnlessEqual(audio["bar"], ["b"]) 589 self.scan_file() 590 591 def test_save_twice(self): 592 self.audio.save() 593 self.audio.save() 594 self.failUnlessEqual(self.Kind(self.filename).tags, self.audio.tags) 595 self.scan_file() 596 597 def test_set_delete(self): 598 self.test_set_two_tags() 599 self.audio.tags.clear() 600 self.audio.save() 601 audio = self.Kind(self.filename) 602 self.failIf(audio.tags) 603 self.scan_file() 604 605 def test_delete(self): 606 self.test_set_two_tags() 607 self.audio.delete() 608 self.failIf(self.audio.tags) 609 audio = self.Kind(self.filename) 610 self.failIf(audio.tags) 611 612 self.audio["foobar"] = "foobar" * 1000 613 self.audio.save() 614 audio = self.Kind(self.filename) 615 self.failUnlessEqual(self.audio["foobar"], audio["foobar"]) 616 617 self.scan_file() 618 619 def test_delete_remove_padding(self): 620 if not self.PADDING_SUPPORT: 621 return 622 self.audio.clear() 623 self.audio.save(padding=lambda x: 0) 624 filesize = os.path.getsize(self.audio.filename) 625 self.audio.delete() 626 # deleting shouldn't add padding 627 self.assertTrue(os.path.getsize(self.audio.filename) <= filesize) 628 629 def test_really_big(self): 630 self.audio["foo"] = "foo" * (2 ** 16) 631 self.audio["bar"] = "bar" * (2 ** 16) 632 self.audio["baz"] = "quux" * (2 ** 16) 633 self.audio.save() 634 audio = self.Kind(self.filename) 635 self.failUnlessEqual(audio["foo"], ["foo" * 2 ** 16]) 636 self.failUnlessEqual(audio["bar"], ["bar" * 2 ** 16]) 637 self.failUnlessEqual(audio["baz"], ["quux" * 2 ** 16]) 638 self.scan_file() 639 640 def test_delete_really_big(self): 641 self.audio["foo"] = "foo" * (2 ** 16) 642 self.audio["bar"] = "bar" * (2 ** 16) 643 self.audio["baz"] = "quux" * (2 ** 16) 644 self.audio.save() 645 646 self.audio.delete() 647 audio = self.Kind(self.filename) 648 self.failIf(audio.tags) 649 self.scan_file() 650 651 def test_invalid_open(self): 652 self.failUnlessRaises(OggError, self.Kind, 653 os.path.join(DATA_DIR, 'xing.mp3')) 654 655 def test_invalid_delete(self): 656 self.failUnlessRaises(OggError, self.audio.delete, 657 os.path.join(DATA_DIR, 'xing.mp3')) 658 659 def test_invalid_save(self): 660 self.failUnlessRaises(OggError, self.audio.save, 661 os.path.join(DATA_DIR, 'xing.mp3')) 662 663 def ogg_reference(self, filename): 664 self.scan_file() 665 if have_ogginfo: 666 self.assertEqual(call_ogginfo(filename), 0, 667 msg="ogginfo failed on %s" % filename) 668 669 if have_oggz_validate: 670 if filename.endswith(".opus") and not have_oggz_validate_opus: 671 return 672 self.assertEqual(call_oggz_validate(filename), 0, 673 msg="oggz-validate failed on %s" % filename) 674 675 def test_ogg_reference_simple_save(self): 676 self.audio.save() 677 self.ogg_reference(self.filename) 678 679 def test_ogg_reference_really_big(self): 680 self.test_really_big() 681 self.audio.save() 682 self.ogg_reference(self.filename) 683 684 def test_ogg_reference_delete(self): 685 self.audio.delete() 686 self.ogg_reference(self.filename) 687 688 def test_ogg_reference_medium_sized(self): 689 self.audio["foobar"] = "foobar" * 1000 690 self.audio.save() 691 self.ogg_reference(self.filename) 692 693 def test_ogg_reference_delete_readd(self): 694 self.audio.delete() 695 self.audio.tags.clear() 696 self.audio["foobar"] = "foobar" * 1000 697 self.audio.save() 698 self.ogg_reference(self.filename) 699 700 def test_mime_secondary(self): 701 self.failUnless('application/ogg' in self.audio.mime) 702 703 def test_padding(self): 704 if not self.PADDING_SUPPORT: 705 return 706 707 self.audio.clear() 708 self.audio["foo"] = ["bar"] 709 710 for i in [0, 1, 2, 42, 5000, 4999]: 711 self.audio.save(padding=lambda x: i) 712 new = self.Kind(self.filename) 713 self.assertEqual(new.tags._padding, i) 714 self.assertEqual(new["foo"], ["bar"]) 715 self.ogg_reference(self.filename) 716 717 718def call_ogginfo(*args): 719 with open(os.devnull, 'wb') as null: 720 return subprocess.call( 721 ["ogginfo"] + list(args), stdout=null, stderr=subprocess.STDOUT) 722 723 724def call_oggz_validate(*args): 725 with open(os.devnull, 'wb') as null: 726 return subprocess.call( 727 ["oggz-validate"] + list(args), 728 stdout=null, stderr=subprocess.STDOUT) 729 730 731def get_oggz_validate_version(): 732 """A version tuple or OSError if oggz-validate isn't available""" 733 734 process = subprocess.Popen(["oggz-validate", "--version"], 735 stdout=subprocess.PIPE) 736 output, unused_err = process.communicate() 737 retcode = process.poll() 738 if retcode != 0: 739 return (0,) 740 lines = output.splitlines() 741 if not lines: 742 return (0,) 743 parts = lines[0].split() 744 if not parts: 745 return (0,) 746 try: 747 return tuple(map(int, parts[-1].split(b"."))) 748 except ValueError: 749 return (0,) 750 751 752have_ogginfo = True 753try: 754 call_ogginfo() 755except OSError: 756 have_ogginfo = False 757 print("WARNING: Skipping ogginfo reference tests.") 758 759 760have_oggz_validate = True 761have_oggz_validate_opus = True 762try: 763 call_oggz_validate() 764except OSError: 765 have_oggz_validate = False 766 print("WARNING: Skipping oggz-validate reference tests.") 767else: 768 if get_oggz_validate_version() <= (0, 9, 9): 769 have_oggz_validate_opus = False 770 print("WARNING: Skipping oggz-validate reference tests for opus") 771