#!/usr/bin/python

# Audio Tools, a module and set of tools for manipulating audio data
# Copyright (C) 2007-2014 Brian Langenberger

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

import sys
import unittest
import audiotools
import struct
import random
import tempfile
import decimal
import os
import os.path
import test_streams
from io import BytesIO
from hashlib import md5

from test import (parser, Variable_Reader, BLANK_PCM_Reader,
                  RANDOM_PCM_Reader, EXACT_SILENCE_PCM_Reader,
                  EXACT_BLANK_PCM_Reader, SHORT_PCM_COMBINATIONS,
                  MD5_Reader, FrameCounter, Join_Reader,
                  Combinations, Possibilities,
                  TEST_COVER1, TEST_COVER2, TEST_COVER3, HUGE_BMP)


# stand-in test body: the "disabled" decorators built below
# swap the decorated test method for this no-op
def do_nothing(self):
    return None


# build one module-level decorator per config option,
# named SECTION_OPTION (such as LIB_CORE)
# options set to true yield a decorator that returns the test unchanged
# options set to false yield one that replaces the test with do_nothing
for section in parser.sections():
    for option in parser.options(section):
        vars()["%s_%s" % (section.upper(), option.upper())] = (
            (lambda function: function)
            if parser.getboolean(section, option) else
            (lambda function: do_nothing))


# conversion helpers between lists of integer byte values
# and binary strings, selected by the running interpreter's major version
if sys.version_info[0] < 3:
    def ints_to_bytes(l):
        return b"".join([chr(i) for i in l])

    def bytes_to_ints(b):
        return [ord(c) for c in b]
else:
    def ints_to_bytes(l):
        return bytes(l)

    def bytes_to_ints(b):
        return list(b)


class PCMReader(unittest.TestCase):
    @LIB_PCM
    def test_pcm(self):
        from audiotools.pcm import from_list

        # every bps/endianness/signedness combination to try,
        # generated in the same order as three nested loops
        combinations = [(bps, big_endian, signed)
                        for bps in [8, 16, 24]
                        for big_endian in [True, False]
                        for signed in [True, False]]

        for (bps, big_endian, signed) in combinations:
            expected = list(range(-5, 5))
            read_size = (bps // 8) * 10

            # serialize the samples in the format under test
            # and hand them back to a reader set to the same format
            framelist = from_list(expected, 1, bps, True)
            reader = audiotools.PCMFileReader(
                BytesIO(framelist.to_bytes(big_endian, signed)),
                sample_rate=44100,
                channels=1,
                channel_mask=0x4,
                bits_per_sample=bps,
                signed=signed,
                big_endian=big_endian)

            # stream attributes should match what the reader was given
            self.assertEqual(reader.sample_rate, 44100)
            self.assertEqual(reader.channels, 1)
            self.assertEqual(reader.channel_mask, 0x4)
            self.assertEqual(reader.bits_per_sample, bps)

            # the first read should yield all of the original samples
            frame = reader.read(read_size)
            self.assertEqual(len(frame), 10)
            self.assertEqual(list(frame), list(range(-5, 5)))

            # reads past the end of the stream yield empty FrameLists
            for _ in range(10):
                frame = reader.read(read_size)
                self.assertEqual(len(frame), 0)

            # reads from a closed stream raise ValueError
            reader.close()

            self.assertRaises(ValueError, reader.read, read_size)


class PCMCat(unittest.TestCase):
    @LIB_PCM
    def test_pcm(self):
        from audiotools.pcm import from_list

        def empty_reader(sample_rate, channels, channel_mask,
                         bits_per_sample):
            # a reader over zero bytes of PCM data
            # with the given stream attributes
            return audiotools.PCMFileReader(
                BytesIO(b""),
                sample_rate=sample_rate,
                channels=channels,
                channel_mask=channel_mask,
                bits_per_sample=bits_per_sample)

        # a single stream by itself is accepted at init time
        audiotools.PCMCat([empty_reader(44100, 1, 0x4, 16)])

        # while a first stream whose sample rate, channel layout
        # or bits-per-sample differs from the second's
        # raises ValueError at init time
        for mismatched in [(96000, 1, 0x4, 16),
                           (44100, 2, 0x3, 16),
                           (44100, 1, 0x4, 24)]:
            self.assertRaises(ValueError,
                              audiotools.PCMCat,
                              [empty_reader(*mismatched),
                               empty_reader(44100, 1, 0x4, 16)])

        # three consecutive 10 sample streams of
        # signed, big-endian, 16-bit mono
        main_readers = []
        for sample_range in [range(-15, -5), range(-5, 5), range(5, 15)]:
            pcm_data = from_list(sample_range, 1, 16, True).to_bytes(True,
                                                                     True)
            main_readers.append(
                audiotools.PCMFileReader(
                    BytesIO(pcm_data),
                    sample_rate=44100,
                    channels=1,
                    channel_mask=0x4,
                    bits_per_sample=16,
                    signed=True,
                    big_endian=True))

        reader = audiotools.PCMCat(main_readers)

        # the concatenated stream takes on the attributes
        # of its first sub-reader
        self.assertEqual(reader.sample_rate, 44100)
        self.assertEqual(reader.channels, 1)
        self.assertEqual(reader.channel_mask, 0x4)
        self.assertEqual(reader.bits_per_sample, 16)

        # drain the stream 2 frames at a time
        # and check that the sub-streams come out back-to-back
        collected = []
        while True:
            frame = reader.read(2)
            if len(frame) == 0:
                break
            collected.extend(list(frame))

        self.assertEqual(collected, list(range(-15, 15)))

        # reads past the end of the stream yield empty FrameLists
        for _ in range(10):
            self.assertEqual(len(reader.read(2)), 0)

        # the sub-readers are exhausted but still open at this point
        for sub_reader in main_readers:
            for _ in range(10):
                self.assertEqual(len(sub_reader.read(2)), 0)

        # reads from the closed stream raise ValueError
        reader.close()

        self.assertRaises(ValueError, reader.read, 2)

        # and PCMCat's close() closes each of the sub-readers too
        for sub_reader in main_readers:
            self.assertRaises(ValueError, sub_reader.read, 2)
214 215 216class BufferedPCMReader(unittest.TestCase): 217 @LIB_PCM 218 def test_pcm(self): 219 def frame_lengths(reader, pcm_frames): 220 frame = reader.read(pcm_frames) 221 while len(frame) > 0: 222 yield frame.frames 223 frame = reader.read(pcm_frames) 224 else: 225 reader.close() 226 227 # ensure our reader is generating randomly-sized frames 228 reader = Variable_Reader(EXACT_BLANK_PCM_Reader(4096 * 100)) 229 self.assertGreater(len(set(frame_lengths(reader, 4096))), 1) 230 231 # then, ensure that wrapped our reader in a BufferedPCMReader 232 # results in equal-sized frames 233 reader = audiotools.BufferedPCMReader( 234 Variable_Reader(EXACT_BLANK_PCM_Reader(4096 * 100))) 235 # (make sure to account for bps/channels in frame_lengths()) 236 self.assertEqual(set(frame_lengths(reader, 4096)), set([4096])) 237 238 # check that sample_rate, bits_per_sample, channel_mask and channels 239 # pass-through properly 240 for sample_rate in [32000, 44100, 48000, 192000]: 241 for bits_per_sample in [8, 16, 24]: 242 for (channels, channel_mask) in [(1, 0x4), 243 (2, 0x3), 244 (4, 0x33), 245 (6, 0x3F)]: 246 reader = BLANK_PCM_Reader(1, 247 sample_rate=sample_rate, 248 channels=channels, 249 bits_per_sample=bits_per_sample, 250 channel_mask=channel_mask) 251 reader2 = audiotools.BufferedPCMReader(reader) 252 self.assertEqual(reader.sample_rate, sample_rate) 253 self.assertEqual(reader.channels, channels) 254 self.assertEqual(reader.bits_per_sample, bits_per_sample) 255 self.assertEqual(reader.channel_mask, channel_mask) 256 257 self.assertEqual(reader2.sample_rate, sample_rate) 258 self.assertEqual(reader2.channels, channels) 259 self.assertEqual(reader2.bits_per_sample, bits_per_sample) 260 self.assertEqual(reader2.channel_mask, channel_mask) 261 262 # ensure that random-sized reads also work okay 263 total_frames = 4096 * 1000 264 reader = audiotools.BufferedPCMReader( 265 Variable_Reader(EXACT_BLANK_PCM_Reader(total_frames))) 266 while total_frames > 0: 267 frames = 
min(total_frames, random.choice(range(1, 1000))) 268 frame = reader.read(frames) 269 self.assertEqual(frame.frames, frames) 270 total_frames -= frame.frames 271 272 # ensure reading after the stream has been exhausted 273 # results in empty FrameLists 274 reader = audiotools.BufferedPCMReader( 275 EXACT_BLANK_PCM_Reader(44100)) 276 f = reader.read(4096) 277 while len(f) > 0: 278 f = reader.read(4096) 279 280 self.assertEqual(len(f), 0) 281 282 for i in range(10): 283 f = reader.read(4096) 284 self.assertEqual(len(f), 0) 285 286 # and ensure reading after the stream is closed 287 # raises a ValueError 288 reader.close() 289 290 self.assertRaises(ValueError, 291 reader.read, 292 4096) 293 294 295class LimitedPCMReader(unittest.TestCase): 296 @LIB_PCM 297 def test_pcm(self): 298 from audiotools.pcm import from_list 299 300 main_reader = audiotools.PCMFileReader( 301 BytesIO( 302 from_list(list(range(-50, 50)), 303 1, 304 16, 305 True).to_bytes(True, True)), 306 sample_rate=44100, 307 channels=1, 308 channel_mask=0x4, 309 bits_per_sample=16, 310 signed=True, 311 big_endian=True) 312 313 total_samples = [] 314 for pcm_frames in [10, 20, 30, 40]: 315 reader_samples = [] 316 reader = audiotools.LimitedPCMReader(main_reader, pcm_frames) 317 self.assertEqual(reader.sample_rate, 44100) 318 self.assertEqual(reader.channels, 1) 319 self.assertEqual(reader.channel_mask, 0x4) 320 self.assertEqual(reader.bits_per_sample, 16) 321 322 f = reader.read(2) 323 while len(f) > 0: 324 reader_samples.extend(list(f)) 325 f = reader.read(2) 326 327 self.assertEqual(len(reader_samples), pcm_frames) 328 329 total_samples.extend(reader_samples) 330 331 # ensure subsequent reads return empty FrameLists 332 for i in range(10): 333 self.assertEqual(len(reader.read(2)), 0) 334 335 # ensure closing the substream raises ValueErrors 336 # on subsequent reads 337 # (note that this doesn't close the main reader) 338 reader.close() 339 340 self.assertRaises(ValueError, reader.read, 2) 341 342 
self.assertEqual(total_samples, list(range(-50, 50))) 343 344 # ensure subsequent reads of main reader return empty FrameLists 345 for i in range(10): 346 self.assertEqual(len(main_reader.read(2)), 0) 347 348 # ensure closing the substream raises ValueErrors 349 # on subsequent reads 350 main_reader.close() 351 352 self.assertRaises(ValueError, main_reader.read, 2) 353 354 355class PCMReaderWindow(unittest.TestCase): 356 @LIB_PCM 357 def test_pcm(self): 358 from audiotools.pcm import from_list 359 360 for initial_offset in range(-5, 5): 361 for pcm_frames in range(5, 15): 362 main_reader = audiotools.PCMFileReader( 363 BytesIO( 364 from_list(list(range(1, 11)), 365 1, 366 16, 367 True).to_bytes(True, True)), 368 sample_rate=44100, 369 channels=1, 370 channel_mask=0x4, 371 bits_per_sample=16, 372 signed=True, 373 big_endian=True) 374 375 reader = audiotools.PCMReaderWindow(main_reader, 376 initial_offset, 377 pcm_frames) 378 379 self.assertEqual(reader.sample_rate, 380 main_reader.sample_rate) 381 self.assertEqual(reader.channels, 382 main_reader.channels) 383 self.assertEqual(reader.channel_mask, 384 main_reader.channel_mask) 385 self.assertEqual(reader.bits_per_sample, 386 main_reader.bits_per_sample) 387 388 # ensure reads generate the proper window of samples 389 samples = [] 390 f = reader.read(2) 391 while len(f) > 0: 392 samples.extend(list(f)) 393 f = reader.read(2) 394 395 self.assertEqual(len(samples), pcm_frames) 396 397 target_samples = list(range(1, 11)) 398 if initial_offset < 0: 399 # negative offsets pad window with 0s 400 target_samples = (([0] * abs(initial_offset)) + 401 target_samples) 402 elif initial_offset > 0: 403 # positive offsets remove samples from window 404 target_samples = target_samples[initial_offset:] 405 406 if len(target_samples) < pcm_frames: 407 # window longer than samples gets padded with 0s 408 target_samples += [0] * (pcm_frames - len(target_samples)) 409 elif len(target_samples) > pcm_frames: 410 # window shorder than 
samples truncates samples 411 target_samples = target_samples[0:pcm_frames] 412 413 self.assertEqual(samples, target_samples) 414 415 # ensure subsequent reads return empty FrameLists 416 for i in range(10): 417 self.assertEqual(len(reader.read(2)), 0) 418 419 # ensure closing the PCMReaderWindow 420 # generates ValueErrors on subsequent reads 421 reader.close() 422 423 self.assertRaises(ValueError, reader.read, 2) 424 425 # ensure closing the PCMReaderWindow 426 # closes the main PCMReader also 427 self.assertRaises(ValueError, main_reader.read, 2) 428 429 430class Sines(unittest.TestCase): 431 @LIB_PCM 432 def test_pcm(self): 433 for stream in [ 434 test_streams.Generate01(44100), 435 test_streams.Generate02(44100), 436 test_streams.Generate03(44100), 437 test_streams.Generate04(44100), 438 439 test_streams.Sine8_Mono(200000, 48000, 440 441.0, 0.50, 441.0, 0.49), 441 test_streams.Sine8_Stereo(200000, 48000, 442 441.0, 0.50, 441.0, 0.49, 1.0), 443 test_streams.Sine16_Mono(200000, 48000, 444 441.0, 0.50, 441.0, 0.49), 445 test_streams.Sine16_Stereo(200000, 48000, 446 441.0, 0.50, 441.0, 0.49, 1.0), 447 test_streams.Sine24_Mono(200000, 48000, 448 441.0, 0.50, 441.0, 0.49), 449 test_streams.Sine24_Stereo(200000, 48000, 450 441.0, 0.50, 441.0, 0.49, 1.0), 451 test_streams.Simple_Sine(200000, 44100, 0x3F, 16, 452 (6400, 10000), 453 (11520, 15000), 454 (16640, 20000), 455 (21760, 25000), 456 (26880, 30000), 457 (30720, 35000)), 458 459 test_streams.fsd16([1, -1], 100), 460 461 test_streams.WastedBPS16(1000)]: 462 463 # read the base data from the stream 464 f = stream.read(4096) 465 while len(f) > 0: 466 f = stream.read(4096) 467 468 # ensure subsequent reads return empty FrameLists 469 for i in range(10): 470 self.assertEqual(len(stream.read(4096)), 0) 471 472 # ensure subsequent reads on a closed stream 473 # raises ValueError 474 stream.close() 475 476 self.assertRaises(ValueError, stream.read, 4096) 477 478 479class CDDA(unittest.TestCase): 480 @LIB_CDIO 481 def 
setUp(self): 482 self.temp_dir = tempfile.mkdtemp() 483 self.bin = os.path.join(self.temp_dir, "Test.BIN") 484 self.cue = os.path.join(self.temp_dir, "Test.CUE") 485 486 with open(self.bin, "wb") as bin_file: 487 self.reader = test_streams.Sine16_Stereo(43646652, 44100, 488 441.0, 0.50, 489 4410.0, 0.49, 1.0) 490 audiotools.transfer_framelist_data(self.reader, bin_file.write) 491 492 with open(self.cue, "wb") as f1: 493 with open("cdda_test.cue", "rb") as f2: 494 f1.write(f2.read()) 495 496 @LIB_CDIO 497 def tearDown(self): 498 os.unlink(self.bin) 499 os.unlink(self.cue) 500 os.rmdir(self.temp_dir) 501 502 @LIB_CDIO 503 def test_cdda(self): 504 from audiotools.cdio import CDDAReader 505 506 cdda = CDDAReader(self.cue) 507 508 self.assertEqual(cdda.is_cd_image, True) 509 self.assertEqual(cdda.sample_rate, 44100) 510 self.assertEqual(cdda.channels, 2) 511 self.assertEqual(cdda.channel_mask, 0x3) 512 self.assertEqual(cdda.bits_per_sample, 16) 513 self.assertEqual(cdda.first_sector, 0) 514 self.assertEqual(cdda.last_sector, (43646652 // 588) - 1) 515 self.assertEqual(cdda.track_lengths, 516 {1: 8038548, 517 2: 7932120, 518 3: 6318648, 519 4: 10765104, 520 5: 5491920, 521 6: 5100312}) 522 self.assertEqual(cdda.track_offsets, 523 {1: 0, 524 2: 8038548, 525 3: 15970668, 526 4: 22289316, 527 5: 33054420, 528 6: 38546340}) 529 530 # verify whole disc 531 checksum = md5() 532 frame = cdda.read(44100) 533 while len(frame) > 0: 534 checksum.update(frame.to_bytes(False, True)) 535 frame = cdda.read(44100) 536 self.assertEqual(self.reader.hexdigest(), 537 checksum.hexdigest()) 538 539 # ensure subsequent reads keep generating empty framelists 540 for i in range(10): 541 self.assertEqual(cdda.read(44100).frames, 0) 542 543 # verify individual track sections 544 for track_num in sorted(cdda.track_offsets.keys()): 545 offset = cdda.track_offsets[track_num] 546 length = cdda.track_lengths[track_num] 547 remaining_offset = offset - cdda.seek(offset) 548 self.reader.reset() 549 
self.assertTrue(audiotools.pcm_cmp( 550 audiotools.PCMReaderWindow(cdda, 551 remaining_offset, 552 length, 553 False), 554 audiotools.PCMReaderWindow(self.reader, offset, length))) 555 556 # verify close raises exceptions when reading/seeking 557 cdda.close() 558 self.assertRaises(ValueError, cdda.read, 10) 559 560 self.assertRaises(ValueError, cdda.seek, 10) 561 562 563class ChannelMask(unittest.TestCase): 564 @LIB_CORE 565 def test_mask(self): 566 mask = audiotools.ChannelMask.from_fields() 567 self.assertFalse(mask.defined()) 568 self.assertTrue(mask.undefined()) 569 self.assertEqual(len(mask), 0) 570 self.assertEqual(set([]), set(mask.channels())) 571 mask2 = audiotools.ChannelMask(int(mask)) 572 self.assertEqual(mask, mask2) 573 574 mask_fields = list(audiotools.ChannelMask.SPEAKER_TO_MASK.keys()) 575 for count in range(1, len(mask_fields) + 1): 576 for fields in Combinations(mask_fields, count): 577 # build a mask from fields 578 mask = audiotools.ChannelMask.from_fields( 579 **dict([(field, True) for field in fields])) 580 self.assertTrue(mask.defined()) 581 self.assertFalse(mask.undefined()) 582 self.assertEqual(len(mask), len(fields)) 583 self.assertEqual(set(fields), set(mask.channels())) 584 mask2 = audiotools.ChannelMask(int(mask)) 585 self.assertEqual(mask, mask2) 586 587 588class Filename(unittest.TestCase): 589 def setUp(self): 590 self.temp_dir = tempfile.mkdtemp() 591 self.temp_file1 = os.path.join(self.temp_dir, "file1") 592 self.temp_file2 = os.path.join(self.temp_dir, "file2") 593 f = open(self.temp_file1, "w") 594 f.write("hello world") 595 f.close() 596 os.link(self.temp_file1, self.temp_file2) 597 598 def tearDown(self): 599 os.unlink(self.temp_file1) 600 os.unlink(self.temp_file2) 601 os.rmdir(self.temp_dir) 602 603 @LIB_CORE 604 def test_filename(self): 605 file1 = audiotools.Filename(self.temp_file1) 606 file2 = audiotools.Filename(self.temp_file2) 607 file3 = audiotools.Filename(os.path.join(self.temp_dir, "file3")) 608 file4 = 
audiotools.Filename(os.path.join(self.temp_dir, "./file3")) 609 file5 = audiotools.Filename(os.path.join(self.temp_dir, "file4")) 610 611 self.assertTrue(file1.disk_file()) 612 self.assertTrue(file2.disk_file()) 613 self.assertNotEqual(str(file1), str(file2)) 614 self.assertNotEqual(u"%s" % (file1,), u"%s" % (file2,)) 615 self.assertEqual(file1, file2) 616 self.assertEqual(hash(file1), hash(file2)) 617 618 self.assertFalse(file3.disk_file()) 619 self.assertNotEqual(str(file1), str(file3)) 620 self.assertNotEqual(u"%s" % (file1,), u"%s" % (file3,)) 621 self.assertNotEqual(file1, file3) 622 self.assertNotEqual(hash(file1), hash(file3)) 623 624 self.assertFalse(file4.disk_file()) 625 self.assertEqual(str(file3), str(file4)) 626 self.assertEqual(u"%s" % (file3,), u"%s" % (file4,)) 627 self.assertEqual(file3, file4) 628 self.assertEqual(hash(file3), hash(file4)) 629 630 self.assertFalse(file5.disk_file()) 631 self.assertNotEqual(str(file3), str(file5)) 632 self.assertNotEqual(u"%s" % (file3,), u"%s" % (file5,)) 633 self.assertNotEqual(file3, file5) 634 self.assertNotEqual(hash(file3), hash(file5)) 635 636 637class ImageJPEG(unittest.TestCase): 638 @LIB_CORE 639 def setUp(self): 640 with open("imagejpeg_setup.jpg", "rb") as f: 641 self.image = f.read() 642 self.md5sum = "f8c43ff52c53aff1625979de47a04cec" 643 self.width = 12 644 self.height = 21 645 self.bpp = 24 646 self.colors = 0 647 self.mime_type = "image/jpeg" 648 649 @LIB_CORE 650 def test_checksum(self): 651 self.assertEqual(md5(self.image).hexdigest(), self.md5sum) 652 653 @LIB_CORE 654 def test_image(self): 655 img = audiotools.Image.new(self.image, u"Description", 1) 656 self.assertEqual(img.data, self.image) 657 self.assertEqual(img.mime_type, self.mime_type) 658 self.assertEqual(img.width, self.width) 659 self.assertEqual(img.height, self.height) 660 self.assertEqual(img.color_depth, self.bpp) 661 self.assertEqual(img.color_count, self.colors) 662 self.assertEqual(img.description, u"Description") 663 
self.assertEqual(img.type, 1) 664 665 666class ImagePNG(ImageJPEG): 667 @LIB_CORE 668 def setUp(self): 669 with open("imagepng_setup.png", "rb") as f: 670 self.image = f.read() 671 self.md5sum = "31c4c5224327d5869aa6059bcda84d2e" 672 self.width = 12 673 self.height = 21 674 self.bpp = 24 675 self.colors = 0 676 self.mime_type = "image/png" 677 678 679class ImageCover1(ImageJPEG): 680 @LIB_CORE 681 def setUp(self): 682 self.image = TEST_COVER1 683 self.md5sum = "dbb6a01eca6336381754346de71e052e" 684 self.width = 500 685 self.height = 500 686 self.bpp = 24 687 self.colors = 0 688 self.mime_type = "image/jpeg" 689 690 691class ImageCover2(ImageJPEG): 692 @LIB_CORE 693 def setUp(self): 694 self.image = TEST_COVER2 695 self.md5sum = "2d348cf729c840893d672dd69476955c" 696 self.width = 500 697 self.height = 500 698 self.bpp = 24 699 self.colors = 0 700 self.mime_type = "image/png" 701 702 703class ImageCover3(ImageJPEG): 704 @LIB_CORE 705 def setUp(self): 706 self.image = TEST_COVER3 707 self.md5sum = "534b107e88d3830eac7ce814fc5d0279" 708 self.width = 100 709 self.height = 100 710 self.bpp = 24 711 self.colors = 0 712 self.mime_type = "image/jpeg" 713 714 715class ImageGIF(ImageJPEG): 716 @LIB_CORE 717 def setUp(self): 718 with open("imagegif_setup.gif", "rb") as f: 719 self.image = f.read() 720 self.md5sum = "1d4d36801b53c41d01086cbf9d0cb471" 721 self.width = 12 722 self.height = 21 723 self.bpp = 8 724 self.colors = 32 725 self.mime_type = "image/gif" 726 727 728class ImageBMP(ImageJPEG): 729 @LIB_CORE 730 def setUp(self): 731 with open("imagebmp_setup.bmp", "rb") as f: 732 self.image = f.read() 733 self.md5sum = "cb6ef2f7a458ab1d315c329f72ec9898" 734 self.width = 12 735 self.height = 21 736 self.bpp = 24 737 self.colors = 0 738 self.mime_type = "image/x-ms-bmp" 739 740 741class ImageTIFF(ImageJPEG): 742 @LIB_CORE 743 def setUp(self): 744 with open("imagetiff_setup.tiff", "rb") as f: 745 self.image = f.read() 746 self.md5sum = "192ceb086d217421a5f151cc0afa3f05" 747 
self.width = 12 748 self.height = 21 749 self.bpp = 24 750 self.colors = 0 751 self.mime_type = "image/tiff" 752 753 754class ImageHugeBMP(ImageJPEG): 755 @LIB_CORE 756 def setUp(self): 757 from bz2 import decompress 758 self.image = decompress(HUGE_BMP) 759 self.md5sum = "558d875195829de829059fd4952fed46" 760 self.width = 2366 761 self.height = 2366 762 self.bpp = 24 763 self.colors = 0 764 self.mime_type = "image/x-ms-bmp" 765 766 767class PCMConverter(unittest.TestCase): 768 @LIB_PCM 769 def setUp(self): 770 self.tempwav = tempfile.NamedTemporaryFile(suffix=".wav") 771 772 @LIB_PCM 773 def tearDown(self): 774 self.tempwav.close() 775 776 @LIB_PCM 777 def test_conversions(self): 778 for ((i_sample_rate, 779 i_channels, 780 i_channel_mask, 781 i_bits_per_sample), 782 (o_sample_rate, 783 o_channels, 784 o_channel_mask, 785 o_bits_per_sample)) in Combinations(SHORT_PCM_COMBINATIONS, 2): 786 787 reader = BLANK_PCM_Reader(5, 788 sample_rate=i_sample_rate, 789 channels=i_channels, 790 bits_per_sample=i_bits_per_sample, 791 channel_mask=i_channel_mask) 792 793 converter = audiotools.PCMConverter( 794 reader, 795 sample_rate=o_sample_rate, 796 channels=o_channels, 797 bits_per_sample=o_bits_per_sample, 798 channel_mask=o_channel_mask) 799 800 wave = audiotools.WaveAudio.from_pcm(self.tempwav.name, converter) 801 converter.close() 802 803 self.assertEqual(wave.sample_rate(), o_sample_rate) 804 self.assertEqual(wave.channels(), o_channels) 805 self.assertEqual(wave.bits_per_sample(), o_bits_per_sample) 806 self.assertEqual(wave.channel_mask(), o_channel_mask) 807 self.assertEqual( 808 (decimal.Decimal(wave.cd_frames()) / 75).to_integral(), 809 5) 810 811 @LIB_PCM 812 def test_pcm(self): 813 for (in_sample_rate, 814 (in_channels, 815 in_channel_mask), 816 in_bits_per_sample) in Possibilities([44100, 96000], 817 [(1, 0x4), 818 (2, 0x3), 819 (4, 0x33)], 820 [16, 24]): 821 for (out_sample_rate, 822 (out_channels, 823 out_channel_mask), 824 out_bits_per_sample) in 
Possibilities([44100, 96000], 825 [(1, 0x4), 826 (2, 0x3), 827 (4, 0x33)], 828 [16, 24]): 829 830 main_reader = BLANK_PCM_Reader( 831 length=1, 832 sample_rate=in_sample_rate, 833 channels=in_channels, 834 bits_per_sample=in_bits_per_sample, 835 channel_mask=in_channel_mask) 836 837 reader = audiotools.PCMConverter( 838 pcmreader=main_reader, 839 sample_rate=out_sample_rate, 840 channels=out_channels, 841 channel_mask=out_channel_mask, 842 bits_per_sample=out_bits_per_sample) 843 844 # read contents of converted stream 845 f = reader.read(4096) 846 while len(f) > 0: 847 f = reader.read(4096) 848 849 # ensure subsequent reads return empty FrameLists 850 for i in range(10): 851 self.assertEqual(len(reader.read(4096)), 0) 852 853 # ensure closing stream raises ValueErrors 854 # on subsequent reads 855 reader.close() 856 857 self.assertRaises(ValueError, reader.read, 4096) 858 859 # ensure main reader is also closed 860 # when converter is closed 861 self.assertRaises(ValueError, main_reader.read, 4096) 862 863 864class Test_ReplayGain(unittest.TestCase): 865 @LIB_CORE 866 def test_replaygain(self): 867 # a trivial test of the ReplayGain container 868 869 self.assertEqual(audiotools.ReplayGain(0.5, 1.0, 0.5, 1.0), 870 audiotools.ReplayGain(0.5, 1.0, 0.5, 1.0)) 871 self.assertNotEqual(audiotools.ReplayGain(0.5, 1.0, 0.5, 1.0), 872 audiotools.ReplayGain(0.25, 1.0, 0.5, 1.0)) 873 self.assertNotEqual(audiotools.ReplayGain(0.5, 1.0, 0.5, 1.0), 874 audiotools.ReplayGain(0.5, 0.5, 0.5, 1.0)) 875 self.assertNotEqual(audiotools.ReplayGain(0.5, 1.0, 0.5, 1.0), 876 audiotools.ReplayGain(0.5, 1.0, 0.25, 1.0)) 877 self.assertNotEqual(audiotools.ReplayGain(0.5, 1.0, 0.5, 1.0), 878 audiotools.ReplayGain(0.5, 1.0, 0.5, 0.5)) 879 880 881class Test_filename_to_type(unittest.TestCase): 882 @LIB_CORE 883 def test_filename_to_type(self): 884 type_group = {} 885 for audio_type in audiotools.AVAILABLE_TYPES: 886 type_group.setdefault(audio_type.SUFFIX, []).append(audio_type) 887 888 for 
suffix in type_group.keys(): 889 temp = tempfile.NamedTemporaryFile(suffix="." + suffix) 890 try: 891 if len(type_group[suffix]) == 1: 892 self.assertEqual(audiotools.filename_to_type(temp.name), 893 type_group[suffix][0]) 894 else: 895 self.assertRaises(audiotools.AmbiguousAudioType, 896 audiotools.filename_to_type, 897 temp.name) 898 finally: 899 temp.close() 900 901 temp = tempfile.NamedTemporaryFile(suffix=".foo") 902 try: 903 self.assertRaises(audiotools.UnknownAudioType, 904 audiotools.filename_to_type, 905 temp.name) 906 finally: 907 temp.close() 908 909 temp = tempfile.NamedTemporaryFile() 910 try: 911 self.assertRaises(audiotools.UnknownAudioType, 912 audiotools.filename_to_type, 913 temp.name) 914 finally: 915 temp.close() 916 917 918class Test_group_tracks(unittest.TestCase): 919 @LIB_CORE 920 def setUp(self): 921 self.output_format = audiotools.FlacAudio 922 self.track_files = [ 923 tempfile.NamedTemporaryFile( 924 suffix="." + self.output_format.SUFFIX) 925 for i in range(5)] 926 self.tracks = [ 927 self.output_format.from_pcm( 928 track.name, 929 BLANK_PCM_Reader(1)) for track in self.track_files] 930 self.tracks[0].set_metadata(audiotools.MetaData( 931 album_name=u"Album 1", 932 album_number=1, 933 track_number=1)) 934 self.tracks[1].set_metadata(audiotools.MetaData( 935 album_name=u"Album 2", 936 album_number=1, 937 track_number=1)) 938 self.tracks[2].set_metadata(audiotools.MetaData( 939 album_name=u"Album 1", 940 album_number=1, 941 track_number=2)) 942 self.tracks[3].set_metadata(audiotools.MetaData( 943 album_name=u"Album 2", 944 album_number=2, 945 track_number=1)) 946 self.tracks[4].set_metadata(audiotools.MetaData( 947 album_name=u"Album 3", 948 album_number=1, 949 track_number=1)) 950 951 @LIB_CORE 952 def tearDown(self): 953 for track in self.track_files: 954 track.close() 955 956 @LIB_CORE 957 def test_grouping(self): 958 groupings = list(audiotools.group_tracks(self.tracks)) 959 groupings.sort(key=lambda x: x[0].get_metadata().album_name) 
960 self.assertEqual(groupings[0], [self.tracks[0], self.tracks[2]]) 961 self.assertEqual(groupings[1], [self.tracks[1]]) 962 self.assertEqual(groupings[2], [self.tracks[3]]) 963 self.assertEqual(groupings[3], [self.tracks[4]]) 964 965 966class Test_open(unittest.TestCase): 967 @LIB_CORE 968 def setUp(self): 969 self.dummy1 = tempfile.NamedTemporaryFile() 970 self.dummy2 = tempfile.NamedTemporaryFile() 971 self.dummy3 = tempfile.NamedTemporaryFile() 972 self.dummy1.write(b"12345" * 1000) 973 self.dummy1.flush() 974 self.dummy2.write(b"54321" * 1000) 975 self.dummy2.flush() 976 977 with open("flac-allframes.flac", "rb") as f: 978 data = f.read() 979 self.dummy3.write(data[0:0x4] + b"\xFF" + data[0x5:]) 980 self.dummy3.flush() 981 982 @LIB_CORE 983 def tearDown(self): 984 self.dummy1.close() 985 self.dummy2.close() 986 self.dummy3.close() 987 988 @LIB_CORE 989 def test_open(self): 990 # ensure open on dummy file raises UnsupportedFile 991 self.assertRaises(audiotools.UnsupportedFile, 992 audiotools.open, 993 self.dummy1.name) 994 995 # ensure open on nonexistent file raises IOError 996 self.assertRaises(IOError, 997 audiotools.open, 998 "/dev/null/foo") 999 1000 # ensure open on directory raises IOError 1001 self.assertRaises(IOError, 1002 audiotools.open, 1003 "/") 1004 1005 # ensure open on unreadable file raises IOError 1006 os.chmod(self.dummy1.name, 0) 1007 try: 1008 self.assertRaises(IOError, 1009 audiotools.open, 1010 self.dummy1.name) 1011 finally: 1012 os.chmod(self.dummy1.name, 0o600) 1013 1014 self.assertRaises(audiotools.InvalidFile, 1015 audiotools.open, 1016 self.dummy3.name) 1017 1018 1019class Test_open_directory(unittest.TestCase): 1020 @LIB_CORE 1021 def setUp(self): 1022 self.output_type = audiotools.FlacAudio 1023 self.suffix = "." 
+ self.output_type.SUFFIX 1024 self.dir = tempfile.mkdtemp() 1025 1026 def make_track(self, directory, track_number): 1027 track = self.output_type.from_pcm( 1028 os.path.join(directory, str(track_number) + self.suffix), 1029 BLANK_PCM_Reader(1)) 1030 track.set_metadata(audiotools.MetaData(track_name=u"Track Name", 1031 track_number=track_number)) 1032 return track 1033 1034 @LIB_CORE 1035 def tearDown(self): 1036 import shutil 1037 1038 shutil.rmtree(self.dir) 1039 1040 @LIB_CORE 1041 def test_open_directory(self): 1042 subdir1 = os.path.join(self.dir, "dir1") 1043 subdir2 = os.path.join(self.dir, "dir2") 1044 subdir3 = os.path.join(subdir1, "dir3") 1045 1046 os.mkdir(subdir1) 1047 os.mkdir(subdir2) 1048 os.mkdir(subdir3) 1049 1050 track0_1 = self.make_track(self.dir, 1) 1051 track0_2 = self.make_track(self.dir, 2) 1052 track0_3 = self.make_track(self.dir, 3) 1053 track1_1 = self.make_track(subdir1, 1) 1054 track1_2 = self.make_track(subdir1, 2) 1055 track1_3 = self.make_track(subdir1, 3) 1056 track2_1 = self.make_track(subdir2, 1) 1057 track2_2 = self.make_track(subdir2, 2) 1058 track2_3 = self.make_track(subdir2, 3) 1059 track3_1 = self.make_track(subdir3, 1) 1060 track3_2 = self.make_track(subdir3, 2) 1061 track3_3 = self.make_track(subdir3, 3) 1062 1063 tracks = list(audiotools.open_directory(self.dir)) 1064 self.assertEqual([t.filename for t in tracks], 1065 [t.filename for t in 1066 [track0_1, track0_2, track0_3, 1067 track1_1, track1_2, track1_3, 1068 track3_1, track3_2, track3_3, 1069 track2_1, track2_2, track2_3]]) 1070 1071 1072class Test_open_files(unittest.TestCase): 1073 @LIB_CORE 1074 def setUp(self): 1075 self.output_type = audiotools.FlacAudio 1076 self.suffix = "." 
+ self.output_type.SUFFIX 1077 self.dir = tempfile.mkdtemp() 1078 1079 def make_track(self, directory, track_number): 1080 track = self.output_type.from_pcm( 1081 os.path.join(directory, str(track_number) + self.suffix), 1082 BLANK_PCM_Reader(1)) 1083 track.set_metadata(audiotools.MetaData(track_name=u"Track Name", 1084 track_number=track_number)) 1085 return track 1086 1087 @LIB_CORE 1088 def tearDown(self): 1089 import shutil 1090 1091 shutil.rmtree(self.dir) 1092 1093 @LIB_CORE 1094 def test_open_files(self): 1095 track1 = self.make_track(self.dir, 1) 1096 track2 = self.make_track(self.dir, 2) 1097 track3 = self.make_track(self.dir, 3) 1098 dummy1_name = os.path.join(self.dir, "4" + self.suffix) 1099 dummy1 = open(dummy1_name, "wb") 1100 dummy1.write(b"Hello World") 1101 dummy1.close() 1102 1103 tracks = list(audiotools.open_files([track1.filename, track2.filename, 1104 dummy1_name, track3.filename])) 1105 self.assertEqual([t.filename for t in tracks], 1106 [t.filename for t in [track1, track2, track3]]) 1107 1108 1109class Test_sorted_tracks(unittest.TestCase): 1110 @LIB_CORE 1111 def setUp(self): 1112 self.dir = tempfile.mkdtemp() 1113 1114 @LIB_CORE 1115 def tearDown(self): 1116 import shutil 1117 1118 shutil.rmtree(self.dir) 1119 1120 def __clean__(self): 1121 for f in os.listdir(self.dir): 1122 os.unlink(os.path.join(self.dir, f)) 1123 1124 def __no_metadata__(self, filename_number): 1125 return audiotools.WaveAudio.from_pcm( 1126 os.path.join(self.dir, "%2.2d.wav" % (filename_number)), 1127 BLANK_PCM_Reader(1)) 1128 1129 def __metadata__(self, filename_number, 1130 track_number=None, album_number=None): 1131 track = audiotools.FlacAudio.from_pcm( 1132 os.path.join(self.dir, "%2.2d.flac" % (filename_number)), 1133 BLANK_PCM_Reader(1)) 1134 track.set_metadata(audiotools.MetaData(track_number=track_number, 1135 album_number=album_number)) 1136 return track 1137 1138 def __test_order__(self, sorted_tracks): 1139 from random import shuffle 1140 1141 
@LIB_CORE
def test_sort(self):
    """Checks audiotools.sorted_tracks() against known orderings.

    Every entry of "expected_orders" describes the tracks of one
    scenario in the order sorted_tracks() is expected to return them.
    Each track is described as (builder, arguments) so that its file
    is only created once the directory has been emptied.
    """

    plain = self.__no_metadata__
    tagged = self.__metadata__

    expected_orders = [
        # tracks without metadata come before tracks with metadata
        [(plain, (1,)), (tagged, (2, 2)), (tagged, (3, 3))],
        [(plain, (3,)), (tagged, (1, 1)), (tagged, (2, 2))],
        [(plain, (3,)), (plain, (10,)),
         (tagged, (1, 1)), (tagged, (2, 2))],

        # tracks without metadata are sorted by filename
        [(plain, (1,)), (plain, (2,)), (plain, (3,))],
        [(plain, (1,)), (plain, (2,)), (plain, (10,)), (plain, (11,))],

        # tracks without album numbers come before
        # tracks with album numbers
        [(tagged, (3, 3)), (tagged, (2, 1, 1)), (tagged, (1, 2, 1))],
        [(tagged, (4, 3)), (tagged, (3, 4)),
         (tagged, (2, 1, 1)), (tagged, (1, 2, 1))],

        # tracks without album numbers are sorted
        # by track number (if any)
        [(tagged, (3,)), (tagged, (2, 1)), (tagged, (1, 2))],
        [(tagged, (3, 1)), (tagged, (2, 2)), (tagged, (1, 3))],

        # tracks with album numbers are sorted by album number
        # and then by track number (if any)
        [(tagged, (5,)), (tagged, (4, 1, 1)), (tagged, (3, 2, 1)),
         (tagged, (2, 1, 2)), (tagged, (1, 2, 2))]]

    for scenario in expected_orders:
        # start each scenario from an empty directory,
        # then create its files in the listed order
        self.__clean__()
        self.__test_order__([build(*arguments)
                             for (build, arguments) in scenario])
CLOSE_PCM_Reader(BLANK_PCM_Reader(1, channels=1)) 1260 1261 self.assertEqual(reader1.closes_called, 0) 1262 self.assertEqual(reader2.closes_called, 0) 1263 self.assertEqual(audiotools.pcm_frame_cmp(reader1, reader2), 0) 1264 self.assertEqual(reader1.closes_called, 1) 1265 self.assertEqual(reader2.closes_called, 1) 1266 1267 reader1 = CLOSE_PCM_Reader(BLANK_PCM_Reader(1)) 1268 reader2 = CLOSE_PCM_Reader(BLANK_PCM_Reader(1, bits_per_sample=24)) 1269 1270 self.assertEqual(reader1.closes_called, 0) 1271 self.assertEqual(reader2.closes_called, 0) 1272 self.assertEqual(audiotools.pcm_frame_cmp(reader1, reader2), 0) 1273 self.assertEqual(reader1.closes_called, 1) 1274 self.assertEqual(reader2.closes_called, 1) 1275 1276 reader1 = CLOSE_PCM_Reader(BLANK_PCM_Reader(1)) 1277 reader2 = CLOSE_PCM_Reader(BLANK_PCM_Reader(1, channel_mask=0x30)) 1278 1279 self.assertEqual(reader1.closes_called, 0) 1280 self.assertEqual(reader2.closes_called, 0) 1281 self.assertEqual(audiotools.pcm_frame_cmp(reader1, reader2), 0) 1282 self.assertEqual(reader1.closes_called, 1) 1283 self.assertEqual(reader2.closes_called, 1) 1284 1285 reader1 = CLOSE_PCM_Reader(BLANK_PCM_Reader(2)) 1286 reader2 = CLOSE_PCM_Reader( 1287 audiotools.PCMCat([BLANK_PCM_Reader(1), RANDOM_PCM_Reader(1)])) 1288 1289 self.assertEqual(reader1.closes_called, 0) 1290 self.assertEqual(reader2.closes_called, 0) 1291 self.assertEqual(audiotools.pcm_frame_cmp(reader1, reader2), 44100) 1292 self.assertEqual(reader1.closes_called, 1) 1293 self.assertEqual(reader2.closes_called, 1) 1294 1295 1296class TestFrameList(unittest.TestCase): 1297 if sys.version_info[0] >= 3: 1298 @classmethod 1299 def Bits8(cls): 1300 for i in range(0, 0xFF + 1): 1301 yield bytes([i]) 1302 1303 @classmethod 1304 def Bits16(cls): 1305 for i in range(0, 0xFF + 1): 1306 for j in range(0, 0xFF + 1): 1307 yield bytes([i, j]) 1308 1309 @classmethod 1310 def Bits24(cls): 1311 for i in range(0, 0xFF + 1): 1312 for j in range(0, 0xFF + 1): 1313 for k in range(0, 
@LIB_CORE
def test_basics(self):
    """Exercises the basic FrameList interface.

    Covers constructor argument validation, frame and channel
    access, from_list / from_frames / from_channels construction,
    to_bytes output, split(), concatenation and equality.
    """

    import audiotools.pcm

    # invalid constructor arguments:
    # non-bytes data is a TypeError ...
    self.assertRaises(TypeError,
                      audiotools.pcm.FrameList,
                      0, 2, 16, 0, 1)

    self.assertRaises(TypeError,
                      audiotools.pcm.FrameList,
                      [1, 2, 3], 2, 16, 0, 1)

    # ... while these data length / channel count / bits-per-sample
    # combinations are expected to raise ValueError
    self.assertRaises(ValueError,
                      audiotools.pcm.FrameList,
                      b"abc", 2, 16, 0, 1)

    self.assertRaises(ValueError,
                      audiotools.pcm.FrameList,
                      b"abc", 4, 8, 0, 1)

    self.assertRaises(ValueError,
                      audiotools.pcm.FrameList,
                      b"abcd", 1, 15, 0, 1)

    # the reference FrameList used throughout the rest of the test:
    # bytes 0x00..0x0F as 2 channel, 16 bit samples
    f = audiotools.pcm.FrameList(ints_to_bytes(range(16)),
                                 2, 16, True, True)
    self.assertEqual(len(f), 8)
    self.assertEqual(f.channels, 2)
    self.assertEqual(f.frames, 4)
    self.assertEqual(f.bits_per_sample, 16)
    self.assertRaises(IndexError, f.__getitem__, 9)

    # frame(i) returns both channels of a single PCM frame
    self.assertEqual(list(f.frame(0)),
                     [0x0001, 0x0203])
    self.assertEqual(list(f.frame(1)),
                     [0x0405, 0x0607])
    self.assertEqual(list(f.frame(2)),
                     [0x0809, 0x0A0B])
    self.assertEqual(list(f.frame(3)),
                     [0x0C0D, 0x0E0F])
    self.assertRaises(IndexError, f.frame, 4)
    self.assertRaises(IndexError, f.frame, -1)

    # channel(i) returns every sample of a single channel
    self.assertEqual(list(f.channel(0)),
                     [0x0001, 0x0405, 0x0809, 0x0C0D])
    self.assertEqual(list(f.channel(1)),
                     [0x0203, 0x0607, 0x0A0B, 0x0E0F])
    self.assertRaises(IndexError, f.channel, 2)
    self.assertRaises(IndexError, f.channel, -1)

    # signed from_list() input comes back unchanged
    for bps in [8, 16, 24]:
        self.assertEqual(list(audiotools.pcm.from_list(
            range(-40, 40), 1, bps, True)),
            list(range(-40, 40)))

    # unsigned from_list() input comes back
    # shifted down by 2 ^ (bps - 1)
    for bps in [8, 16, 24]:
        self.assertEqual(list(audiotools.pcm.from_list(
            range((1 << (bps - 1)) - 40,
                  (1 << (bps - 1)) + 40), 1, bps, False)),
            list(range(-40, 40)))

    # random samples for every channel count / bps / signedness,
    # checked both as a whole and channel-by-channel
    for channels in range(1, 9):
        for bps in [8, 16, 24]:
            for signed in [True, False]:
                if signed:
                    l = [random.choice(range(-40, 40)) for i in
                         range(16 * channels)]
                else:
                    l = [random.choice(range(0, 80)) for i in
                         range(16 * channels)]
                f2 = audiotools.pcm.from_list(l, channels, bps, signed)
                if signed:
                    self.assertEqual(list(f2), l)
                    for channel in range(channels):
                        self.assertEqual(list(f2.channel(channel)),
                                         l[channel::channels])
                else:
                    self.assertEqual(list(f2),
                                     [i - (1 << (bps - 1))
                                      for i in l])
                    for channel in range(channels):
                        self.assertEqual(list(f2.channel(channel)),
                                         [i - (1 << (bps - 1))
                                          for i in l[channel::channels]])

    # to_bytes(True, True) reproduces the input bytes of "f";
    # to_bytes(False, True) swaps the two bytes of each sample
    self.assertEqual(
        f.to_bytes(True, True),
        b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f')
    self.assertEqual(
        f.to_bytes(False, True),
        b'\x01\x00\x03\x02\x05\x04\x07\x06\t\x08\x0b\n\r\x0c\x0f\x0e')
    # FIXME - check signed

    # rebuilding "f" from its own frames or channels yields "f"
    self.assertEqual(list(f),
                     list(audiotools.pcm.from_frames([f.frame(0),
                                                      f.frame(1),
                                                      f.frame(2),
                                                      f.frame(3)])))
    self.assertEqual(list(f),
                     list(audiotools.pcm.from_channels([f.channel(0),
                                                        f.channel(1)])))

    self.assertEqual(list(audiotools.pcm.from_list(
        [0x0001, 0x0203, 0x0405, 0x0607,
         0x0809, 0x0A0B, 0x0C0D, 0x0E0F], 2, 16, True)),
        list(f))

    # 7 samples can't be divided between 2 channels
    self.assertRaises(ValueError,
                      audiotools.pcm.from_list,
                      [0x0001, 0x0203, 0x0405, 0x0607,
                       0x0809, 0x0A0B, 0x0C0D], 2, 16, True)

    # 15 bits-per-sample is rejected
    self.assertRaises(ValueError,
                      audiotools.pcm.from_list,
                      [0x0001, 0x0203, 0x0405, 0x0607,
                       0x0809, 0x0A0B, 0x0C0D, 0x0E0F], 2, 15, True)

    # from_frames() errors: a plain list among the frames (TypeError),
    # then a second FrameList with 4 samples, with 1 channel
    # and with 8 bits-per-sample (ValueError each)
    self.assertRaises(
        TypeError,
        audiotools.pcm.from_frames,
        [audiotools.pcm.from_list(list(range(2)), 2, 16, False),
         list(range(2))])

    self.assertRaises(
        ValueError,
        audiotools.pcm.from_frames,
        [audiotools.pcm.from_list(list(range(2)), 2, 16, False),
         audiotools.pcm.from_list(list(range(4)), 2, 16, False)])

    self.assertRaises(
        ValueError,
        audiotools.pcm.from_frames,
        [audiotools.pcm.from_list(list(range(2)), 2, 16, False),
         audiotools.pcm.from_list(list(range(2)), 1, 16, False)])

    self.assertRaises(
        ValueError,
        audiotools.pcm.from_frames,
        [audiotools.pcm.from_list(list(range(2)), 2, 16, False),
         audiotools.pcm.from_list(list(range(2)), 2, 8, False)])

    self.assertEqual(list(audiotools.pcm.from_frames(
        [audiotools.pcm.from_list(list(range(2)), 2, 16, True),
         audiotools.pcm.from_list(list(range(2, 4)), 2, 16, True)])),
        list(range(4)))

    # from_channels() errors: a plain list among the channels
    # (TypeError), then a second FrameList with 2 channels,
    # with 3 samples and with 8 bits-per-sample (ValueError each)
    self.assertRaises(
        TypeError,
        audiotools.pcm.from_channels,
        [audiotools.pcm.from_list(list(range(2)), 1, 16, False),
         list(range(2))])

    self.assertRaises(
        ValueError,
        audiotools.pcm.from_channels,
        [audiotools.pcm.from_list(list(range(1)), 1, 16, False),
         audiotools.pcm.from_list(list(range(2)), 2, 16, False)])

    self.assertRaises(
        ValueError,
        audiotools.pcm.from_channels,
        [audiotools.pcm.from_list(list(range(2)), 1, 16, False),
         audiotools.pcm.from_list(list(range(3)), 1, 16, False)])

    self.assertRaises(
        ValueError,
        audiotools.pcm.from_channels,
        [audiotools.pcm.from_list(list(range(2)), 1, 16, False),
         audiotools.pcm.from_list(list(range(2)), 1, 8, False)])

    # channels [0, 1] and [2, 3] interleave to [0, 2, 1, 3]
    self.assertEqual(list(audiotools.pcm.from_channels(
        [audiotools.pcm.from_list(list(range(2)), 1, 16, True),
         audiotools.pcm.from_list(list(range(2, 4)), 1, 16, True)])),
        [0, 2, 1, 3])

    # split() takes a count of PCM frames; negative counts are errors
    self.assertRaises(IndexError, f.split, -1)

    (f1, f2) = f.split(2)
    self.assertEqual(list(f1),
                     [0x0001, 0x0203,
                      0x0405, 0x0607])
    self.assertEqual(list(f2),
                     [0x0809, 0x0A0B,
                      0x0C0D, 0x0E0F])

    (f1, f2) = f.split(0)
    self.assertEqual(list(f1),
                     [])
    self.assertEqual(list(f2),
                     [0x0001, 0x0203,
                      0x0405, 0x0607,
                      0x0809, 0x0A0B,
                      0x0C0D, 0x0E0F])

    # splitting past the end leaves everything in the first half
    (f1, f2) = f.split(20)
    self.assertEqual(list(f1),
                     [0x0001, 0x0203,
                      0x0405, 0x0607,
                      0x0809, 0x0A0B,
                      0x0C0D, 0x0E0F])
    self.assertEqual(list(f2),
                     [])

    # the halves of any split concatenate back to the original
    for i in range(f.frames):
        (f1, f2) = f.split(i)
        self.assertEqual(len(f1), i * f.channels)
        self.assertEqual(len(f2), (len(f) - (i * f.channels)))
        self.assertEqual(list(f1 + f2), list(f))

    import operator

    # concatenation requires a FrameList with
    # the same channel count and bits-per-sample
    f1 = audiotools.pcm.from_list(list(range(10)), 2, 16, False)
    self.assertRaises(TypeError, operator.concat, f1, [1, 2, 3])
    f2 = audiotools.pcm.from_list(list(range(10, 20)), 1, 16, False)
    self.assertRaises(ValueError, operator.concat, f1, f2)
    f2 = audiotools.pcm.from_list(list(range(10, 20)), 2, 8, False)
    self.assertRaises(ValueError, operator.concat, f1, f2)

    # equality requires a FrameList with the same samples,
    # channel count and bits-per-sample
    f1 = audiotools.pcm.from_list(list(range(10)), 2, 16, False)
    self.assertEqual(f1, audiotools.pcm.from_list(range(10), 2, 16, False))
    self.assertNotEqual(f1, 10)
    self.assertNotEqual(f1, list(range(10)))
    self.assertNotEqual(
        f1,
        audiotools.pcm.from_list(list(range(10)), 1, 16, False))
    self.assertNotEqual(
        f1,
        audiotools.pcm.from_list(list(range(10)), 2, 8, False))
    self.assertNotEqual(
        f1,
        audiotools.pcm.from_list(list(range(10)), 1, 8, False))
    self.assertNotEqual(
        f1,
        audiotools.pcm.from_list(list(range(8)), 2, 16, False))
    self.assertNotEqual(
        f1,
        audiotools.pcm.from_list(list(range(12)), 2, 8, False))
@LIB_CORE
def test_8bit_roundtrip_str(self):
    """Every possible 8 bit sample must survive a trip through
    FrameList and back to bytes, for all four combinations
    of endianness and signedness."""

    import audiotools.pcm

    all_samples = b"".join(TestFrameList.Bits8())

    # big-endian first, unsigned before signed
    for big_endian in (True, False):
        for signed in (False, True):
            framelist = audiotools.pcm.FrameList(
                all_samples, 1, 8, big_endian, signed)
            self.assertEqual(
                framelist.to_bytes(big_endian, signed), all_samples)
@LIB_CORE
def test_16bit_roundtrip_str(self):
    """Every possible 16 bit sample must survive a trip through
    FrameList and back to bytes, for all four combinations
    of endianness and signedness."""

    import audiotools.pcm

    s = b"".join(TestFrameList.Bits16())

    # (big_endian, signed, name shown in the failure message)
    # the little-endian signed case was previously mislabeled "USInt16"
    combinations = [(True, False, "UBInt16"),
                    (True, True, "SBInt16"),
                    (False, False, "ULInt16"),
                    (False, True, "SLInt16")]

    for (big_endian, signed, label) in combinations:
        self.assertEqual(
            audiotools.pcm.FrameList(
                s, 1, 16, big_endian, signed).to_bytes(big_endian, signed),
            s,
            "data mismatch converting %s through string" % (label))
rec.build("%d*24u" % (len(unsigned_values)), unsigned_values) 1713 framelist = audiotools.pcm.FrameList(rec.data(), 1, 24, True, False) 1714 self.assertEqual(len(unsigned_values), framelist.frames) 1715 self.assertEqual( 1716 [i - (1 << 23) for i in unsigned_values], 1717 list(framelist)) 1718 1719 #unsigned, little-endian 1720 rec = BitstreamRecorder(1) 1721 rec.build("%d*24u" % (len(unsigned_values)), unsigned_values) 1722 framelist = audiotools.pcm.FrameList(rec.data(), 1, 24, False, False) 1723 self.assertEqual(len(unsigned_values), framelist.frames) 1724 self.assertEqual( 1725 [i - (1 << 23) for i in unsigned_values], 1726 list(framelist)) 1727 1728 signed_values = list(range(-(2 ** 23), 2 ** 23, 1024)) 1729 signed_values.append(2 ** 23 - 1) 1730 1731 rec = BitstreamRecorder(0) 1732 rec.build("%d*24s" % (len(signed_values)), signed_values) 1733 framelist = audiotools.pcm.FrameList(rec.data(), 1, 24, True, True) 1734 self.assertEqual(len(signed_values), framelist.frames) 1735 self.assertEqual(signed_values, list(framelist)) 1736 1737 rec = BitstreamRecorder(1) 1738 rec.build("%d*24s" % (len(signed_values)), signed_values) 1739 framelist = audiotools.pcm.FrameList(rec.data(), 1, 24, False, True) 1740 self.assertEqual(len(signed_values), framelist.frames) 1741 self.assertEqual(signed_values, list(framelist)) 1742 1743 @LIB_CORE 1744 def test_24bit_roundtrip_str(self): 1745 import audiotools.pcm 1746 1747 s = b"".join(TestFrameList.Bits24()) 1748 # big-endian, unsigned 1749 self.assertEqual( 1750 audiotools.pcm.FrameList(s, 1, 24, 1751 True, False).to_bytes(True, False), s) 1752 1753 # big-endian, signed 1754 self.assertEqual( 1755 audiotools.pcm.FrameList(s, 1, 24, 1756 True, True).to_bytes(True, True), s) 1757 1758 # little-endian, unsigned 1759 self.assertEqual( 1760 audiotools.pcm.FrameList(s, 1, 24, 1761 False, False).to_bytes(False, False), s) 1762 1763 # little-endian, signed 1764 self.assertEqual( 1765 audiotools.pcm.FrameList(s, 1, 24, 1766 False, 
@LIB_CORE
def test_conversion(self):
    """Encodes sine streams to every available format and,
    for lossless results, re-encodes them to every available format.

    Any lossless track must decode to PCM data whose MD5 sum
    matches that of the sine stream it ultimately came from.
    Formats which don't support a stream's bits-per-sample
    are skipped.
    """

    sine_classes = [test_streams.Sine8_Stereo,
                    test_streams.Sine16_Stereo,
                    test_streams.Sine24_Stereo]

    for source_format in audiotools.AVAILABLE_TYPES:
        temp_track = tempfile.NamedTemporaryFile(
            suffix="." + source_format.SUFFIX)
        try:
            for sine_class in sine_classes:
                sine = sine_class(88200,
                                  44100,
                                  441.0,
                                  0.50,
                                  441.0,
                                  0.49,
                                  1.0)
                try:
                    track = source_format.from_pcm(temp_track.name, sine)
                except audiotools.UnsupportedBitsPerSample:
                    continue

                # only lossless tracks can be checked bit-for-bit,
                # both directly and after a second conversion
                if track.lossless():
                    md5sum = md5()
                    audiotools.transfer_framelist_data(
                        track.to_pcm(), md5sum.update)
                    self.assertEqual(
                        md5sum.hexdigest(), sine.hexdigest(),
                        "MD5 mismatch for %s using %s" % (
                            track.NAME, repr(sine)))

                    for new_format in audiotools.AVAILABLE_TYPES:
                        # the second file gets the suffix of the format
                        # actually written to it, not the source's
                        with tempfile.NamedTemporaryFile(
                            suffix="." + new_format.SUFFIX) as temp_track2:
                            try:
                                track2 = new_format.from_pcm(
                                    temp_track2.name,
                                    track.to_pcm())
                            except audiotools.UnsupportedBitsPerSample:
                                continue
                            if track2.lossless():
                                md5sum2 = md5()
                                audiotools.transfer_framelist_data(
                                    track2.to_pcm(), md5sum2.update)
                                # check the converted track's own sum;
                                # comparing "md5sum" again would only
                                # repeat the assertion above
                                self.assertEqual(
                                    md5sum2.hexdigest(),
                                    sine.hexdigest(),
                                    "MD5 mismatch for converting %s from %s to %s" % (repr(sine), track.NAME, track2.NAME))
        finally:
            temp_track.close()
TestFloatFrameList(unittest.TestCase): 1839 @LIB_CORE 1840 def test_basics(self): 1841 import audiotools.pcm 1842 1843 self.assertRaises(ValueError, 1844 audiotools.pcm.FloatFrameList, 1845 [1.0, 2.0, 3.0], 2) 1846 1847 self.assertRaises(TypeError, 1848 audiotools.pcm.FloatFrameList, 1849 0, 1) 1850 1851 self.assertRaises(TypeError, 1852 audiotools.pcm.FloatFrameList, 1853 [1.0, 2.0, "a"], 1) 1854 1855 f = audiotools.pcm.FloatFrameList(list(map(float, range(8))), 2) 1856 self.assertEqual(len(f), 8) 1857 self.assertEqual(f.channels, 2) 1858 self.assertEqual(f.frames, 4) 1859 self.assertRaises(IndexError, f.__getitem__, 9) 1860 1861 self.assertEqual(list(f.frame(0)), 1862 [0.0, 1.0]) 1863 self.assertEqual(list(f.frame(1)), 1864 [2.0, 3.0]) 1865 self.assertEqual(list(f.frame(2)), 1866 [4.0, 5.0]) 1867 self.assertEqual(list(f.frame(3)), 1868 [6.0, 7.0]) 1869 self.assertRaises(IndexError, f.frame, 4) 1870 self.assertRaises(IndexError, f.frame, -1) 1871 1872 self.assertEqual(list(f.channel(0)), 1873 [0.0, 2.0, 4.0, 6.0]) 1874 self.assertEqual(list(f.channel(1)), 1875 [1.0, 3.0, 5.0, 7.0]) 1876 self.assertRaises(IndexError, f.channel, 2) 1877 self.assertRaises(IndexError, f.channel, -1) 1878 1879 self.assertEqual(list(f), 1880 list(audiotools.pcm.from_float_frames([f.frame(0), 1881 f.frame(1), 1882 f.frame(2), 1883 f.frame(3)]))) 1884 self.assertEqual(list(f), 1885 list(audiotools.pcm.from_float_channels([ 1886 f.channel(0), 1887 f.channel(1)]))) 1888 1889 # FIXME - check from_frames 1890 # FIXME - check from_channels 1891 1892 self.assertRaises(IndexError, f.split, -1) 1893 1894 (f1, f2) = f.split(2) 1895 self.assertEqual(list(f1), 1896 [0.0, 1.0, 1897 2.0, 3.0]) 1898 self.assertEqual(list(f2), 1899 [4.0, 5.0, 1900 6.0, 7.0]) 1901 1902 (f1, f2) = f.split(0) 1903 self.assertEqual(list(f1), 1904 []) 1905 self.assertEqual(list(f2), 1906 [0.0, 1.0, 1907 2.0, 3.0, 1908 4.0, 5.0, 1909 6.0, 7.0]) 1910 1911 (f1, f2) = f.split(20) 1912 self.assertEqual(list(f1), 1913 [0.0, 1.0, 
class __SimpleChunkReader__:
    """A minimal file-like object which hands out prepared chunks.

    Each call to read() returns the next item of "chunks" unchanged,
    regardless of how many bytes were requested.  Positions reported
    by tell() and accepted by seek() are chunk indexes, not byte
    offsets.
    """

    def __init__(self, chunks):
        # presumably a list of binary strings - TODO confirm with callers
        self.chunks = chunks
        # index of the next chunk read() will return
        self.i = 0

    def read(self, bytes):
        """Returns the next chunk, or b"" once all have been returned.

        The "bytes" argument is ignored.
        """

        try:
            chunk = self.chunks[self.i]
        except IndexError:
            # end of data is signaled with an empty binary string
            # so that it's the same type under Python 2 and 3
            return b""
        self.i += 1
        return chunk

    def tell(self):
        """Returns the index of the next chunk to be read."""

        return self.i

    def seek(self, position):
        """Makes "position" the index of the next chunk to be read."""

        self.i = position

    def close(self):
        """Does nothing; there's no underlying resource to release."""

        pass
-3) 2059 self.assertEqual(reader.read_signed(19), -181311) 2060 2061 reader.setpos(pos) 2062 self.assertEqual(reader.unary(0), 1) 2063 self.assertEqual(reader.unary(0), 2) 2064 self.assertEqual(reader.unary(0), 0) 2065 self.assertEqual(reader.unary(0), 0) 2066 self.assertEqual(reader.unary(0), 4) 2067 2068 reader.setpos(pos) 2069 self.assertEqual(reader.unary(1), 0) 2070 self.assertEqual(reader.unary(1), 1) 2071 self.assertEqual(reader.unary(1), 0) 2072 self.assertEqual(reader.unary(1), 3) 2073 self.assertEqual(reader.unary(1), 0) 2074 2075 reader.setpos(pos) 2076 self.assertEqual(reader.read_huffman_code(table), 1) 2077 self.assertEqual(reader.read_huffman_code(table), 0) 2078 self.assertEqual(reader.read_huffman_code(table), 4) 2079 self.assertEqual(reader.read_huffman_code(table), 0) 2080 self.assertEqual(reader.read_huffman_code(table), 0) 2081 self.assertEqual(reader.read_huffman_code(table), 2) 2082 self.assertEqual(reader.read_huffman_code(table), 1) 2083 self.assertEqual(reader.read_huffman_code(table), 1) 2084 self.assertEqual(reader.read_huffman_code(table), 2) 2085 self.assertEqual(reader.read_huffman_code(table), 0) 2086 self.assertEqual(reader.read_huffman_code(table), 2) 2087 self.assertEqual(reader.read_huffman_code(table), 0) 2088 self.assertEqual(reader.read_huffman_code(table), 1) 2089 self.assertEqual(reader.read_huffman_code(table), 4) 2090 self.assertEqual(reader.read_huffman_code(table), 2) 2091 2092 reader.setpos(pos) 2093 self.assertEqual(reader.read(3), 5) 2094 reader.byte_align() 2095 self.assertEqual(reader.read(3), 7) 2096 reader.byte_align() 2097 reader.byte_align() 2098 self.assertEqual(reader.read(8), 59) 2099 reader.byte_align() 2100 self.assertEqual(reader.read(4), 12) 2101 2102 reader.setpos(pos) 2103 self.assertEqual(reader.read_bytes(2), b"\xB1\xED") 2104 reader.setpos(pos) 2105 self.assertEqual(reader.read(4), 11) 2106 self.assertEqual(reader.read_bytes(2), b"\x1E\xD3") 2107 2108 reader.setpos(pos) 2109 
self.assertEqual(reader.read(3), 5) 2110 reader.set_endianness(1) 2111 self.assertEqual(reader.read(3), 5) 2112 reader.set_endianness(0) 2113 self.assertEqual(reader.read(4), 3) 2114 reader.set_endianness(0) 2115 self.assertEqual(reader.read(4), 12) 2116 2117 reader.setpos(pos) 2118 pos2 = reader.getpos() 2119 self.assertEqual(reader.read(4), 0xB) 2120 reader.setpos(pos2) 2121 self.assertEqual(reader.read(8), 0xB1) 2122 reader.setpos(pos2) 2123 self.assertEqual(reader.read(12), 0xB1E) 2124 del(pos2) 2125 pos3 = reader.getpos() 2126 self.assertEqual(reader.read(4), 0xD) 2127 reader.setpos(pos3) 2128 self.assertEqual(reader.read(8), 0xD3) 2129 reader.setpos(pos3) 2130 self.assertEqual(reader.read(12), 0xD3B) 2131 del(pos3) 2132 2133 reader.seek(3, 0) 2134 self.assertEqual(reader.read(8), 0xC1) 2135 reader.seek(2, 0) 2136 self.assertEqual(reader.read(8), 0x3B) 2137 reader.seek(1, 0) 2138 self.assertEqual(reader.read(8), 0xED) 2139 reader.seek(0, 0) 2140 self.assertEqual(reader.read(8), 0xB1) 2141 try: 2142 reader.seek(4, 0) 2143 reader.read(8) 2144 self.assertTrue(False) 2145 except IOError: 2146 self.assertTrue(True) 2147 try: 2148 reader.seek(-1, 0) 2149 reader.read(8) 2150 self.assertTrue(False) 2151 except IOError: 2152 self.assertTrue(True) 2153 2154 reader.seek(-1, 2) 2155 self.assertEqual(reader.read(8), 0xC1) 2156 reader.seek(-2, 2) 2157 self.assertEqual(reader.read(8), 0x3B) 2158 reader.seek(-3, 2) 2159 self.assertEqual(reader.read(8), 0xED) 2160 reader.seek(-4, 2) 2161 self.assertEqual(reader.read(8), 0xB1) 2162 # BytesIO objects allow seeking to before the beginning 2163 # of the stream, placing the cursor at the beginning 2164 # try: 2165 # reader.seek(-5, 2) 2166 # reader.read(8) 2167 # self.assertTrue(False) 2168 # except IOError: 2169 # self.assertTrue(True) 2170 try: 2171 reader.seek(1, 2) 2172 reader.read(8) 2173 self.assertTrue(False) 2174 except IOError: 2175 self.assertTrue(True) 2176 2177 reader.seek(0, 0) 2178 reader.seek(3, 1) 2179 
def __test_little_endian_reader__(self, reader, table):
    # verifies "reader" against the known little-endian decodings
    # of a stream whose first four bytes are B1 ED 3B C1
    #
    # "table" is the Huffman table handed to read_huffman_code()
    #
    # the reader is rewound to its initial position
    # before each group of checks and once more at the end

    origin = reader.getpos()

    # plain unsigned reads
    for (bits, expected) in [(2, 0x1),
                             (3, 0x4),
                             (5, 0x0D),
                             (3, 0x3),
                             (19, 0x609DF)]:
        self.assertEqual(reader.read(bits), expected)

    # the same fields, skipping the 3 bit ones instead of reading them
    reader.setpos(origin)
    self.assertEqual(reader.read(2), 0x1)
    reader.skip(3)
    self.assertEqual(reader.read(5), 0x0D)
    reader.skip(3)
    self.assertEqual(reader.read(19), 0x609DF)

    # a bit that has been unread is returned by the next read
    reader.setpos(origin)
    self.assertTrue(reader.byte_aligned())
    bits_checked = 0
    while bits_checked < 32:
        value = reader.read(1)
        reader.unread(value)
        self.assertEqual(reader.read(1), value)
        bits_checked += 1
    self.assertTrue(reader.byte_aligned())

    # reading and unreading one bit keeps the stream byte-aligned
    reader.setpos(origin)
    reader.unread(reader.read(1))
    self.assertTrue(reader.byte_aligned())
    self.assertEqual(reader.read_bytes(4), b"\xB1\xED\x3B\xC1")
    self.assertTrue(reader.byte_aligned())

    # eight bits are unread in a row; the ninth is expected to fail
    reader.setpos(origin)
    for _ in range(8):
        reader.unread(0)
    self.assertRaises(IOError, reader.unread, 0)

    # signed reads
    reader.setpos(origin)
    for (bits, expected) in [(2, 1),
                             (3, -4),
                             (5, 13),
                             (3, 3),
                             (19, -128545)]:
        self.assertEqual(reader.read_signed(bits), expected)

    # unary reads with each stop bit
    for (stop_bit, expected_values) in [(0, [1, 0, 0, 2, 2]),
                                        (1, [0, 3, 0, 1, 0])]:
        reader.setpos(origin)
        for expected in expected_values:
            self.assertEqual(reader.unary(stop_bit), expected)

    # Huffman codes
    reader.setpos(origin)
    for expected in [1, 3, 1, 0, 2, 1, 0, 0, 1, 0, 1, 2, 4, 3]:
        self.assertEqual(reader.read_huffman_code(table), expected)

    # byte reads, both aligned and starting 4 bits into the stream
    reader.setpos(origin)
    self.assertEqual(reader.read_bytes(2), b"\xB1\xED")
    reader.setpos(origin)
    self.assertEqual(reader.read(4), 1)

    self.assertEqual(reader.read_bytes(2), b"\xDB\xBE")

    # byte_align() calls between reads, including two in a row
    reader.setpos(origin)
    self.assertEqual(reader.read(3), 1)
    reader.byte_align()
    self.assertEqual(reader.read(3), 5)
    reader.byte_align()
    reader.byte_align()
    self.assertEqual(reader.read(8), 59)
    reader.byte_align()
    self.assertEqual(reader.read(4), 1)

    # changing endianness between reads
    reader.setpos(origin)
    self.assertEqual(reader.read(3), 1)
    reader.set_endianness(0)
    self.assertEqual(reader.read(3), 7)
    reader.set_endianness(1)
    self.assertEqual(reader.read(4), 11)
    reader.set_endianness(1)
    self.assertEqual(reader.read(4), 1)

    # additional positions taken and restored while "origin" is held
    reader.setpos(origin)
    first_mark = reader.getpos()
    self.assertEqual(reader.read(4), 0x1)
    reader.setpos(first_mark)
    self.assertEqual(reader.read(8), 0xB1)
    reader.setpos(first_mark)
    self.assertEqual(reader.read(12), 0xDB1)
    del first_mark
    second_mark = reader.getpos()
    self.assertEqual(reader.read(4), 0xE)
    reader.setpos(second_mark)
    self.assertEqual(reader.read(8), 0xBE)
    reader.setpos(second_mark)
    self.assertEqual(reader.read(12), 0x3BE)
    del second_mark

    reader.setpos(origin)

def __test_try__(self, reader, table):
    # checks that each read method raises the expected exception
    # when it runs past the end of the stream (IOError)
    # or is handed an invalid count (ValueError)
    #
    # "reader" is restored to its initial position at the end

    origin = reader.getpos()

    # bounce to the very end of the stream,
    # leaving a single "1" bit unread
    reader.skip(31)
    last_bit = reader.getpos()
    self.assertEqual(reader.read(1), 1)
    reader.setpos(last_bit)

    def expect_failure(exception, method_name, argument):
        # the method is looked up only when its turn comes,
        # and the reader is rewound to the final bit afterward
        self.assertRaises(exception,
                          getattr(reader, method_name),
                          argument)
        reader.setpos(last_bit)

    # in the case of unary/Huffman, the stream ends on a "1" bit
    # whether reading it big-endian or little-endian
    for (method_name, argument) in [("read", 2),
                                    ("read_signed", 2),
                                    ("skip", 2),
                                    ("unary", 0)]:
        expect_failure(IOError, method_name, argument)

    # the final "1" bit is a complete unary value with stop bit 1
    # so only the read after it fails
    self.assertEqual(reader.unary(1), 0)
    expect_failure(IOError, "unary", 1)

    for (exception, method_name, argument) in [
            (IOError, "read_huffman_code", table),
            (IOError, "read_bytes", 2),
            (IOError, "substream", 1),
            (ValueError, "read", -1),
            (ValueError, "read_signed", -1),
            (ValueError, "read_signed", 0),
            (ValueError, "skip", -1),
            (ValueError, "read_bytes", -2),
            (IOError, "skip_bytes", 2 ** 30),
            (IOError, "skip_bytes", 2 ** 65),
            (IOError, "read_bytes", 2 ** 30),
            (IOError, "read_bytes", 2 ** 65),
            (IOError, "substream", 2 ** 30)]:
        expect_failure(exception, method_name, argument)

    reader.setpos(origin)
self.assertEqual(int(counter), 4) 2423 reader.setpos(start) 2424 2425 # calling callbacks directly 2426 counter.reset() 2427 for i in range(20): 2428 reader.call_callbacks(0) 2429 self.assertEqual(int(counter), 20) 2430 2431 # two callbacks 2432 counter.reset() 2433 reader.add_callback(counter.callback) 2434 for i in range(8): 2435 reader.read(4) 2436 self.assertEqual(int(counter), 8) 2437 reader.pop_callback() 2438 reader.setpos(start) 2439 2440 # temporarily suspending the callback 2441 counter.reset() 2442 reader.read(8) 2443 self.assertEqual(int(counter), 1) 2444 callback = reader.pop_callback() 2445 reader.read(8) 2446 reader.read(8) 2447 reader.add_callback(counter.callback) 2448 reader.read(8) 2449 self.assertEqual(int(counter), 2) 2450 reader.setpos(start) 2451 2452 # temporarily adding two callbacks 2453 counter.reset() 2454 reader.read(8) 2455 self.assertEqual(int(counter), 1) 2456 reader.add_callback(counter.callback) 2457 reader.read(8) 2458 reader.read(8) 2459 reader.pop_callback() 2460 reader.read(8) 2461 self.assertEqual(int(counter), 6) 2462 reader.setpos(start) 2463 2464 # read_signed 2465 counter.reset() 2466 for i in range(8): 2467 reader.read_signed(4) 2468 self.assertEqual(int(counter), 4) 2469 reader.setpos(start) 2470 2471 # skip 2472 counter.reset() 2473 for i in range(8): 2474 reader.skip(4) 2475 self.assertEqual(int(counter), 4) 2476 reader.setpos(start) 2477 2478 # read_unary 2479 counter.reset() 2480 for i in range(unary_0_reads): 2481 reader.unary(0) 2482 self.assertEqual(int(counter), 4) 2483 counter.reset() 2484 reader.setpos(start) 2485 for i in range(unary_1_reads): 2486 reader.unary(1) 2487 self.assertEqual(int(counter), 4) 2488 reader.setpos(start) 2489 2490 # read_huffman_code 2491 counter.reset() 2492 for i in range(huffman_code_count): 2493 reader.read_huffman_code(table) 2494 self.assertEqual(int(counter), 4) 2495 reader.setpos(start) 2496 2497 # read_bytes 2498 counter.reset() 2499 reader.read_bytes(2) 2500 
@LIB_BITSTREAM
def test_init_error(self):
    # constructing any of the bitstream classes without arguments,
    # or a BitstreamRecorder from None, is expected to raise TypeError
    from audiotools.bitstream import BitstreamReader
    from audiotools.bitstream import BitstreamRecorder
    from audiotools.bitstream import BitstreamWriter

    self.assertRaises(TypeError, BitstreamRecorder)
    self.assertRaises(TypeError, BitstreamRecorder, None)
    self.assertRaises(TypeError, BitstreamWriter)
    self.assertRaises(TypeError, BitstreamReader)

@LIB_BITSTREAM
def test_parse(self):
    # checks audiotools.bitstream.parse() against known decodings
    # of fixed byte strings, big-endian first and then little-endian
    from audiotools.bitstream import parse

    stream = ints_to_bytes([0xB1, 0xED, 0x3B, 0xC1])

    # values expected from the 32 and 64 bit edge case streams
    # with either endianness
    unsigned_edges = [0,
                      4294967295,
                      2147483648,
                      2147483647,
                      0,
                      0xFFFFFFFFFFFFFFFF,
                      9223372036854775808,
                      9223372036854775807]
    signed_edges = [0,
                    -1,
                    -2147483648,
                    2147483647,
                    0,
                    -1,
                    -9223372036854775808,
                    9223372036854775807]

    big_endian_edges = ints_to_bytes(
        [0, 0, 0, 0, 255, 255, 255, 255,
         128, 0, 0, 0, 127, 255, 255, 255,
         0, 0, 0, 0, 0, 0, 0, 0,
         255, 255, 255, 255, 255, 255, 255, 255,
         128, 0, 0, 0, 0, 0, 0, 0,
         127, 255, 255, 255, 255, 255, 255, 255])
    little_endian_edges = ints_to_bytes(
        [0, 0, 0, 0, 255, 255, 255, 255,
         0, 0, 0, 128, 255, 255, 255, 127,
         0, 0, 0, 0, 0, 0, 0, 0,
         255, 255, 255, 255, 255, 255, 255, 255,
         0, 0, 0, 0, 0, 0, 0, 128,
         255, 255, 255, 255, 255, 255, 255, 127])

    # all the defined format fields
    # and the values expected from "stream" for each
    big_endian_fields = [("2u 3u 5u 3u 19u",
                          [0x2, 0x6, 0x07, 0x5, 0x53BC1]),
                         ("2s 3s 5s 3s 19s",
                          [-2, -2, 7, -3, -181311]),
                         ("2U 3U 5U 3U 19U",
                          [0x2, 0x6, 0x07, 0x5, 0x53BC1]),
                         ("2S 3S 5S 3S 19S",
                          [-2, -2, 7, -3, -181311]),
                         ("2u 3p 5u 3p 19u",
                          [0x2, 0x07, 0x53BC1]),
                         ("2p 1P 3u 19u",
                          [0x5, 0x53BC1]),
                         ("2b 2b",
                          [b"\xB1\xED", b"\x3B\xC1"]),
                         ("2u a 3u a 4u a 5u",
                          [2, 7, 3, 24]),
                         ("3* 2u",
                          [2, 3, 0]),
                         ("3* 2* 2u",
                          [2, 3, 0, 1, 3, 2]),
                         ("2u ? 3u", [2]),
                         ("2u 10? 3u", [2]),
                         ("2u 10* ? 3u", [2]),
                         ("2u 10* 3? 3u", [2])]
    little_endian_fields = [("2u 3u 5u 3u 19u",
                             [0x1, 0x4, 0x0D, 0x3, 0x609DF]),
                            ("2s 3s 5s 3s 19s",
                             [1, -4, 13, 3, -128545]),
                            ("2U 3U 5U 3U 19U",
                             [0x1, 0x4, 0x0D, 0x3, 0x609DF]),
                            ("2S 3S 5S 3S 19S",
                             [1, -4, 13, 3, -128545]),
                            ("2u 3p 5u 3p 19u",
                             [0x1, 0x0D, 0x609DF]),
                            ("2p 1P 3u 19u",
                             [0x3, 0x609DF]),
                            ("2b 2b",
                             [b"\xB1\xED", b"\x3B\xC1"]),
                            ("2u a 3u a 4u a 5u",
                             [1, 5, 11, 1]),
                            ("3* 2u",
                             [1, 0, 3]),
                            ("3* 2* 2u",
                             [1, 0, 3, 2, 1, 3]),
                            ("2u ? 3u", [1]),
                            ("2u 10? 3u", [1]),
                            ("2u 10* ? 3u", [1]),
                            ("2u 10* 3? 3u", [1])]

    for (little_endian, basic_values, field_checks, edge_stream) in [
            (False,
             [2, 6, 7, -3, -181311],
             big_endian_fields,
             big_endian_edges),
            (True,
             [1, 4, 13, 3, -128545],
             little_endian_fields,
             little_endian_edges)]:
        # test basic string
        self.assertEqual(parse("2u3u5u3s19s", little_endian, stream),
                         basic_values)

        # test all the defined format fields
        for (fields, values) in field_checks:
            self.assertEqual(parse(fields, little_endian, stream),
                             values)

        # test several unsigned edge cases
        self.assertEqual(
            parse("32u 32u 32u 32u 64U 64U 64U 64U",
                  little_endian,
                  edge_stream),
            unsigned_edges)

        # test several signed edge cases
        self.assertEqual(
            parse("32s 32s 32s 32s 64S 64S 64S 64S",
                  little_endian,
                  edge_stream),
            signed_edges)

        # test read errors: each format asks for more
        # than the single byte available
        # the input is a bytes literal, like every other stream
        # in this test, so the same one byte string is supplied
        # under both Python 2 and Python 3
        for s in ["3u", "3s", "3U", "3S", "3p", "3P", "3b"]:
            self.assertRaises(IOError,
                              parse,
                              "8u" + s,
                              little_endian,
                              b"a")
0xFFFFFFFFFFFFFFFF, 2706 9223372036854775808, 2707 9223372036854775807]), 2708 ints_to_bytes([0, 0, 0, 0, 255, 255, 255, 255, 2709 128, 0, 0, 0, 127, 255, 255, 255, 2710 0, 0, 0, 0, 0, 0, 0, 0, 2711 255, 255, 255, 255, 255, 255, 255, 255, 2712 128, 0, 0, 0, 0, 0, 0, 0, 2713 127, 255, 255, 255, 255, 255, 255, 255])) 2714 2715 # test several big-endian signed edge cases 2716 self.assertEqual( 2717 build("32s 32s 32s 32s 64S 64S 64S 64S", 2718 False, 2719 [0, 2720 -1, 2721 -2147483648, 2722 2147483647, 2723 0, 2724 -1, 2725 -9223372036854775808, 2726 9223372036854775807]), 2727 ints_to_bytes([0, 0, 0, 0, 255, 255, 255, 255, 2728 128, 0, 0, 0, 127, 255, 255, 255, 2729 0, 0, 0, 0, 0, 0, 0, 0, 2730 255, 255, 255, 255, 255, 255, 255, 255, 2731 128, 0, 0, 0, 0, 0, 0, 0, 2732 127, 255, 255, 255, 255, 255, 255, 255])) 2733 2734 # test big-endian write errors 2735 for l in [[2, 6, 7, -3], [2, 6, 7], [2, 6], [2], []]: 2736 self.assertRaises(IndexError, 2737 build, 2738 "2u3u5u3s19s", 2739 False, 2740 l) 2741 2742 # test basic little-endian string 2743 self.assertEqual(build("2u3u5u3s19s", 2744 True, 2745 [1, 4, 13, 3, -128545]), 2746 ints_to_bytes([0xB1, 0xED, 0x3B, 0xC1])) 2747 2748 # test several little-endian unsigned edge cases 2749 self.assertEqual( 2750 build("32u 32u 32u 32u 64U 64U 64U 64U", 2751 True, 2752 [0, 2753 4294967295, 2754 2147483648, 2755 2147483647, 2756 0, 2757 0xFFFFFFFFFFFFFFFF, 2758 9223372036854775808, 2759 9223372036854775807]), 2760 ints_to_bytes([0, 0, 0, 0, 255, 255, 255, 255, 2761 0, 0, 0, 128, 255, 255, 255, 127, 2762 0, 0, 0, 0, 0, 0, 0, 0, 2763 255, 255, 255, 255, 255, 255, 255, 255, 2764 0, 0, 0, 0, 0, 0, 0, 128, 2765 255, 255, 255, 255, 255, 255, 255, 127])) 2766 2767 # test several little-endian signed edge cases 2768 self.assertEqual( 2769 build("32s 32s 32s 32s 64S 64S 64S 64S", 2770 True, 2771 [0, 2772 -1, 2773 -2147483648, 2774 2147483647, 2775 0, 2776 -1, 2777 -9223372036854775808, 2778 9223372036854775807]), 2779 ints_to_bytes([0, 0, 
@LIB_BITSTREAM
def test_build_parse_roundtrip(self):
    # values passed through build() and then parse()
    # with the same format string and endianness
    # are expected to come back unchanged
    from audiotools.bitstream import build, parse

    for (format_string, values) in [("1u a", [1]),
                                    ("2u a", [1]),
                                    ("3u a", [1]),
                                    ("4u a", [1]),
                                    ("5u a", [1]),
                                    ("6u a", [1]),
                                    ("7u a", [1]),
                                    ("8u", [1]),
                                    ("2s a", [-1]),
                                    ("3s a", [-1]),
                                    ("4s a", [-1]),
                                    ("5s a", [-1]),
                                    ("6s a", [-1]),
                                    ("7s a", [-1]),
                                    ("8s a", [-1]),
                                    ("64U", [0xFFFFFFFFFFFFFFFF]),
                                    ("64S", [-9223372036854775808]),
                                    ("10b", [b"\x00" * 10]),
                                    ("10p10b a", [b"\x01" * 10]),
                                    ("10P10b", [b"\x02" * 10])]:
        for little_endian in [False, True]:
            self.assertEqual(
                parse(format_string,
                      little_endian,
                      build(format_string, little_endian, values)),
                values)

@LIB_BITSTREAM
def test_simple_reader(self):
    # runs the reader checks over the same four bytes supplied as
    # a temporary file, a BytesIO object and a plain byte string,
    # then over substreams of a padded copy of those bytes,
    # and finally runs the writer checks with each endianness
    from audiotools.bitstream import BitstreamReader, HuffmanTree

    def huffman_table(little_endian):
        return HuffmanTree([[1, 1], 0,
                            [1, 0], 1,
                            [0, 1], 2,
                            [0, 0, 1], 3,
                            [0, 0, 0], 4], little_endian)

    data = b"\xB1\xED\x3B\xC1"
    table_be = huffman_table(0)
    table_le = huffman_table(1)

    temp = tempfile.TemporaryFile()
    # the temporary file is closed even if one of the checks fails
    try:
        temp.write(data)
        temp.flush()
        temp.seek(0, 0)

        temp_s = BytesIO()
        temp_s.write(data)
        temp_s.seek(0, 0)

        # test a big-endian stream
        for reader in [BitstreamReader(temp, False),
                       BitstreamReader(temp_s, False),
                       BitstreamReader(data, False)]:
            self.__test_big_endian_reader__(reader, table_be)
            self.__test_try__(reader, table_be)
            self.__test_callbacks_reader__(reader, 14, 18, table_be, 14)

        temp.seek(0, 0)
        temp_s.seek(0, 0)

        # test a little-endian stream
        for reader in [BitstreamReader(temp, True),
                       BitstreamReader(temp_s, True),
                       BitstreamReader(data, True)]:
            self.__test_little_endian_reader__(reader, table_le)
            self.__test_try__(reader, table_le)
            self.__test_callbacks_reader__(reader, 14, 18, table_le, 13)

        # pad the stream with some additional data at both ends
        data = b"\xFF" + b"\xFF" + data + b"\xFF" + b"\xFF"

        temp.seek(0, 0)
        temp.write(data)
        temp.flush()
        temp.seek(0, 0)

        temp_s = BytesIO()
        temp_s.write(data)
        temp_s.seek(0, 0)

        # check a big-endian substream
        for reader in [BitstreamReader(temp, False),
                       BitstreamReader(temp_s, False),
                       BitstreamReader(data, False)]:
            start = reader.getpos()

            reader.skip(16)
            subreader = reader.substream(4)
            self.__test_big_endian_reader__(subreader, table_be)
            self.__test_try__(subreader, table_be)
            self.__test_callbacks_reader__(subreader,
                                           14, 18, table_be, 13)

            # check a big-endian substream built from another substream
            # the 6 byte substream starts one padding byte early,
            # so skipping 8 bits puts it at the original 4 bytes
            reader.setpos(start)
            reader.skip(8)
            subreader1 = reader.substream(6)
            subreader1.skip(8)
            # taken from subreader1 (not the earlier "subreader")
            # so the nested substream is the one under test
            subreader2 = subreader1.substream(4)
            self.__test_big_endian_reader__(subreader2, table_be)
            self.__test_try__(subreader2, table_be)
            self.__test_callbacks_reader__(subreader2,
                                           14, 18, table_be, 13)

        temp.seek(0, 0)
        temp_s.seek(0, 0)

        # check a little-endian substream built from a file
        for reader in [BitstreamReader(temp, True),
                       BitstreamReader(temp_s, True),
                       BitstreamReader(data, True)]:
            start = reader.getpos()

            reader.skip(16)
            subreader = reader.substream(4)
            self.__test_little_endian_reader__(subreader, table_le)
            self.__test_try__(subreader, table_le)
            self.__test_callbacks_reader__(subreader,
                                           14, 18, table_le, 13)

            # check a little-endian substream
            # built from another substream
            reader.setpos(start)
            reader.skip(8)
            subreader1 = reader.substream(6)
            subreader1.skip(8)
            subreader2 = subreader1.substream(4)
            self.__test_little_endian_reader__(subreader2, table_le)
            self.__test_try__(subreader2, table_le)
            self.__test_callbacks_reader__(subreader2,
                                           14, 18, table_le, 13)
    finally:
        temp.close()

    # test the writer functions with each endianness
    self.__test_writer__(0)
    self.__test_writer__(1)
def __test_edge_reader_le__(self, reader):
    # reads the 32 and 64 bit extremes from "reader" four ways:
    # read(), read_signed(), and parse() with unsigned and signed
    # formats, rewinding to the initial position between passes

    origin = reader.getpos()

    widths = [32, 32, 32, 32, 64, 64, 64, 64]
    unsigned_expected = [0,
                         4294967295,
                         2147483648,
                         2147483647,
                         0,
                         0xFFFFFFFFFFFFFFFF,
                         9223372036854775808,
                         9223372036854775807]
    signed_expected = [0,
                       -1,
                       -2147483648,
                       2147483647,
                       0,
                       -1,
                       -9223372036854775808,
                       9223372036854775807]

    # try the unsigned 32 and 64 bit values
    for (width, expected) in zip(widths, unsigned_expected):
        self.assertEqual(reader.read(width), expected)

    # try the signed 32 and 64 bit values
    reader.setpos(origin)
    for (width, expected) in zip(widths, signed_expected):
        self.assertEqual(reader.read_signed(width), expected)

    # try the unsigned, then the signed, values via parse()
    for (format_string, expected_values) in [
            ("32u 32u 32u 32u 64U 64U 64U 64U", unsigned_expected),
            ("32s 32s 32s 32s 64S 64S 64S 64S", signed_expected)]:
        reader.setpos(origin)
        # unpacking requires parse() to yield exactly 8 values
        (val_1, val_2, val_3, val_4,
         val64_1, val64_2, val64_3, val64_4) = reader.parse(format_string)
        for (value, expected) in zip([val_1, val_2, val_3, val_4,
                                      val64_1, val64_2,
                                      val64_3, val64_4],
                                     expected_values):
            self.assertEqual(value, expected)

def __test_edge_writer__(self, get_writer, validate_writer):
    # writes the 32 and 64 bit extremes four ways:
    # write(), write_signed(), and build() with unsigned and signed
    # formats
    #
    # get_writer()                 - returns a fresh (writer, temp) pair
    # validate_writer(writer, temp) - called once each pair is written

    widths = [32, 32, 32, 32, 64, 64, 64, 64]
    unsigned_values = [0,
                       4294967295,
                       2147483648,
                       2147483647,
                       0,
                       0xFFFFFFFFFFFFFFFF,
                       9223372036854775808,
                       9223372036854775807]
    signed_values = [0,
                     -1,
                     -2147483648,
                     2147483647,
                     0,
                     -1,
                     -9223372036854775808,
                     9223372036854775807]

    # try the unsigned 32 and 64 bit values
    (writer, temp) = get_writer()
    for (width, value) in zip(widths, unsigned_values):
        writer.write(width, value)
    validate_writer(writer, temp)

    # try the signed 32 and 64 bit values
    (writer, temp) = get_writer()
    for (width, value) in zip(widths, signed_values):
        writer.write_signed(width, value)
    validate_writer(writer, temp)

    # try the unsigned, then the signed, values via build()
    for (format_string, values) in [
            ("32u 32u 32u 32u 64u 64u 64u 64u", unsigned_values),
            ("32s 32s 32s 32s 64s 64s 64s 64s", signed_values)]:
        (writer, temp) = get_writer()
        writer.build(format_string, list(values))
        validate_writer(writer, temp)

def __get_edge_writer_be__(self):
    # returns a (BitstreamWriter, NamedTemporaryFile) pair
    # in which the writer outputs to a second handle
    # opened on the temporary file's name
    from audiotools.bitstream import BitstreamWriter

    scratch = tempfile.NamedTemporaryFile()
    output = open(scratch.name, "wb")
    return (BitstreamWriter(output, False), scratch)

def __validate_edge_writer_be__(self, writer, temp_file):
    # closes "writer", checks that the temporary file holds
    # the expected 48 bytes, then closes the temporary file
    writer.close()

    expected = ints_to_bytes([0] * 4 + [255] * 4 +
                             [128, 0, 0, 0] + [127, 255, 255, 255] +
                             [0] * 8 +
                             [255] * 8 +
                             [128] + [0] * 7 +
                             [127] + [255] * 7)

    with open(temp_file.name, "rb") as written:
        contents = written.read()
    self.assertEqual(contents, expected)

    temp_file.close()

def __get_edge_recorder_be__(self):
    # returns a (BitstreamRecorder, NamedTemporaryFile) pair
    from audiotools.bitstream import BitstreamRecorder

    recorder = BitstreamRecorder(0)
    scratch = tempfile.NamedTemporaryFile()
    return (recorder, scratch)
def __validate_edge_recorder_be__(self, writer, temp_file): 3142 from audiotools.bitstream import BitstreamWriter 3143 3144 writer2 = BitstreamWriter(open(temp_file.name, "wb"), False) 3145 writer.copy(writer2) 3146 writer2.close() 3147 3148 with open(temp_file.name, "rb") as f: 3149 self.assertEqual( 3150 f.read(), 3151 ints_to_bytes([0, 0, 0, 0, 255, 255, 255, 255, 3152 128, 0, 0, 0, 127, 255, 255, 255, 3153 0, 0, 0, 0, 0, 0, 0, 0, 3154 255, 255, 255, 255, 255, 255, 255, 255, 3155 128, 0, 0, 0, 0, 0, 0, 0, 3156 127, 255, 255, 255, 255, 255, 255, 255])) 3157 3158 temp_file.close() 3159 3160 def __validate_edge_accumulator_be__(self, writer, temp_file): 3161 self.assertEqual(writer.bits(), 48 * 8) 3162 3163 def __get_edge_writer_le__(self): 3164 from audiotools.bitstream import BitstreamWriter 3165 3166 temp_file = tempfile.NamedTemporaryFile() 3167 return (BitstreamWriter(open(temp_file.name, "wb"), True), temp_file) 3168 3169 def __validate_edge_writer_le__(self, writer, temp_file): 3170 writer.close() 3171 3172 with open(temp_file.name, "rb") as f: 3173 self.assertEqual( 3174 f.read(), 3175 ints_to_bytes([0, 0, 0, 0, 255, 255, 255, 255, 3176 0, 0, 0, 128, 255, 255, 255, 127, 3177 0, 0, 0, 0, 0, 0, 0, 0, 3178 255, 255, 255, 255, 255, 255, 255, 255, 3179 0, 0, 0, 0, 0, 0, 0, 128, 3180 255, 255, 255, 255, 255, 255, 255, 127])) 3181 3182 temp_file.close() 3183 3184 def __get_edge_recorder_le__(self): 3185 from audiotools.bitstream import BitstreamRecorder 3186 3187 return (BitstreamRecorder(1), tempfile.NamedTemporaryFile()) 3188 3189 def __validate_edge_recorder_le__(self, writer, temp_file): 3190 from audiotools.bitstream import BitstreamWriter 3191 3192 writer2 = BitstreamWriter(open(temp_file.name, "wb"), True) 3193 writer.copy(writer2) 3194 writer2.close() 3195 3196 with open(temp_file.name, "rb") as f: 3197 self.assertEqual( 3198 f.read(), 3199 ints_to_bytes([0, 0, 0, 0, 255, 255, 255, 255, 3200 0, 0, 0, 128, 255, 255, 255, 127, 3201 0, 0, 0, 0, 0, 0, 0, 0, 
3202 255, 255, 255, 255, 255, 255, 255, 255, 3203 0, 0, 0, 0, 0, 0, 0, 128, 3204 255, 255, 255, 255, 255, 255, 255, 127])) 3205 3206 temp_file.close() 3207 3208 def __validate_edge_accumulator_le__(self, writer, temp_file): 3209 self.assertEqual(writer.bits(), 48 * 8) 3210 3211 def __test_writer__(self, endianness): 3212 from audiotools.bitstream import BitstreamWriter 3213 from audiotools.bitstream import BitstreamRecorder 3214 3215 checks = [self.__writer_perform_write__, 3216 self.__writer_perform_write_signed__, 3217 self.__writer_perform_write_unary_0__, 3218 self.__writer_perform_write_unary_1__, 3219 self.__writer_perform_write_huffman__] 3220 3221 # perform file-based checks 3222 for check in checks: 3223 temp = tempfile.NamedTemporaryFile() 3224 try: 3225 writer = BitstreamWriter(open(temp.name, "wb"), endianness) 3226 check(writer, endianness) 3227 writer.close() 3228 self.__check_output_file__(temp) 3229 finally: 3230 temp.close() 3231 3232 data = BytesIO() 3233 writer = BitstreamWriter(data, endianness) 3234 check(writer, endianness) 3235 del(writer) 3236 self.assertEqual(data.getvalue(), b"\xB1\xED\x3B\xC1") 3237 3238 # perform recorder-based checks 3239 for check in checks: 3240 temp = tempfile.NamedTemporaryFile() 3241 try: 3242 writer = BitstreamWriter(open(temp.name, "wb"), endianness) 3243 recorder = BitstreamRecorder(endianness) 3244 check(recorder, endianness) 3245 recorder.copy(writer) 3246 writer.close() 3247 self.__check_output_file__(temp) 3248 self.assertEqual(recorder.bits(), 32) 3249 finally: 3250 temp.close() 3251 3252 # check swap records 3253 temp = tempfile.NamedTemporaryFile() 3254 try: 3255 writer = BitstreamWriter(open(temp.name, "wb"), endianness) 3256 recorder1 = BitstreamRecorder(endianness) 3257 recorder2 = BitstreamRecorder(endianness) 3258 recorder2.write(8, 0xB1) 3259 recorder2.write(8, 0xED) 3260 recorder1.write(8, 0x3B) 3261 recorder1.write(8, 0xC1) 3262 recorder1.swap(recorder2) 3263 recorder1.copy(writer) 3264 
recorder2.copy(writer) 3265 writer.close() 3266 self.__check_output_file__(temp) 3267 finally: 3268 temp.close() 3269 3270 # check recorder reset 3271 temp = tempfile.NamedTemporaryFile() 3272 try: 3273 writer = BitstreamWriter(open(temp.name, "wb"), endianness) 3274 recorder = BitstreamRecorder(endianness) 3275 recorder.write(8, 0xAA) 3276 recorder.write(8, 0xBB) 3277 recorder.write(8, 0xCC) 3278 recorder.write(8, 0xDD) 3279 recorder.write(8, 0xEE) 3280 recorder.reset() 3281 recorder.write(8, 0xB1) 3282 recorder.write(8, 0xED) 3283 recorder.write(8, 0x3B) 3284 recorder.write(8, 0xC1) 3285 recorder.copy(writer) 3286 writer.close() 3287 self.__check_output_file__(temp) 3288 finally: 3289 temp.close() 3290 3291 # check endianness setting 3292 # FIXME 3293 3294 # check a file-based byte-align 3295 # FIXME 3296 3297 # check a recorder-based byte-align 3298 # FIXME 3299 3300 # check an accumulator-based byte-align 3301 # FIXME 3302 3303 # check a partial dump 3304 # FIXME 3305 3306 # check that recorder->recorder->file works 3307 for check in checks: 3308 temp = tempfile.NamedTemporaryFile() 3309 try: 3310 writer = BitstreamWriter(open(temp.name, "wb"), endianness) 3311 recorder1 = BitstreamRecorder(endianness) 3312 recorder2 = BitstreamRecorder(endianness) 3313 self.assertEqual(recorder1.bits(), 0) 3314 self.assertEqual(recorder2.bits(), 0) 3315 check(recorder2, endianness) 3316 self.assertEqual(recorder1.bits(), 0) 3317 self.assertEqual(recorder2.bits(), 32) 3318 recorder2.copy(recorder1) 3319 self.assertEqual(recorder1.bits(), 32) 3320 self.assertEqual(recorder2.bits(), 32) 3321 recorder1.copy(writer) 3322 writer.close() 3323 self.__check_output_file__(temp) 3324 finally: 3325 temp.close() 3326 3327 def __writer_perform_write__(self, writer, endianness): 3328 if endianness == 0: 3329 writer.write(2, 2) 3330 writer.write(3, 6) 3331 writer.write(5, 7) 3332 writer.write(3, 5) 3333 writer.write(19, 342977) 3334 else: 3335 writer.write(2, 1) 3336 writer.write(3, 4) 3337 
writer.write(5, 13) 3338 writer.write(3, 3) 3339 writer.write(19, 395743) 3340 3341 self.assertRaises(ValueError, writer.write, -1, 0) 3342 3343 def __writer_perform_write_signed__(self, writer, endianness): 3344 if endianness == 0: 3345 writer.write_signed(2, -2) 3346 writer.write_signed(3, -2) 3347 writer.write_signed(5, 7) 3348 writer.write_signed(3, -3) 3349 writer.write_signed(19, -181311) 3350 else: 3351 writer.write_signed(2, 1) 3352 writer.write_signed(3, -4) 3353 writer.write_signed(5, 13) 3354 writer.write_signed(3, 3) 3355 writer.write_signed(19, -128545) 3356 3357 self.assertRaises(ValueError, writer.write_signed, -1, 0) 3358 3359 def __writer_perform_write_unary_0__(self, writer, endianness): 3360 if endianness == 0: 3361 writer.unary(0, 1) 3362 writer.unary(0, 2) 3363 writer.unary(0, 0) 3364 writer.unary(0, 0) 3365 writer.unary(0, 4) 3366 writer.unary(0, 2) 3367 writer.unary(0, 1) 3368 writer.unary(0, 0) 3369 writer.unary(0, 3) 3370 writer.unary(0, 4) 3371 writer.unary(0, 0) 3372 writer.unary(0, 0) 3373 writer.unary(0, 0) 3374 writer.unary(0, 0) 3375 writer.write(1, 1) 3376 else: 3377 writer.unary(0, 1) 3378 writer.unary(0, 0) 3379 writer.unary(0, 0) 3380 writer.unary(0, 2) 3381 writer.unary(0, 2) 3382 writer.unary(0, 2) 3383 writer.unary(0, 5) 3384 writer.unary(0, 3) 3385 writer.unary(0, 0) 3386 writer.unary(0, 1) 3387 writer.unary(0, 0) 3388 writer.unary(0, 0) 3389 writer.unary(0, 0) 3390 writer.unary(0, 0) 3391 writer.write(2, 3) 3392 3393 def __writer_perform_write_unary_1__(self, writer, endianness): 3394 if endianness == 0: 3395 writer.unary(1, 0) 3396 writer.unary(1, 1) 3397 writer.unary(1, 0) 3398 writer.unary(1, 3) 3399 writer.unary(1, 0) 3400 writer.unary(1, 0) 3401 writer.unary(1, 0) 3402 writer.unary(1, 1) 3403 writer.unary(1, 0) 3404 writer.unary(1, 1) 3405 writer.unary(1, 2) 3406 writer.unary(1, 0) 3407 writer.unary(1, 0) 3408 writer.unary(1, 1) 3409 writer.unary(1, 0) 3410 writer.unary(1, 0) 3411 writer.unary(1, 0) 3412 writer.unary(1, 
5) 3413 else: 3414 writer.unary(1, 0) 3415 writer.unary(1, 3) 3416 writer.unary(1, 0) 3417 writer.unary(1, 1) 3418 writer.unary(1, 0) 3419 writer.unary(1, 1) 3420 writer.unary(1, 0) 3421 writer.unary(1, 1) 3422 writer.unary(1, 0) 3423 writer.unary(1, 0) 3424 writer.unary(1, 0) 3425 writer.unary(1, 0) 3426 writer.unary(1, 1) 3427 writer.unary(1, 0) 3428 writer.unary(1, 0) 3429 writer.unary(1, 2) 3430 writer.unary(1, 5) 3431 writer.unary(1, 0) 3432 3433 def __writer_perform_write_huffman__(self, writer, endianness): 3434 from audiotools.bitstream import HuffmanTree 3435 3436 table = HuffmanTree([[1, 1], 0, 3437 [1, 0], 1, 3438 [0, 1], 2, 3439 [0, 0, 1], 3, 3440 [0, 0, 0], 4], endianness) 3441 3442 if endianness == 0: 3443 writer.write_huffman_code(table, 1) 3444 writer.write_huffman_code(table, 0) 3445 writer.write_huffman_code(table, 4) 3446 writer.write_huffman_code(table, 0) 3447 writer.write_huffman_code(table, 0) 3448 writer.write_huffman_code(table, 2) 3449 writer.write_huffman_code(table, 1) 3450 writer.write_huffman_code(table, 1) 3451 writer.write_huffman_code(table, 2) 3452 writer.write_huffman_code(table, 0) 3453 writer.write_huffman_code(table, 2) 3454 writer.write_huffman_code(table, 0) 3455 writer.write_huffman_code(table, 1) 3456 writer.write_huffman_code(table, 4) 3457 writer.write_huffman_code(table, 2) 3458 else: 3459 writer.write_huffman_code(table, 1) 3460 writer.write_huffman_code(table, 3) 3461 writer.write_huffman_code(table, 1) 3462 writer.write_huffman_code(table, 0) 3463 writer.write_huffman_code(table, 2) 3464 writer.write_huffman_code(table, 1) 3465 writer.write_huffman_code(table, 0) 3466 writer.write_huffman_code(table, 0) 3467 writer.write_huffman_code(table, 1) 3468 writer.write_huffman_code(table, 0) 3469 writer.write_huffman_code(table, 1) 3470 writer.write_huffman_code(table, 2) 3471 writer.write_huffman_code(table, 4) 3472 writer.write_huffman_code(table, 3) 3473 writer.write(1, 1) 3474 3475 def __check_output_file__(self, 
temp_file): 3476 with open(temp_file.name, "rb") as f: 3477 self.assertEqual(f.read(), b"\xB1\xED\x3B\xC1") 3478 3479 @LIB_BITSTREAM 3480 def test_read_errors(self): 3481 from audiotools.bitstream import BitstreamReader 3482 3483 for little_endian in [False, True]: 3484 for reader in [BitstreamReader(BytesIO(b"a" * 10), 3485 little_endian), 3486 BitstreamReader(BytesIO(b"a" * 10), 3487 little_endian).substream(5)]: 3488 # reading negative number of bits shouldn't work 3489 self.assertRaises(ValueError, 3490 reader.read, 3491 -1) 3492 3493 self.assertRaises(ValueError, 3494 reader.read_signed, 3495 -1) 3496 3497 # reading signed value in 0 bits shouldn't work 3498 self.assertRaises(ValueError, 3499 reader.read_signed, 3500 0) 3501 3502 self.assertRaises(ValueError, 3503 reader.parse, 3504 "0s") 3505 3506 # reading unary with non 0/1 bit shouldn't work 3507 self.assertRaises(ValueError, 3508 reader.unary, 3509 3) 3510 3511 self.assertRaises(ValueError, 3512 reader.unary, 3513 -1) 3514 3515 @LIB_BITSTREAM 3516 def test_write_errors(self): 3517 from audiotools.bitstream import BitstreamWriter 3518 from audiotools.bitstream import BitstreamRecorder 3519 3520 for little_endian in [False, True]: 3521 for writer in [BitstreamWriter(BytesIO(), 3522 little_endian), 3523 BitstreamRecorder(little_endian)]: 3524 # writing negative number of bits shouldn't work 3525 self.assertRaises(ValueError, 3526 writer.write, 3527 -1, 0) 3528 3529 self.assertRaises(ValueError, 3530 writer.write_signed, 3531 -1, 0) 3532 3533 # writing signed value in 0 bits shouldn't work 3534 self.assertRaises(ValueError, 3535 writer.write_signed, 3536 0, 0) 3537 3538 self.assertRaises(ValueError, 3539 writer.build, 3540 "0s", [0]) 3541 3542 # writing negative value as unsigned shouldn't work 3543 self.assertRaises(ValueError, 3544 writer.write, 3545 8, -1) 3546 3547 # write unsigned value that's too large shouldn't work 3548 self.assertRaises(ValueError, 3549 writer.write, 3550 8, 2 ** 8) 3551 3552 # nor 
should it work from the .build method 3553 self.assertRaises(ValueError, 3554 writer.build, 3555 "8u", [-1]) 3556 self.assertRaises(ValueError, 3557 writer.build, 3558 "8u", [2 ** 8]) 3559 3560 # writing negative value that's too small shouldn't work 3561 self.assertRaises(ValueError, 3562 writer.write_signed, 3563 8, -(2 ** 8)) 3564 3565 # writing signed value that's too large shouldn't work 3566 self.assertRaises(ValueError, 3567 writer.write_signed, 3568 8, 2 ** 8) 3569 3570 # nor should it work from the .build method 3571 self.assertRaises(ValueError, 3572 writer.build, 3573 "8s", [-(2 ** 8)]) 3574 self.assertRaises(ValueError, 3575 writer.build, 3576 "8s", [2 ** 8]) 3577 3578 # writing some value that's not a number shouldn't work 3579 self.assertRaises(TypeError, 3580 writer.write, 3581 8, "foo") 3582 3583 self.assertRaises(TypeError, 3584 writer.write_signed, 3585 8, "foo") 3586 3587 self.assertRaises(TypeError, 3588 writer.build, 3589 "8u", ["foo"]) 3590 3591 self.assertRaises(TypeError, 3592 writer.build, 3593 "8s", ["foo"]) 3594 3595 # writing unary with non 0/1 bit shouldn't work 3596 self.assertRaises(ValueError, 3597 writer.unary, 3598 3, 1) 3599 3600 self.assertRaises(ValueError, 3601 writer.unary, 3602 -1, 1) 3603 3604 @LIB_BITSTREAM 3605 def test_edge_cases(self): 3606 from audiotools.bitstream import BitstreamReader 3607 3608 with tempfile.NamedTemporaryFile() as temp: 3609 # write the temp file with a set of known big-endian data 3610 temp.write(ints_to_bytes([0, 0, 0, 0, 255, 255, 255, 255, 3611 128, 0, 0, 0, 127, 255, 255, 255, 3612 0, 0, 0, 0, 0, 0, 0, 0, 3613 255, 255, 255, 255, 255, 255, 255, 255, 3614 128, 0, 0, 0, 0, 0, 0, 0, 3615 127, 255, 255, 255, 255, 255, 255, 255])) 3616 temp.flush() 3617 3618 # ensure a big-endian reader reads the values correctly 3619 reader = BitstreamReader(open(temp.name, "rb"), False) 3620 self.__test_edge_reader_be__(reader) 3621 reader.close() 3622 3623 # ensure a big-endian sub-reader reads the values 
correctly 3624 reader = BitstreamReader(open(temp.name, "rb"), False) 3625 subreader = reader.substream(48) 3626 self.__test_edge_reader_be__(subreader) 3627 subreader.close() 3628 reader.close() 3629 3630 with tempfile.NamedTemporaryFile() as temp: 3631 # write the temp file with a collection of known little-endian data 3632 temp.write(ints_to_bytes([0, 0, 0, 0, 255, 255, 255, 255, 3633 0, 0, 0, 128, 255, 255, 255, 127, 3634 0, 0, 0, 0, 0, 0, 0, 0, 3635 255, 255, 255, 255, 255, 255, 255, 255, 3636 0, 0, 0, 0, 0, 0, 0, 128, 3637 255, 255, 255, 255, 255, 255, 255, 127])) 3638 temp.flush() 3639 3640 # ensure a little-endian reader reads the values correctly 3641 reader = BitstreamReader(open(temp.name, "rb"), True) 3642 self.__test_edge_reader_le__(reader) 3643 reader.close() 3644 3645 # ensure a little-endian sub-reader reads the values correctly 3646 reader = BitstreamReader(open(temp.name, "rb"), True) 3647 subreader = reader.substream(48) 3648 self.__test_edge_reader_be__(subreader) 3649 subreader.close() 3650 reader.close() 3651 3652 # test a bunch of big-endian values via the bitstream writer 3653 self.__test_edge_writer__(self.__get_edge_writer_be__, 3654 self.__validate_edge_writer_be__) 3655 3656 # test a bunch of big-endian values via the bitstream recorder 3657 self.__test_edge_writer__(self.__get_edge_recorder_be__, 3658 self.__validate_edge_recorder_be__) 3659 3660 # test a bunch of little-endian values via the bitstream writer 3661 self.__test_edge_writer__(self.__get_edge_writer_le__, 3662 self.__validate_edge_writer_le__) 3663 3664 # test a bunch of little-endian values via the bitstream recorder 3665 self.__test_edge_writer__(self.__get_edge_recorder_le__, 3666 self.__validate_edge_recorder_le__) 3667 3668 @LIB_BITSTREAM 3669 def test_huge_values(self): 3670 from audiotools.bitstream import BitstreamReader 3671 from audiotools.bitstream import BitstreamWriter 3672 from audiotools.bitstream import BitstreamRecorder 3673 3674 for b in [2, 4, 8, 16, 32, 
64, 128, 256, 512]: 3675 data = os.urandom(b) 3676 bits = b * 8 3677 for little_endian in [False, True]: 3678 unsigned1 = BitstreamReader( 3679 BytesIO(data), 3680 little_endian).read(bits) 3681 3682 unsigned2 = BitstreamReader( 3683 BytesIO(data), 3684 little_endian).parse("%du" % (bits))[0] 3685 3686 signed1 = BitstreamReader( 3687 BytesIO(data), 3688 little_endian).read_signed(bits) 3689 3690 signed2 = BitstreamReader( 3691 BytesIO(data), 3692 little_endian).parse("%ds" % (bits))[0] 3693 3694 # check that reading from .read and .parse 3695 # yield the same values 3696 self.assertEqual(unsigned1, unsigned2) 3697 self.assertEqual(signed1, signed2) 3698 3699 # check that writing round-trips properly 3700 unsigned_data1 = BytesIO() 3701 unsigned_data2 = BytesIO() 3702 signed_data1 = BytesIO() 3703 signed_data2 = BytesIO() 3704 3705 BitstreamWriter(unsigned_data1, 3706 little_endian).write(bits, unsigned1) 3707 BitstreamWriter(unsigned_data2, 3708 little_endian).build("%du" % (bits), 3709 [unsigned1]) 3710 BitstreamWriter(signed_data1, 3711 little_endian).write_signed(bits, signed1) 3712 BitstreamWriter(signed_data2, 3713 little_endian).build("%ds" % (bits), 3714 [signed1]) 3715 3716 self.assertEqual(data, unsigned_data1.getvalue()) 3717 self.assertEqual(data, unsigned_data2.getvalue()) 3718 self.assertEqual(data, signed_data1.getvalue()) 3719 self.assertEqual(data, signed_data2.getvalue()) 3720 3721 unsigned_data1 = BitstreamRecorder(little_endian) 3722 unsigned_data2 = BitstreamRecorder(little_endian) 3723 signed_data1 = BitstreamRecorder(little_endian) 3724 signed_data2 = BitstreamRecorder(little_endian) 3725 3726 unsigned_data1.write(bits, unsigned1) 3727 unsigned_data2.build("%du" % (bits), [unsigned1]) 3728 signed_data1.write_signed(bits, signed1) 3729 signed_data2.build("%ds" % (bits), [signed1]) 3730 3731 self.assertEqual(data, unsigned_data1.data()) 3732 self.assertEqual(data, unsigned_data2.data()) 3733 self.assertEqual(data, signed_data1.data()) 3734 
self.assertEqual(data, signed_data2.data()) 3735 3736 # check that endianness swapping works 3737 r = BitstreamReader( 3738 BytesIO(data), 3739 little_endian) 3740 3741 unsigned1 = r.read(bits // 2) 3742 r.set_endianness(not little_endian) 3743 unsigned2 = r.read(bits // 2) 3744 3745 new_data = BytesIO() 3746 w = BitstreamWriter( 3747 new_data, little_endian) 3748 w.write(bits // 2, unsigned1) 3749 w.set_endianness(not little_endian) 3750 w.write(bits // 2, unsigned2) 3751 w.flush() 3752 3753 self.assertEqual(data, new_data.getvalue()) 3754 3755 w = BitstreamRecorder(little_endian) 3756 w.write(bits // 2, unsigned1) 3757 w.set_endianness(not little_endian) 3758 w.write(bits // 2, unsigned2) 3759 3760 self.assertEqual(data, w.data()) 3761 3762 @LIB_BITSTREAM 3763 def test_python_reader(self): 3764 from audiotools.bitstream import BitstreamReader 3765 3766 # Vanilla, file-based BitstreamReader uses a 1 character buffer 3767 # and relies on stdio to perform buffering which is fast enough. 3768 # Therefore, a byte-aligned file can be seek()ed at will. 3769 # However, making lots of read(1) calls on a Python object 3770 # is unacceptably slow. 3771 # Therefore, we read a 4KB string and pull individual bytes from 3772 # it as needed, which should keep performance reasonable. 
3773 def new_temp1(): 3774 temp = BytesIO() 3775 temp.write(b"\xB1") 3776 temp.write(b"\xED") 3777 temp.write(b"\x3B") 3778 temp.write(b"\xC1") 3779 temp.seek(0, 0) 3780 return temp 3781 3782 def new_temp2(): 3783 return __SimpleChunkReader__([b"\xB1" + 3784 b"\xED" + 3785 b"\x3B" + 3786 b"\xC1"]) 3787 3788 def new_temp3(): 3789 return __SimpleChunkReader__([b"\xB1" + 3790 b"\xED", 3791 b"\x3B" + 3792 b"\xC1"]) 3793 3794 def new_temp4(): 3795 return __SimpleChunkReader__([b"\xB1", 3796 b"\xED", 3797 b"\x3B" + 3798 b"\xC1"]) 3799 3800 def new_temp5(): 3801 return __SimpleChunkReader__([b"\xB1", 3802 b"\xED", 3803 b"\x3B", 3804 b"\xC1"]) 3805 3806 for new_temp in [new_temp1, new_temp2, new_temp3, new_temp4, 3807 new_temp5]: 3808 # first, check the bitstream reader 3809 # against some simple known big-endian values 3810 bitstream = BitstreamReader(new_temp(), False) 3811 3812 self.assertEqual(bitstream.read(2), 2) 3813 self.assertEqual(bitstream.read(3), 6) 3814 self.assertEqual(bitstream.read(5), 7) 3815 self.assertEqual(bitstream.read(3), 5) 3816 self.assertEqual(bitstream.read(19), 342977) 3817 3818 bitstream = BitstreamReader(new_temp(), False) 3819 self.assertEqual(bitstream.read_signed(2), -2) 3820 self.assertEqual(bitstream.read_signed(3), -2) 3821 self.assertEqual(bitstream.read_signed(5), 7) 3822 self.assertEqual(bitstream.read_signed(3), -3) 3823 self.assertEqual(bitstream.read_signed(19), -181311) 3824 3825 bitstream = BitstreamReader(new_temp(), False) 3826 self.assertEqual(bitstream.unary(0), 1) 3827 self.assertEqual(bitstream.unary(0), 2) 3828 self.assertEqual(bitstream.unary(0), 0) 3829 self.assertEqual(bitstream.unary(0), 0) 3830 self.assertEqual(bitstream.unary(0), 4) 3831 bitstream.byte_align() 3832 bitstream = BitstreamReader(new_temp(), False) 3833 self.assertEqual(bitstream.unary(1), 0) 3834 self.assertEqual(bitstream.unary(1), 1) 3835 self.assertEqual(bitstream.unary(1), 0) 3836 self.assertEqual(bitstream.unary(1), 3) 3837 
self.assertEqual(bitstream.unary(1), 0) 3838 bitstream.byte_align() 3839 3840 bitstream = BitstreamReader(new_temp(), False) 3841 pos = bitstream.getpos() 3842 self.assertTrue(bitstream.byte_aligned()) 3843 for i in range(32): 3844 bit = bitstream.read(1) 3845 bitstream.unread(bit) 3846 re_read = bitstream.read(1) 3847 self.assertEqual(bit, re_read) 3848 self.assertTrue(bitstream.byte_aligned()) 3849 3850 bitstream.setpos(pos) 3851 bitstream.unread(bitstream.read(1)) 3852 self.assertTrue(bitstream.byte_aligned()) 3853 self.assertEqual(bitstream.read_bytes(4), b"\xB1\xED\x3B\xC1") 3854 self.assertTrue(bitstream.byte_aligned()) 3855 3856 bitstream = BitstreamReader(new_temp(), False) 3857 pos = bitstream.getpos() 3858 self.assertEqual(bitstream.read(4), 0xB) 3859 bitstream.setpos(pos) 3860 self.assertEqual(bitstream.read(8), 0xB1) 3861 bitstream.setpos(pos) 3862 self.assertEqual(bitstream.read(12), 0xB1E) 3863 pos = bitstream.getpos() 3864 self.assertEqual(bitstream.read(4), 0xD) 3865 bitstream.setpos(pos) 3866 self.assertEqual(bitstream.read(8), 0xD3) 3867 bitstream.setpos(pos) 3868 self.assertEqual(bitstream.read(12), 0xD3B) 3869 3870 del(bitstream) 3871 bitstream = BitstreamReader(new_temp(), False) 3872 3873 # then, check the bitstream reader 3874 # against some simple known little-endian values 3875 bitstream = BitstreamReader(new_temp(), True) 3876 3877 self.assertEqual(bitstream.read(2), 1) 3878 self.assertEqual(bitstream.read(3), 4) 3879 self.assertEqual(bitstream.read(5), 13) 3880 self.assertEqual(bitstream.read(3), 3) 3881 self.assertEqual(bitstream.read(19), 395743) 3882 3883 bitstream = BitstreamReader(new_temp(), True) 3884 self.assertEqual(bitstream.read_signed(2), 1) 3885 self.assertEqual(bitstream.read_signed(3), -4) 3886 self.assertEqual(bitstream.read_signed(5), 13) 3887 self.assertEqual(bitstream.read_signed(3), 3) 3888 self.assertEqual(bitstream.read_signed(19), -128545) 3889 3890 bitstream = BitstreamReader(new_temp(), True) 3891 
self.assertEqual(bitstream.unary(0), 1) 3892 self.assertEqual(bitstream.unary(0), 0) 3893 self.assertEqual(bitstream.unary(0), 0) 3894 self.assertEqual(bitstream.unary(0), 2) 3895 self.assertEqual(bitstream.unary(0), 2) 3896 bitstream.byte_align() 3897 bitstream = BitstreamReader(new_temp(), True) 3898 self.assertEqual(bitstream.unary(1), 0) 3899 self.assertEqual(bitstream.unary(1), 3) 3900 self.assertEqual(bitstream.unary(1), 0) 3901 self.assertEqual(bitstream.unary(1), 1) 3902 self.assertEqual(bitstream.unary(1), 0) 3903 bitstream.byte_align() 3904 3905 bitstream = BitstreamReader(new_temp(), True) 3906 pos = bitstream.getpos() 3907 self.assertTrue(bitstream.byte_aligned()) 3908 for i in range(32): 3909 bit = bitstream.read(1) 3910 bitstream.unread(bit) 3911 re_read = bitstream.read(1) 3912 self.assertEqual(bit, re_read) 3913 self.assertTrue(bitstream.byte_aligned()) 3914 3915 bitstream.setpos(pos) 3916 bitstream.unread(bitstream.read(1)) 3917 self.assertTrue(bitstream.byte_aligned()) 3918 self.assertEqual(bitstream.read_bytes(4), b"\xB1\xED\x3B\xC1") 3919 self.assertTrue(bitstream.byte_aligned()) 3920 3921 bitstream = BitstreamReader(new_temp(), True) 3922 pos = bitstream.getpos() 3923 self.assertEqual(bitstream.read(4), 0x1) 3924 bitstream.setpos(pos) 3925 self.assertEqual(bitstream.read(8), 0xB1) 3926 bitstream.setpos(pos) 3927 self.assertEqual(bitstream.read(12), 0xDB1) 3928 pos = bitstream.getpos() 3929 self.assertEqual(bitstream.read(4), 0xE) 3930 bitstream.setpos(pos) 3931 self.assertEqual(bitstream.read(8), 0xBE) 3932 bitstream.setpos(pos) 3933 self.assertEqual(bitstream.read(12), 0x3BE) 3934 3935 @LIB_BITSTREAM 3936 def test_simple_writer(self): 3937 from audiotools.bitstream import BitstreamWriter 3938 3939 with tempfile.NamedTemporaryFile() as temp: 3940 # first, have the bitstream writer generate 3941 # a set of known big-endian values 3942 3943 f = open(temp.name, "wb") 3944 bitstream = BitstreamWriter(f, False) 3945 bitstream.write(2, 2) 3946 
bitstream.write(3, 6) 3947 bitstream.write(5, 7) 3948 bitstream.write(3, 5) 3949 bitstream.write(19, 342977) 3950 bitstream.flush() 3951 bitstream.close() 3952 with open(temp.name, "rb") as f: 3953 self.assertEqual(f.read(), b"\xB1\xED\x3B\xC1") 3954 3955 f = open(temp.name, "wb") 3956 bitstream = BitstreamWriter(f, False) 3957 bitstream.write_signed(2, -2) 3958 bitstream.write_signed(3, -2) 3959 bitstream.write_signed(5, 7) 3960 bitstream.write_signed(3, -3) 3961 bitstream.write_signed(19, -181311) 3962 bitstream.flush() 3963 bitstream.close() 3964 with open(temp.name, "rb") as f: 3965 self.assertEqual(f.read(), b"\xB1\xED\x3B\xC1") 3966 3967 f = open(temp.name, "wb") 3968 bitstream = BitstreamWriter(f, False) 3969 bitstream.unary(0, 1) 3970 bitstream.unary(0, 2) 3971 bitstream.unary(0, 0) 3972 bitstream.unary(0, 0) 3973 bitstream.unary(0, 4) 3974 bitstream.unary(0, 2) 3975 bitstream.unary(0, 1) 3976 bitstream.unary(0, 0) 3977 bitstream.unary(0, 3) 3978 bitstream.unary(0, 4) 3979 bitstream.unary(0, 0) 3980 bitstream.unary(0, 0) 3981 bitstream.unary(0, 0) 3982 bitstream.unary(0, 0) 3983 bitstream.write(1, 1) 3984 bitstream.flush() 3985 bitstream.close() 3986 with open(temp.name, "rb") as f: 3987 self.assertEqual(f.read(), b"\xB1\xED\x3B\xC1") 3988 3989 f = open(temp.name, "wb") 3990 bitstream = BitstreamWriter(f, False) 3991 bitstream.unary(1, 0) 3992 bitstream.unary(1, 1) 3993 bitstream.unary(1, 0) 3994 bitstream.unary(1, 3) 3995 bitstream.unary(1, 0) 3996 bitstream.unary(1, 0) 3997 bitstream.unary(1, 0) 3998 bitstream.unary(1, 1) 3999 bitstream.unary(1, 0) 4000 bitstream.unary(1, 1) 4001 bitstream.unary(1, 2) 4002 bitstream.unary(1, 0) 4003 bitstream.unary(1, 0) 4004 bitstream.unary(1, 1) 4005 bitstream.unary(1, 0) 4006 bitstream.unary(1, 0) 4007 bitstream.unary(1, 0) 4008 bitstream.unary(1, 5) 4009 bitstream.flush() 4010 bitstream.close() 4011 with open(temp.name, "rb") as f: 4012 self.assertEqual(f.read(), b"\xB1\xED\x3B\xC1") 4013 4014 # then, have the 
bitstream writer generate 4015 # a set of known little-endian values 4016 f = open(temp.name, "wb") 4017 bitstream = BitstreamWriter(f, True) 4018 bitstream.write(2, 1) 4019 bitstream.write(3, 4) 4020 bitstream.write(5, 13) 4021 bitstream.write(3, 3) 4022 bitstream.write(19, 395743) 4023 bitstream.flush() 4024 bitstream.close() 4025 with open(temp.name, "rb") as f: 4026 self.assertEqual(f.read(), b"\xB1\xED\x3B\xC1") 4027 4028 f = open(temp.name, "wb") 4029 bitstream = BitstreamWriter(f, True) 4030 bitstream.write_signed(2, 1) 4031 bitstream.write_signed(3, -4) 4032 bitstream.write_signed(5, 13) 4033 bitstream.write_signed(3, 3) 4034 bitstream.write_signed(19, -128545) 4035 bitstream.flush() 4036 bitstream.close() 4037 with open(temp.name, "rb") as f: 4038 self.assertEqual(f.read(), b"\xB1\xED\x3B\xC1") 4039 4040 f = open(temp.name, "wb") 4041 bitstream = BitstreamWriter(f, True) 4042 bitstream.unary(0, 1) 4043 bitstream.unary(0, 0) 4044 bitstream.unary(0, 0) 4045 bitstream.unary(0, 2) 4046 bitstream.unary(0, 2) 4047 bitstream.unary(0, 2) 4048 bitstream.unary(0, 5) 4049 bitstream.unary(0, 3) 4050 bitstream.unary(0, 0) 4051 bitstream.unary(0, 1) 4052 bitstream.unary(0, 0) 4053 bitstream.unary(0, 0) 4054 bitstream.unary(0, 0) 4055 bitstream.unary(0, 0) 4056 bitstream.write(2, 3) 4057 bitstream.flush() 4058 bitstream.close() 4059 with open(temp.name, "rb") as f: 4060 self.assertEqual(f.read(), b"\xB1\xED\x3B\xC1") 4061 4062 f = open(temp.name, "wb") 4063 bitstream = BitstreamWriter(f, True) 4064 bitstream.unary(1, 0) 4065 bitstream.unary(1, 3) 4066 bitstream.unary(1, 0) 4067 bitstream.unary(1, 1) 4068 bitstream.unary(1, 0) 4069 bitstream.unary(1, 1) 4070 bitstream.unary(1, 0) 4071 bitstream.unary(1, 1) 4072 bitstream.unary(1, 0) 4073 bitstream.unary(1, 0) 4074 bitstream.unary(1, 0) 4075 bitstream.unary(1, 0) 4076 bitstream.unary(1, 1) 4077 bitstream.unary(1, 0) 4078 bitstream.unary(1, 0) 4079 bitstream.unary(1, 2) 4080 bitstream.unary(1, 5) 4081 bitstream.unary(1, 0) 
4082 bitstream.flush() 4083 bitstream.close() 4084 with open(temp.name, "rb") as f: 4085 self.assertEqual(f.read(), b"\xB1\xED\x3B\xC1") 4086 4087 f = open(temp.name, "wb") 4088 bitstream = BitstreamWriter(f, True) 4089 bitstream.write(4, 0x1) 4090 bitstream.byte_align() 4091 bitstream.write(4, 0xD) 4092 bitstream.byte_align() 4093 bitstream.flush() 4094 bitstream.close() 4095 with open(temp.name, "rb") as f: 4096 self.assertEqual(f.read(), b"\x01\x0D") 4097 4098 # and have the bitstream reader check those values are accurate 4099 4100 @LIB_BITSTREAM 4101 def test_reader_close(self): 4102 from audiotools.bitstream import BitstreamReader, HuffmanTree 4103 4104 def test_reader(reader): 4105 self.assertRaises(IOError, reader.read, 1) 4106 self.assertRaises(IOError, reader.skip, 3) 4107 self.assertRaises(IOError, reader.skip_bytes, 1) 4108 self.assertRaises(IOError, reader.read_signed, 2) 4109 self.assertRaises(IOError, reader.unary, 1) 4110 self.assertRaises(IOError, reader.read_bytes, 1) 4111 self.assertRaises(IOError, reader.parse, "1b2b3b") 4112 self.assertRaises(IOError, reader.substream, 2) 4113 self.assertRaises(IOError, reader.read_huffman_code, 4114 HuffmanTree([(1, ), 1, 4115 (0, 1), 2, 4116 (0, 0, 1), 3, 4117 (0, 0, 0), 4], False)) 4118 4119 def new_temp(): 4120 temp = BytesIO() 4121 temp.write(b"\xB1") 4122 temp.write(b"\xED") 4123 temp.write(b"\x3B") 4124 temp.write(b"\xC1") 4125 temp.seek(0, 0) 4126 return temp 4127 4128 # test a BitstreamReader from a Python file object 4129 f = open("test_core.py", "rb") 4130 4131 reader = BitstreamReader(f, False) 4132 reader.close() 4133 test_reader(reader) 4134 reader.set_endianness(1) 4135 test_reader(reader) 4136 4137 reader = BitstreamReader(f, True) 4138 reader.close() 4139 test_reader(reader) 4140 reader.set_endianness(0) 4141 test_reader(reader) 4142 4143 f.close() 4144 del(f) 4145 4146 # test a BitstreamReader from a Python BytesIO object 4147 reader = BitstreamReader(new_temp(), False) 4148 reader.close() 
4149 test_reader(reader) 4150 reader.set_endianness(1) 4151 test_reader(reader) 4152 4153 reader = BitstreamReader(new_temp(), True) 4154 reader.close() 4155 test_reader(reader) 4156 reader.set_endianness(0) 4157 test_reader(reader) 4158 4159 @LIB_BITSTREAM 4160 def test_reader_context(self): 4161 from io import BytesIO 4162 from audiotools.bitstream import BitstreamReader 4163 4164 b = BytesIO(b"\xB1\xED\x3B\xC1") 4165 self.assertFalse(b.closed) 4166 r = BitstreamReader(b, False) 4167 self.assertEqual(r.read(2), 0x2) 4168 self.assertEqual(r.read(3), 0x6) 4169 self.assertEqual(r.read(5), 0x07) 4170 self.assertEqual(r.read(3), 0x5) 4171 self.assertEqual(r.read(19), 0x53BC1) 4172 self.assertRaises(IOError, r.read, 8) 4173 self.assertFalse(b.closed) 4174 del(r) 4175 self.assertFalse(b.closed) 4176 4177 b = BytesIO(b"\xB1\xED\x3B\xC1") 4178 self.assertFalse(b.closed) 4179 r = BitstreamReader(b, True) 4180 self.assertEqual(r.read(2), 0x1) 4181 self.assertEqual(r.read(3), 0x4) 4182 self.assertEqual(r.read(5), 0x0D) 4183 self.assertEqual(r.read(3), 0x3) 4184 self.assertEqual(r.read(19), 0x609DF) 4185 self.assertRaises(IOError, r.read, 8) 4186 self.assertFalse(b.closed) 4187 del(r) 4188 self.assertFalse(b.closed) 4189 4190 b = BytesIO(b"\xB1\xED\x3B\xC1") 4191 self.assertFalse(b.closed) 4192 with BitstreamReader(b, False) as r: 4193 self.assertEqual(r.read(2), 0x2) 4194 self.assertEqual(r.read(3), 0x6) 4195 self.assertEqual(r.read(5), 0x07) 4196 self.assertEqual(r.read(3), 0x5) 4197 self.assertEqual(r.read(19), 0x53BC1) 4198 self.assertRaises(IOError, r.read, 8) 4199 self.assertFalse(b.closed) 4200 self.assertTrue(b.closed) 4201 4202 b = BytesIO(b"\xB1\xED\x3B\xC1") 4203 self.assertFalse(b.closed) 4204 with BitstreamReader(b, True) as r: 4205 self.assertEqual(r.read(2), 0x1) 4206 self.assertEqual(r.read(3), 0x4) 4207 self.assertEqual(r.read(5), 0x0D) 4208 self.assertEqual(r.read(3), 0x3) 4209 self.assertEqual(r.read(19), 0x609DF) 4210 self.assertRaises(IOError, r.read, 8) 
@LIB_BITSTREAM
def test_writer_close(self):
    """Verify that closed writers reject every kind of write.

    A BitstreamWriter over a real file, a BitstreamWriter over a
    BytesIO and a BitstreamRecorder are each closed and probed,
    then switched to the opposite endianness and probed again;
    every probe is expected to raise IOError.
    """
    from audiotools.bitstream import BitstreamWriter
    from audiotools.bitstream import BitstreamRecorder

    # (method name, arguments) for each write call to probe
    probes = [("write", (1, 1)),
              ("write_signed", (2, 1)),
              ("unary", (1, 1)),
              ("write_bytes", (b"foo",)),
              ("build", ("1u2u3u", [0, 1, 2]))]

    def assert_writes_fail(closed_writer):
        for (name, arguments) in probes:
            self.assertRaises(IOError,
                              getattr(closed_writer, name),
                              *arguments)

    def close_and_probe(writer, opposite_endianness):
        writer.close()
        assert_writes_fail(writer)
        writer.set_endianness(opposite_endianness)
        assert_writes_fail(writer)

    # test a BitstreamWriter to a Python file object
    for (little_endian, opposite) in [(False, 1), (True, 0)]:
        f = open("test.bin", "wb")
        try:
            writer = BitstreamWriter(f, little_endian)
            close_and_probe(writer, opposite)
            f.close()
            del(f)
        finally:
            os.unlink("test.bin")

    # test a BitstreamWriter to a Python BytesIO object
    for (little_endian, opposite) in [(False, 1), (True, 0)]:
        s = BytesIO()
        writer = BitstreamWriter(s, little_endian)
        close_and_probe(writer, opposite)
        del(writer)
        del(s)

    # test a BitstreamRecorder
    for (little_endian, opposite) in [(0, 1), (1, 0)]:
        writer = BitstreamRecorder(little_endian)
        close_and_probe(writer, opposite)
        del(writer)
@LIB_BITSTREAM
def test_writer_context(self):
    """Check flushing and stream ownership of bitstream writers.

    The same bit fields are written big-endian and little-endian
    through a deleted BitstreamWriter, a BitstreamWriter used as a
    context manager and a BitstreamRecorder used as a context
    manager; all must yield the same four bytes.
    """
    from io import BytesIO
    from audiotools.bitstream import BitstreamWriter
    from audiotools.bitstream import BitstreamRecorder

    expected = b"\xB1\xED\x3B\xC1"

    # (little-endian flag, [(bits to write, value), ...])
    layouts = [
        (False, [(2, 0x2), (3, 0x6), (5, 0x07), (3, 0x5), (19, 0x53BC1)]),
        (True, [(2, 0x1), (3, 0x4), (5, 0x0D), (3, 0x3), (19, 0x609DF)])]

    def write_fields(writer, fields):
        for (bits, value) in fields:
            writer.write(bits, value)

    # if simply deallocated, writers should flush contents
    # but not close internal stream
    for (little_endian, fields) in layouts:
        stream = BytesIO()
        self.assertFalse(stream.closed)
        writer = BitstreamWriter(stream, little_endian)
        write_fields(writer, fields)
        del(writer)
        self.assertFalse(stream.closed)
        self.assertEqual(stream.getvalue(), expected)

    # if put into a context manager, writers should flush contents
    # but also close internal stream
    for (little_endian, fields) in layouts:
        stream = open("test.bin", "wb")
        try:
            self.assertFalse(stream.closed)
            with BitstreamWriter(stream, little_endian) as writer:
                write_fields(writer, fields)
                self.assertFalse(stream.closed)
            self.assertTrue(stream.closed)
            with open("test.bin", "rb") as written:
                self.assertEqual(written.read(), expected)
        finally:
            os.unlink("test.bin")

    # recorders should work in a context manager
    # even if it's not particularly useful
    for (little_endian, fields) in layouts:
        with BitstreamRecorder(little_endian) as recorder:
            write_fields(recorder, fields)
            self.assertEqual(recorder.data(), expected)
def __test_writer_marks__(self, writer):
    """Drive "writer" through getpos()/setpos() on and off byte boundaries.

    getpos() and setpos() are expected to raise IOError while the
    writer sits in the middle of a byte.  Three bytes of all-1 bits
    are written, then the writer is rewound to the start of the
    second byte which is overwritten with zeroes, so the final
    output is FF 00 FF.
    """
    # 1, 3 and 6 bits written: getpos() is expected to fail each time
    for (bits, value) in [(1, 1), (2, 3), (3, 7)]:
        writer.write(bits, value)
        self.assertRaises(IOError, writer.getpos)

    # the 8th bit completes the first byte, so getpos() works
    writer.write(2, 3)
    mark = writer.getpos()

    # halfway through the second byte setpos() is expected to fail
    writer.write(4, 15)
    self.assertRaises(IOError, writer.setpos, mark)
    writer.write(4, 15)
    writer.write(8, 0xFF)

    # rewind to the mark and overwrite the second byte
    writer.setpos(mark)
    writer.write(8, 0)


@LIB_BITSTREAM
def test_writer_marks(self):
    """Run the getpos()/setpos() scenario on file and BytesIO writers."""
    from audiotools.bitstream import BitstreamWriter

    expected = b"\xFF\x00\xFF"

    # writers wrapped around a file on disk
    for little_endian in (False, True):
        f = open("test.bin", "wb")
        try:
            writer = BitstreamWriter(f, little_endian)
            self.__test_writer_marks__(writer)
            del(writer)
            f.close()
            with open("test.bin", "rb") as written:
                self.assertEqual(written.read(), expected)
        finally:
            os.unlink("test.bin")

    # writers wrapped around an in-memory stream
    for little_endian in (False, True):
        s = BytesIO()
        writer = BitstreamWriter(s, little_endian)
        self.__test_writer_marks__(writer)
        del(writer)
        self.assertEqual(s.getvalue(), expected)
class TestReplayGain(unittest.TestCase):
    """Tests for ReplayGain and ReplayGainReader in audiotools.replaygain."""

    @LIB_REPLAYGAIN
    def test_basics(self):
        """Check a rejected sample rate and a calculator with no samples."""
        import audiotools.replaygain
        import audiotools.pcm

        ReplayGain = audiotools.replaygain.ReplayGain

        # check for invalid sample rate
        self.assertRaises(ValueError, ReplayGain, 200000)

        # check for no samples
        gain = ReplayGain(44100)
        for calculate_gain in (gain.title_gain, gain.album_gain):
            self.assertRaises(ValueError, calculate_gain)
        for calculate_peak in (gain.title_peak, gain.album_peak):
            self.assertEqual(calculate_peak(), 0.0)

    @LIB_REPLAYGAIN
    def test_valid_rates(self):
        """Feed one sine stream per sample rate through ReplayGain."""
        import audiotools.replaygain

        sample_rates = (8000, 11025, 12000, 16000, 18900, 22050, 24000,
                        32000, 37800, 44100, 48000, 56000, 64000, 88200,
                        96000, 112000, 128000, 144000, 176400, 192000)

        for rate in sample_rates:
            gain = audiotools.replaygain.ReplayGain(rate)
            sine = test_streams.Simple_Sine(rate * 2,
                                            rate,
                                            0x4,
                                            16,
                                            (30000, rate // 100))
            audiotools.transfer_data(sine.read, gain.update)

            # title values must be fetched before moving to the next title
            (title_gain, title_peak) = (gain.title_gain(),
                                        gain.title_peak())
            gain.next_title()
            (album_gain, album_peak) = (gain.album_gain(),
                                        gain.album_peak())

            self.assertLess(title_gain, -4.0)
            self.assertGreater(title_peak, .90)
            # with a single title, album values equal title values
            self.assertEqual(title_gain, album_gain)
            self.assertEqual(title_peak, album_peak)

    @LIB_REPLAYGAIN
    def test_pcm(self):
        """Check ReplayGainReader's end-of-stream and close() behavior."""
        import audiotools.replaygain

        def sine_stream():
            return test_streams.Sine16_Stereo(44100, 44100,
                                              441.0, 0.50,
                                              4410.0, 0.49, 1.0)

        gain = audiotools.replaygain.ReplayGain(44100)
        audiotools.transfer_data(sine_stream().read, gain.update)

        title_gain = gain.title_gain()
        title_peak = gain.title_peak()
        gain.next_title()

        main_reader = sine_stream()
        reader = audiotools.replaygain.ReplayGainReader(main_reader,
                                                        title_gain,
                                                        title_peak)

        # read FrameLists from ReplayGainReader
        while len(reader.read(4096)) > 0:
            pass

        # ensure subsequent reads return empty FrameLists
        for attempt in range(10):
            self.assertEqual(len(reader.read(4096)), 0)

        # ensure closing the ReplayGainReader raises ValueError
        # on subsequent reads
        reader.close()
        self.assertRaises(ValueError, reader.read, 4096)

        # ensure wrapped reader is also closed
        self.assertRaises(ValueError, main_reader.read, 4096)

    @LIB_REPLAYGAIN
    def test_reader(self):
        """Applying a track's own gain must raise its re-calculated gain."""
        import audiotools.replaygain

        test_format = audiotools.WaveAudio
        file_suffix = "." + test_format.SUFFIX

        dummy1 = tempfile.NamedTemporaryFile(suffix=file_suffix)
        dummy2 = tempfile.NamedTemporaryFile(suffix=file_suffix)
        try:
            # build dummy file
            original = test_format.from_pcm(
                dummy1.name,
                test_streams.Sine16_Stereo(44100, 44100,
                                           441.0, 0.50,
                                           4410.0, 0.49, 1.0))

            # calculate its ReplayGain
            original_gain = audiotools.replaygain.ReplayGain(
                original.sample_rate())
            with original.to_pcm() as pcm:
                audiotools.transfer_data(pcm.read, original_gain.update)
            title_gain = original_gain.title_gain()
            title_peak = original_gain.title_peak()
            original_gain.next_title()

            # apply gain to dummy file
            adjusted = test_format.from_pcm(
                dummy2.name,
                audiotools.replaygain.ReplayGainReader(original.to_pcm(),
                                                       title_gain,
                                                       title_peak))

            # ensure gain applied is quieter than without gain applied
            adjusted_gain = audiotools.replaygain.ReplayGain(
                original.sample_rate())
            with adjusted.to_pcm() as pcm:
                audiotools.transfer_data(pcm.read, adjusted_gain.update)
            title_gain2 = adjusted_gain.title_gain()
            title_peak2 = adjusted_gain.title_peak()
            adjusted_gain.next_title()

            self.assertGreater(title_gain2, title_gain)
        finally:
            dummy1.close()
            dummy2.close()
class testcuesheet(unittest.TestCase):
    """Attribute and round-trip tests for cuesheet files.

    setUp() stores the file suffix, the sheet class and the reader
    function under test as instance attributes, so a subclass can
    aim the same tests at another sheet format by overriding setUp().
    """

    def setUp(self):
        from audiotools.cue import Cuesheet, read_cuesheet
        self.suffix = ".cue"
        self.sheet_class = Cuesheet
        self.read_sheet = read_cuesheet

    @staticmethod
    def __timestamp_to_frac__(m, s, f):
        """Return a minutes/seconds/frames timestamp as Fraction seconds.

        "f" counts frames of 1/75th of a second.
        """
        from fractions import Fraction
        return Fraction((m * 60 * 75) + (s * 75) + f, 75)

    def __read_back__(self, sheet):
        """Write "sheet" to a temporary file and return it as re-read.

        The temporary file is removed even if building or
        re-reading the sheet raises an exception.
        """
        with tempfile.NamedTemporaryFile(suffix=self.suffix) as temp_sheet:
            temp_sheet.write(sheet.build().encode("UTF-8"))
            # the reader opens the file by name,
            # so the data must be on disk first
            temp_sheet.flush()
            return self.read_sheet(temp_sheet.name)

    def __sheets__(self):
        """Yield Sheet objects covering several cuesheet layouts."""
        from audiotools import Sheet, SheetTrack, SheetIndex

        timestamp_to_frac = self.__timestamp_to_frac__

        def index(number, m, s, f):
            return SheetIndex(number, timestamp_to_frac(m, s, f))

        # an ordinary cuesheet with no pre-gaps
        yield Sheet([
            SheetTrack(number=1, track_indexes=[index(1, 0, 0, 0)]),
            SheetTrack(number=2, track_indexes=[index(1, 5, 50, 65)]),
            SheetTrack(number=3, track_indexes=[index(1, 9, 47, 50)]),
            SheetTrack(number=4, track_indexes=[index(1, 15, 12, 53)]),
            SheetTrack(number=5, track_indexes=[index(1, 25, 2, 40)]),
            SheetTrack(number=6, track_indexes=[index(1, 27, 34, 5)]),
            SheetTrack(number=7, track_indexes=[index(1, 31, 58, 53)]),
            SheetTrack(number=8, track_indexes=[index(1, 35, 8, 65)])])

        # a cuesheet spread across a couple of different files
        yield Sheet([
            SheetTrack(number=1,
                       track_indexes=[index(1, 0, 0, 0)],
                       filename=u"TRACK1.WAV"),
            SheetTrack(number=2,
                       track_indexes=[index(1, 5, 50, 65)],
                       filename=u"TRACK1.WAV"),
            SheetTrack(number=3,
                       track_indexes=[index(1, 9, 47, 50)],
                       filename=u"TRACK1.WAV"),
            SheetTrack(number=4,
                       track_indexes=[index(1, 15, 12, 53)],
                       filename=u"TRACK1.WAV"),
            SheetTrack(number=5,
                       track_indexes=[index(1, 0, 0, 0)],
                       filename=u"TRACK2.WAV"),
            SheetTrack(number=6,
                       track_indexes=[index(1, 2, 31, 40)],
                       filename=u"TRACK2.WAV"),
            SheetTrack(number=7,
                       track_indexes=[index(1, 6, 56, 13)],
                       filename=u"TRACK2.WAV"),
            SheetTrack(number=8,
                       track_indexes=[index(1, 10, 6, 25)],
                       filename=u"TRACK2.WAV")])

        # mix in some pre-gaps
        yield Sheet([
            SheetTrack(number=1,
                       track_indexes=[index(1, 0, 0, 0)]),
            SheetTrack(number=2,
                       track_indexes=[index(0, 5, 49, 65),
                                      index(1, 5, 50, 65)]),
            SheetTrack(number=3,
                       track_indexes=[index(0, 9, 45, 50),
                                      index(1, 9, 47, 50)]),
            SheetTrack(number=4,
                       track_indexes=[index(0, 15, 9, 53),
                                      index(1, 15, 12, 53)])])

        # add catalog numbers, ISRCs and multiple index points
        yield Sheet([
            SheetTrack(number=1,
                       track_indexes=[index(1, 0, 0, 0),
                                      index(2, 2, 0, 0),
                                      index(3, 4, 0, 0)],
                       metadata=audiotools.MetaData(
                           ISRC=u"ABCDE1234567"),
                       filename=u"MYAUDIO1.WAV"),
            SheetTrack(number=2,
                       track_indexes=[index(1, 0, 0, 0)],
                       metadata=audiotools.MetaData(
                           ISRC=u"XYZZY0000000"),
                       filename=u"MYAUDIO2.WAV"),
            SheetTrack(number=3,
                       track_indexes=[index(0, 3, 0, 0),
                                      index(1, 3, 2, 0),
                                      index(2, 5, 34, 32),
                                      index(3, 8, 12, 49),
                                      index(4, 10, 1, 74)],
                       metadata=audiotools.MetaData(
                           ISRC=u"123456789012"),
                       filename=u"MYAUDIO2.WAV")],
            metadata=audiotools.MetaData(catalog=u"3898347789120"))

    def __metadata_sheets__(self):
        """Yield Sheet objects carrying track and album text metadata."""
        from audiotools import Sheet, SheetTrack, SheetIndex

        timestamp_to_frac = self.__timestamp_to_frac__

        def index(number, m, s, f):
            return SheetIndex(number, timestamp_to_frac(m, s, f))

        # a sheet with a portable set of plain metadata
        yield Sheet([
            SheetTrack(number=1,
                       track_indexes=[index(1, 0, 0, 0)],
                       metadata=audiotools.MetaData(
                           track_name=u"Track 1",
                           performer_name=u"Performer 1",
                           artist_name=u"Artist 1"),
                       filename=u"CDImage.wav"),
            SheetTrack(number=2,
                       track_indexes=[index(0, 4, 36, 50),
                                      index(1, 4, 41, 10)],
                       metadata=audiotools.MetaData(
                           track_name=u"Track 2",
                           performer_name=u"Performer 2",
                           artist_name=u"Artist 2"),
                       filename=u"CDImage.wav")],
            metadata=audiotools.MetaData(
                album_name=u"Album Name",
                performer_name=u"Album Performer",
                artist_name=u"Album Artist"))

        # a sheet with a lot of strings that need escaping
        yield Sheet([
            SheetTrack(number=1,
                       track_indexes=[index(1, 0, 0, 0)],
                       metadata=audiotools.MetaData(
                           track_name=u"Track \"1\"",
                           performer_name=u"Performer \"1\"",
                           artist_name=u"Artist \"1\""),
                       filename=u"CD\"Image\".wav"),
            SheetTrack(number=2,
                       track_indexes=[index(0, 4, 36, 50),
                                      index(1, 4, 41, 10)],
                       metadata=audiotools.MetaData(
                           track_name=u"Track \"2\"",
                           performer_name=u"Performer \"2\"",
                           artist_name=u"Artist \"2\""),
                       filename=u"CD\"Image\".wav")],
            metadata=audiotools.MetaData(
                album_name=u"Album \"Name\"",
                performer_name=u"Album \"Performer\"",
                artist_name=u"Album \"Artist\""))

        # a sheet with lots of backslashes that need escaping
        yield Sheet([
            SheetTrack(number=1,
                       track_indexes=[index(1, 0, 0, 0)],
                       metadata=audiotools.MetaData(
                           track_name=u"Track \\ 1",
                           performer_name=u"Performer \\ 1",
                           artist_name=u"Artist \\ 1"),
                       filename=u"CD\\Image.wav"),
            SheetTrack(number=2,
                       track_indexes=[index(0, 4, 36, 50),
                                      index(1, 4, 41, 10)],
                       metadata=audiotools.MetaData(
                           track_name=u"Track \\ 2",
                           performer_name=u"Performer \\ 2",
                           artist_name=u"Artist \\ 2"),
                       filename=u"CD\\Image.wav")],
            metadata=audiotools.MetaData(
                album_name=u"Album \\ Name",
                performer_name=u"Album \\ Performer",
                artist_name=u"Album \\ Artist"))

    @LIB_CUESHEET
    def test_attributes(self):
        """Check track/index lookups and offsets of a converted sheet."""
        from audiotools import Sheet, SheetTrack, SheetIndex

        timestamp_to_frac = self.__timestamp_to_frac__

        def index(number, m, s, f):
            return SheetIndex(number, timestamp_to_frac(m, s, f))

        raw_sheet = Sheet([
            SheetTrack(number=1,
                       track_indexes=[index(0, 0, 0, 0),
                                      index(1, 0, 1, 0)],
                       metadata=audiotools.MetaData(
                           track_name=u"Track 1"),
                       filename=u"CDImage.wav"),
            SheetTrack(number=2,
                       track_indexes=[index(1, 0, 5, 0)],
                       metadata=audiotools.MetaData(
                           track_name=u"Track 2"),
                       filename=u"CDImage.wav"),
            SheetTrack(number=3,
                       track_indexes=[index(0, 0, 9, 0),
                                      index(1, 0, 11, 0),
                                      index(2, 0, 12, 0)],
                       metadata=audiotools.MetaData(
                           track_name=u"Track 3"),
                       filename=u"CDImage.wav")])

        sheet = self.sheet_class.converted(raw_sheet)

        self.assertEqual(sheet, raw_sheet)
        for other_sheet in self.__sheets__():
            self.assertNotEqual(sheet, other_sheet)

        self.assertEqual(sheet.track_numbers(), [1, 2, 3])
        self.assertRaises(KeyError, sheet.track, 0)
        self.assertRaises(KeyError, sheet.track, 4)
        self.assertEqual(sheet.pre_gap(), 1)
        self.assertEqual(sheet.track_offset(1), 1)
        self.assertEqual(sheet.track_length(1), 4)
        self.assertEqual(sheet.track_offset(2), 5)
        self.assertEqual(sheet.track_length(2), 6)
        self.assertEqual(sheet.track_offset(3), 11)
        # the test expects no length for the final track
        self.assertEqual(sheet.track_length(3), None)

        self.assertEqual(sheet.track(1).indexes(), [0, 1])
        self.assertRaises(KeyError, sheet.track(1).index, 2)
        self.assertEqual(sheet.track(2).indexes(), [1])
        self.assertRaises(KeyError, sheet.track(2).index, 0)
        self.assertRaises(KeyError, sheet.track(2).index, 2)
        self.assertEqual(sheet.track(3).indexes(), [0, 1, 2])
        self.assertRaises(KeyError, sheet.track(3).index, 3)

        round_trip_sheet = Sheet.converted(sheet)
        self.assertEqual(round_trip_sheet, sheet)

    @LIB_CUESHEET
    def test_round_trip(self):
        """Each layout sheet must survive conversion and a trip to disk."""
        for sheet in self.__sheets__():
            converted = self.sheet_class.converted(sheet)
            self.assertEqual(converted, sheet)
            self.assertEqual(self.__read_back__(converted), sheet)

    @LIB_CUESHEET
    def test_metadata(self):
        """Each metadata sheet must survive conversion and a trip to disk."""
        for sheet in self.__metadata_sheets__():
            converted = self.sheet_class.converted(sheet)
            self.assertEqual(converted, sheet)
            self.assertEqual(self.__read_back__(converted), sheet)

    @LIB_CUESHEET
    def test_flags(self):
        """Check the pre-emphasis and copy-permitted track flags."""
        from audiotools import Sheet, SheetTrack, SheetIndex
        from fractions import Fraction

        # (track number, filename, pre_emphasis, copy_permitted)
        # covering all four flag combinations
        expected_tracks = [(1, u"track1.wav", False, False),
                           (2, u"track2.wav", True, False),
                           (3, u"track3.wav", False, True),
                           (4, u"track4.wav", True, True)]

        def check_tracks(checked_sheet):
            for (number, filename, pre_emphasis,
                 copy_permitted) in expected_tracks:
                track = checked_sheet.track(number)
                self.assertEqual(track.filename(), filename)
                self.assertEqual(track.pre_emphasis(), pre_emphasis)
                self.assertEqual(track.copy_permitted(), copy_permitted)

        raw_sheet = Sheet(
            [SheetTrack(number=number,
                        track_indexes=[SheetIndex(1, Fraction(0, 1))],
                        filename=filename,
                        pre_emphasis=pre_emphasis,
                        copy_permitted=copy_permitted)
             for (number, filename, pre_emphasis,
                  copy_permitted) in expected_tracks])

        check_tracks(raw_sheet)

        sheet = self.sheet_class.converted(raw_sheet)

        check_tracks(sheet)

        self.assertEqual(sheet, raw_sheet)

        for other_sheet in self.__sheets__():
            self.assertNotEqual(sheet, other_sheet)

        # round-trip sheet to disk to ensure it still works
        self.assertEqual(self.__read_back__(sheet), sheet)
class testtocfile(testcuesheet):
    """Run the inherited sheet tests against audiotools.toc's TOCFile."""

    def setUp(self):
        from audiotools.toc import TOCFile, read_tocfile
        self.suffix = ".toc"
        self.sheet_class = TOCFile
        self.read_sheet = read_tocfile


class test_flac_cuesheet(testcuesheet):
    """Cuesheet tests for sheets embedded in an audio file.

    setUp() picks the audio class; the round-trip test stores each
    sheet in a track of that class rather than in a sheet file.
    """

    def setUp(self):
        self.audio_class = audiotools.FlacAudio

    def __sheets__(self):
        # unlike the regular testcuesheet files
        # these only contain CD images

        for sheet in testcuesheet.__sheets__(self):
            if sheet.image_formatted():
                yield sheet

    @LIB_CUESHEET
    def test_attributes(self):
        """Check track/index lookups and offsets of a Flac_CUESHEET."""
        from audiotools import Sheet, SheetTrack, SheetIndex
        from audiotools.flac import Flac_CUESHEET

        def timestamp_to_frac(m, s, f):
            from fractions import Fraction
            return Fraction((m * 60 * 75) + (s * 75) + f, 75)

        def index(number, m, s, f):
            return SheetIndex(number, timestamp_to_frac(m, s, f))

        raw_sheet = Sheet([
            SheetTrack(number=1,
                       track_indexes=[index(0, 0, 0, 0),
                                      index(1, 0, 1, 0)],
                       metadata=audiotools.MetaData(
                           track_name=u"Track 1"),
                       filename=u"CDImage.wav"),
            SheetTrack(number=2,
                       track_indexes=[index(1, 0, 5, 0)],
                       metadata=audiotools.MetaData(
                           track_name=u"Track 2"),
                       filename=u"CDImage.wav"),
            SheetTrack(number=3,
                       track_indexes=[index(0, 0, 9, 0),
                                      index(1, 0, 11, 0),
                                      index(2, 0, 12, 0)],
                       metadata=audiotools.MetaData(
                           track_name=u"Track 3"),
                       filename=u"CDImage.wav")])

        # 882000 frames at 44100Hz is 20 seconds of audio
        sheet = Flac_CUESHEET.converted(raw_sheet, 882000, 44100)

        self.assertEqual(sheet, raw_sheet)
        for other_sheet in self.__sheets__():
            self.assertNotEqual(sheet, other_sheet)

        self.assertEqual(sheet.track_numbers(), [1, 2, 3])
        self.assertRaises(KeyError, sheet.track, 0)
        self.assertRaises(KeyError, sheet.track, 4)
        self.assertEqual(sheet.pre_gap(), 1)
        self.assertEqual(sheet.track_offset(1), 1)
        self.assertEqual(sheet.track_length(1), 4)
        self.assertEqual(sheet.track_offset(2), 5)
        self.assertEqual(sheet.track_length(2), 6)
        self.assertEqual(sheet.track_offset(3), 11)
        # the last track runs from 11 seconds to the 20 second total
        self.assertEqual(sheet.track_length(3), 9)

        self.assertEqual(sheet.track(1).indexes(), [0, 1])
        self.assertRaises(KeyError, sheet.track(1).index, 2)
        self.assertEqual(sheet.track(2).indexes(), [1])
        self.assertRaises(KeyError, sheet.track(2).index, 0)
        self.assertRaises(KeyError, sheet.track(2).index, 2)
        self.assertEqual(sheet.track(3).indexes(), [0, 1, 2])
        self.assertRaises(KeyError, sheet.track(3).index, 3)

        round_trip_sheet = Sheet.converted(sheet)
        self.assertEqual(round_trip_sheet, sheet)

    @LIB_CUESHEET
    def test_round_trip(self):
        """Each sheet must survive being stored in and read from a track."""
        sample_rate = 44100

        for sheet in self.__sheets__():
            # tack on 1 minute to cuesheet's last index for total size
            total_length = int((sheet[-1][-1].offset() + 60) * sample_rate)

            # create dummy file, which the "with" block removes
            # even if one of the steps below fails
            with tempfile.NamedTemporaryFile(
                    suffix="." + self.audio_class.SUFFIX) as temp_file:
                temp_track = self.audio_class.from_pcm(
                    temp_file.name,
                    EXACT_SILENCE_PCM_Reader(pcm_frames=total_length,
                                             sample_rate=sample_rate,
                                             channels=2,
                                             bits_per_sample=16,
                                             channel_mask=0x3),
                    total_pcm_frames=total_length)

                # set cuesheet
                temp_track.set_cuesheet(sheet)

                # get cuesheet
                track_sheet = audiotools.open(temp_file.name).get_cuesheet()
                self.assertIsNotNone(track_sheet)

                # ensure they match
                self.assertEqual(track_sheet, sheet)

    @LIB_CUESHEET
    def test_metadata(self):
        # FLAC cuesheets don't support meaningful metadata
        # outside of catalog and ISRC
        self.assertTrue(True)

    @LIB_CUESHEET
    def test_flags(self):
        # FLAC cuesheets only support pre-emphasis flag
        # FIXME - that flag is not tested yet

        self.assertTrue(True)
class test_oggflac_cuesheet(test_flac_cuesheet):
    """Run the embedded-cuesheet tests against audiotools.OggFlacAudio."""

    def setUp(self):
        self.audio_class = audiotools.OggFlacAudio

    @LIB_CUESHEET
    def test_attributes(self):
        """Override the inherited attribute test with a no-op."""


class test_tta_cuesheet(test_flac_cuesheet):
    """Run the embedded-cuesheet tests against audiotools.TrueAudio."""

    def setUp(self):
        self.audio_class = audiotools.TrueAudio

    @LIB_CUESHEET
    def test_attributes(self):
        """Override the inherited attribute test with a no-op."""


class test_wavpack_cuesheet(test_flac_cuesheet):
    """Run the embedded-cuesheet tests against audiotools.WavPackAudio."""

    def setUp(self):
        self.audio_class = audiotools.WavPackAudio

    @LIB_CUESHEET
    def test_attributes(self):
        """Override the inherited attribute test with a no-op."""


class TestCDTOC(unittest.TestCase):
    """Tests for CDTOC strings and the FLAC "CDTOC" comment tag."""

    @LIB_CUESHEET
    def test_round_trip(self):
        """CDTOC strings must survive conversion to a Sheet and back."""
        from audiotools.cdtoc import CDTOC
        from audiotools import Sheet
        from fractions import Fraction

        # (CDTOC string, lead-out passed to CDTOC.converted() as
        #  a count of 1/75ths of a second)
        cases = [
            (u"A+96+4975+99F4+E600+130AB+16D98+1A538+"
             u"1EC41+22DE4+27579+2C6D2",
             181820),
            (u"A+2373+85AC+D381+124BC+15A95+1A7CA+"
             u"2112E+25BC7+2A3B6+3001B+39DC8",
             236850),
            (u"D+96+49F3+8A23+C240+10991+1491C+18F27+1DBD8+"
             u"216CC+25C00+2ACA3+2F41F+33D59+382D4+44C3D",
             281511),
            (u"B+908C+F7F7+14708+1A8BC+206F8+27294+2C51B+"
             u"31A48+3862F+3D89D+44661+4A29C+X96",
             303622)]

        for (toc, lead_out) in cases:
            parsed = CDTOC.from_unicode(toc)
            self.assertEqual(parsed.__unicode__(), toc)

            as_sheet = Sheet.converted(parsed)
            self.assertEqual(parsed, as_sheet)

            rebuilt = CDTOC.converted(as_sheet, Fraction(lead_out, 75))
            self.assertEqual(rebuilt, parsed)
            self.assertEqual(rebuilt.__unicode__(), toc)

    @LIB_CUESHEET
    def test_flac(self):
        """Check how FlacAudio combines a CDTOC tag and a CUESHEET block."""
        from audiotools import Sheet, SheetTrack, SheetIndex
        from audiotools.flac import Flac_CUESHEET
        from fractions import Fraction

        # block IDs handed to the FLAC metadata object:
        # 4 holds the comment tags, 5 holds the cuesheet
        COMMENT = 4
        CUESHEET = 5

        valid_toc = [u"3+E1+177+2A3+465"]

        def check_cuesheet(track, pre_gap, expected_tracks):
            # expected_tracks is [(number, index 1 offset, length), ...]
            cuesheet = track.get_cuesheet()
            self.assertIsNotNone(cuesheet)
            self.assertEqual(cuesheet.pre_gap(), pre_gap)
            for (number, offset, length) in expected_tracks:
                self.assertEqual(
                    cuesheet.track(number).index(1).offset(), offset)
                self.assertEqual(cuesheet.track_length(number), length)

        with tempfile.NamedTemporaryFile(suffix=".flac") as temp_file:
            temp_track = audiotools.FlacAudio.from_pcm(
                temp_file.name,
                EXACT_SILENCE_PCM_Reader(44100 * 13),
                total_pcm_frames=44100 * 13)

            # check malformed CDTOC tag
            self.assertIsNone(temp_track.get_cuesheet())
            metadata = temp_track.get_metadata()
            metadata.get_block(COMMENT)[u"CDTOC"] = [u"INVALID"]
            temp_track.update_metadata(metadata)
            self.assertEqual(
                temp_track.get_metadata().get_block(COMMENT)[u"CDTOC"],
                [u"INVALID"])
            self.assertIsNone(temp_track.get_cuesheet())

            # check CDTOC tag
            metadata = temp_track.get_metadata()
            metadata.get_block(COMMENT)[u"CDTOC"] = valid_toc
            temp_track.update_metadata(metadata)
            check_cuesheet(temp_track, 1, [(1, 1, 2), (2, 3, 4), (3, 7, 6)])

            # ensure CUESHEET block takes precedence over tag in get_cuesheet
            new_sheet = Sheet([
                SheetTrack(1, [SheetIndex(0, Fraction(0, 1)),
                               SheetIndex(1, Fraction(1, 1))]),
                SheetTrack(2, [SheetIndex(1, Fraction(4, 1))]),
                SheetTrack(3, [SheetIndex(1, Fraction(6, 1))])])

            def new_sheet_block():
                return Flac_CUESHEET.converted(new_sheet,
                                               temp_track.total_frames(),
                                               temp_track.sample_rate())

            metadata = temp_track.get_metadata()
            metadata.replace_blocks(CUESHEET, [new_sheet_block()])
            temp_track.update_metadata(metadata)
            metadata = temp_track.get_metadata()
            self.assertTrue(metadata.has_block(CUESHEET))
            self.assertEqual(metadata.get_block(COMMENT)[u"CDTOC"],
                             [u"3+E1+177+2A3+465"])
            check_cuesheet(temp_track, 1, [(1, 1, 3), (2, 4, 2), (3, 6, 7)])

            # set_cuesheet overwrites block and wipes out tag
            new_sheet2 = Sheet([
                SheetTrack(1, [SheetIndex(1, Fraction(0, 1))]),
                SheetTrack(2, [SheetIndex(1, Fraction(5, 1))])])
            temp_track.set_cuesheet(new_sheet2)
            metadata = temp_track.get_metadata()
            self.assertEqual(len(metadata.get_blocks(CUESHEET)), 1)
            vorbiscomment = metadata.get_block(COMMENT)
            self.assertRaises(KeyError,
                              vorbiscomment.__getitem__,
                              u"CDTOC")
            check_cuesheet(temp_track, 0, [(1, 0, 5), (2, 5, 8)])

            # delete_cuesheet wipes out both block and tag
            metadata = temp_track.get_metadata()
            metadata.get_block(COMMENT)[u"CDTOC"] = [u"3+E1+177+2A3+465"]
            metadata.replace_blocks(CUESHEET, [new_sheet_block()])
            temp_track.update_metadata(metadata)
            temp_track.delete_cuesheet()
            self.assertIsNone(temp_track.get_cuesheet())
            metadata = temp_track.get_metadata()
            self.assertFalse(metadata.has_block(CUESHEET))
            self.assertRaises(KeyError,
                              metadata.get_block(COMMENT).__getitem__,
                              u"CDTOC")
def setUp(self):
    """Group the audio classes by the channel masks they support."""
    # these support the full range of ChannelMasks
    self.wav_channel_masks = [audiotools.WaveAudio,
                              audiotools.WavPackAudio]

    # these support a subset of ChannelMasks up to 6 channels
    self.flac_channel_masks = [audiotools.FlacAudio,
                               audiotools.OggFlacAudio]

    # these support a reordered subset of ChannelMasks up to 8 channels
    self.vorbis_channel_masks = [audiotools.VorbisAudio,
                                 audiotools.OpusAudio]


def __test_mask_blank__(self, audio_class, channel_mask):
    """Check that "channel_mask" survives encoding to "audio_class".

    A track is built from one single-channel blank stream per
    channel in the mask; both the track and its decoded PCM stream
    must report the same mask, and the stream must decode fully.
    """
    with tempfile.NamedTemporaryFile(
            suffix="." + audio_class.SUFFIX) as temp_file:
        temp_track = audio_class.from_pcm(
            temp_file.name,
            Join_Reader(
                [BLANK_PCM_Reader(2, channels=1)
                 for i in range(len(channel_mask))],
                int(channel_mask)))
        self.assertEqual(temp_track.channel_mask(), channel_mask,
                         "%s != %s for format %s" %
                         (temp_track.channel_mask(),
                          channel_mask,
                          audio_class.NAME))

        pcm = temp_track.to_pcm()
        try:
            self.assertEqual(int(pcm.channel_mask), int(channel_mask))
            audiotools.transfer_framelist_data(pcm, lambda x: None)
        finally:
            # release the decoder before the temporary file is removed
            pcm.close()
+ audio_class.SUFFIX) 5270 try: 5271 temp_track = audio_class.from_pcm( 5272 temp_file.name, 5273 Join_Reader( 5274 [BLANK_PCM_Reader(2, channels=1) 5275 for i in range(channels)], 5276 int(audiotools.ChannelMask(0)))) 5277 self.assertEqual(temp_track.channels(), channels) 5278 if should_be_blank: 5279 self.assertEqual(int(temp_track.channel_mask()), 0) 5280 pcm = temp_track.to_pcm() 5281 self.assertEqual(int(pcm.channel_mask), 0) 5282 audiotools.transfer_framelist_data(pcm, lambda x: None) 5283 else: 5284 self.assertNotEqual(int(temp_track.channel_mask()), 0, 5285 "mask = %s for format %s at %d channels" % 5286 (temp_track.channel_mask(), 5287 audio_class, 5288 channels)) 5289 pcm = temp_track.to_pcm() 5290 self.assertEqual(int(pcm.channel_mask), 5291 int(temp_track.channel_mask())) 5292 audiotools.transfer_framelist_data(pcm, lambda x: None) 5293 finally: 5294 temp_file.close() 5295 5296 def __test_error_mask_blank__(self, audio_class, channels, 5297 channel_mask): 5298 temp_file = tempfile.NamedTemporaryFile( 5299 suffix="." + audio_class.SUFFIX) 5300 try: 5301 self.assertRaises( 5302 audiotools.UnsupportedChannelMask, 5303 audio_class.from_pcm, 5304 temp_file.name, 5305 Join_Reader([BLANK_PCM_Reader(2, channels=1) 5306 for i in range(channels)], 5307 int(channel_mask))) 5308 finally: 5309 temp_file.close() 5310 5311 def __test_error_channel_count__(self, audio_class, channels, 5312 channel_mask): 5313 temp_file = tempfile.NamedTemporaryFile( 5314 suffix="." + audio_class.SUFFIX) 5315 try: 5316 self.assertRaises( 5317 audiotools.UnsupportedChannelCount, 5318 audio_class.from_pcm, 5319 temp_file.name, 5320 Join_Reader([BLANK_PCM_Reader(2, channels=1) 5321 for i in range(channels)], 5322 int(channel_mask))) 5323 finally: 5324 temp_file.close() 5325 5326 def __test_pcm_conversion__(self, 5327 source_audio_class, 5328 target_audio_class, 5329 channel_mask): 5330 source_file = tempfile.NamedTemporaryFile( 5331 suffix="." 
+ source_audio_class.SUFFIX) 5332 target_file = tempfile.NamedTemporaryFile( 5333 suffix="." + target_audio_class.SUFFIX) 5334 wav_file = tempfile.NamedTemporaryFile(suffix=".wav") 5335 try: 5336 source_track = source_audio_class.from_pcm( 5337 source_file.name, 5338 Join_Reader( 5339 [BLANK_PCM_Reader(2, channels=1) 5340 for i in range(len(channel_mask))], 5341 int(channel_mask))) 5342 self.assertEqual(source_track.channel_mask(), channel_mask) 5343 5344 source_pcm = source_track.to_pcm() 5345 5346 self.assertEqual(isinstance(source_pcm.channel_mask, int), 5347 True, 5348 "%s's to_pcm() PCMReader is not an int" % 5349 (source_audio_class.NAME)) 5350 5351 target_track = target_audio_class.from_pcm( 5352 target_file.name, 5353 source_pcm) 5354 5355 self.assertEqual(target_track.channel_mask(), channel_mask) 5356 self.assertEqual(source_track.channel_mask(), 5357 target_track.channel_mask()) 5358 5359 source_track.convert(wav_file.name, audiotools.WaveAudio) 5360 wav = audiotools.open(wav_file.name) 5361 wav.verify() 5362 self.assertEqual(source_track.channel_mask(), 5363 wav.channel_mask()) 5364 target_track = wav.convert(target_file.name, 5365 audiotools.WaveAudio) 5366 self.assertEqual(target_track.channel_mask(), channel_mask) 5367 self.assertEqual(source_track.channel_mask(), 5368 target_track.channel_mask()) 5369 finally: 5370 source_file.close() 5371 target_file.close() 5372 wav_file.close() 5373 5374 @LIB_CORE 5375 def test_channel_mask(self): 5376 from_fields = audiotools.ChannelMask.from_fields 5377 5378 for audio_class in (self.wav_channel_masks + 5379 self.flac_channel_masks + 5380 self.vorbis_channel_masks): 5381 for mask in [from_fields(front_center=True), 5382 from_fields(front_left=True, 5383 front_right=True), 5384 from_fields(front_left=True, 5385 front_right=True, 5386 front_center=True), 5387 from_fields(front_right=True, 5388 front_left=True, 5389 back_right=True, 5390 back_left=True), 5391 from_fields(front_right=True, 5392 front_center=True, 
5393 front_left=True, 5394 back_right=True, 5395 back_left=True), 5396 from_fields(front_right=True, 5397 front_center=True, 5398 low_frequency=True, 5399 front_left=True, 5400 back_right=True, 5401 back_left=True)]: 5402 self.__test_mask_blank__(audio_class, mask) 5403 5404 for audio_class in (self.wav_channel_masks + 5405 self.vorbis_channel_masks): 5406 for mask in [from_fields(front_left=True, front_right=True, 5407 front_center=True, 5408 side_left=True, side_right=True, 5409 back_center=True, low_frequency=True), 5410 from_fields(front_left=True, front_right=True, 5411 side_left=True, side_right=True, 5412 back_left=True, back_right=True, 5413 front_center=True, low_frequency=True)]: 5414 self.__test_mask_blank__(audio_class, mask) 5415 5416 for audio_class in self.wav_channel_masks: 5417 for mask in [from_fields(front_left=True, front_right=True, 5418 side_left=True, side_right=True, 5419 back_left=True, back_right=True, 5420 front_center=True, back_center=True, 5421 low_frequency=True), 5422 from_fields(front_left=True, front_right=True, 5423 side_left=True, side_right=True, 5424 back_left=True, back_right=True, 5425 front_center=True, back_center=True)]: 5426 self.__test_mask_blank__(audio_class, mask) 5427 5428 @LIB_CORE 5429 def test_channel_mask_conversion(self): 5430 from_fields = audiotools.ChannelMask.from_fields 5431 5432 for source_audio_class in audiotools.AVAILABLE_TYPES: 5433 for target_audio_class in audiotools.AVAILABLE_TYPES: 5434 self.__test_pcm_conversion__(source_audio_class, 5435 target_audio_class, 5436 from_fields(front_left=True, 5437 front_right=True)) 5438 5439 for source_audio_class in (self.wav_channel_masks + 5440 self.flac_channel_masks + 5441 self.vorbis_channel_masks): 5442 for target_audio_class in (self.wav_channel_masks + 5443 self.flac_channel_masks + 5444 self.vorbis_channel_masks): 5445 for mask in [from_fields(front_center=True), 5446 from_fields(front_left=True, 5447 front_right=True), 5448 from_fields(front_left=True, 
5449 front_right=True, 5450 front_center=True), 5451 from_fields(front_right=True, 5452 front_left=True, 5453 back_right=True, 5454 back_left=True), 5455 from_fields(front_right=True, 5456 front_center=True, 5457 front_left=True, 5458 back_right=True, 5459 back_left=True), 5460 from_fields(front_right=True, 5461 front_center=True, 5462 low_frequency=True, 5463 front_left=True, 5464 back_right=True, 5465 back_left=True)]: 5466 self.__test_pcm_conversion__(source_audio_class, 5467 target_audio_class, 5468 mask) 5469 5470 for source_audio_class in (self.wav_channel_masks + 5471 self.vorbis_channel_masks): 5472 for target_audio_class in (self.wav_channel_masks + 5473 self.vorbis_channel_masks): 5474 for mask in [from_fields(front_left=True, 5475 front_right=True, 5476 front_center=True, 5477 side_left=True, 5478 side_right=True, 5479 back_center=True, 5480 low_frequency=True), 5481 from_fields(front_left=True, 5482 front_right=True, 5483 side_left=True, 5484 side_right=True, 5485 back_left=True, 5486 back_right=True, 5487 front_center=True, 5488 low_frequency=True)]: 5489 self.__test_pcm_conversion__(source_audio_class, 5490 target_audio_class, 5491 mask) 5492 5493 for source_audio_class in self.wav_channel_masks: 5494 for target_audio_class in self.wav_channel_masks: 5495 for mask in [from_fields(front_left=True, 5496 front_right=True, 5497 side_left=True, 5498 side_right=True, 5499 back_left=True, 5500 back_right=True, 5501 front_center=True, 5502 back_center=True, 5503 low_frequency=True), 5504 from_fields(front_left=True, 5505 front_right=True, 5506 side_left=True, 5507 side_right=True, 5508 back_left=True, 5509 back_right=True, 5510 front_center=True, 5511 back_center=True)]: 5512 self.__test_pcm_conversion__(source_audio_class, 5513 target_audio_class, 5514 mask) 5515 5516 @LIB_CORE 5517 def test_unsupported_channel_mask_from_pcm(self): 5518 for channels in range(1, 6 + 1): 5519 self.__test_undefined_mask_blank__(audiotools.WaveAudio, 5520 channels, 5521 False) 
5522 for channels in range(1, 3): 5523 self.__test_undefined_mask_blank__(audiotools.WavPackAudio, 5524 channels, 5525 False) 5526 for channels in range(3, 21): 5527 self.__test_undefined_mask_blank__(audiotools.WavPackAudio, 5528 channels, 5529 True) 5530 5531 for channels in range(1, 9): 5532 self.__test_undefined_mask_blank__(audiotools.ALACAudio, 5533 channels, 5534 False) 5535 for channels in range(9, 21): 5536 self.__test_undefined_mask_blank__(audiotools.ALACAudio, 5537 channels, 5538 True) 5539 5540 for audio_class in [audiotools.FlacAudio, audiotools.OggFlacAudio]: 5541 for channels in range(1, 9): 5542 self.__test_undefined_mask_blank__(audio_class, 5543 channels, 5544 False) 5545 5546 self.__test_error_channel_count__(audio_class, 5547 9, audiotools.ChannelMask(0)) 5548 self.__test_error_channel_count__(audio_class, 5549 10, audiotools.ChannelMask(0)) 5550 5551 for stereo_audio_class in [audiotools.MP3Audio, 5552 audiotools.MP2Audio]: 5553 5554 self.__test_undefined_mask_blank__(stereo_audio_class, 5555 2, False) 5556 for channels in range(3, 20): 5557 temp_file = tempfile.NamedTemporaryFile( 5558 suffix="." 
+ stereo_audio_class.SUFFIX) 5559 try: 5560 temp_track = stereo_audio_class.from_pcm( 5561 temp_file.name, 5562 Join_Reader( 5563 [BLANK_PCM_Reader(2, channels=1) 5564 for i in range(channels)], 5565 int(audiotools.ChannelMask(0)))) 5566 self.assertEqual(temp_track.channels(), 2) 5567 self.assertEqual( 5568 int(temp_track.channel_mask()), 5569 int(audiotools.ChannelMask.from_fields( 5570 front_left=True, front_right=True))) 5571 pcm = temp_track.to_pcm() 5572 self.assertEqual(int(pcm.channel_mask), 5573 int(temp_track.channel_mask())) 5574 audiotools.transfer_framelist_data(pcm, lambda x: x) 5575 finally: 5576 temp_file.close() 5577 5578 for channels in range(1, 9): 5579 self.__test_undefined_mask_blank__(audiotools.VorbisAudio, 5580 channels, 5581 False) 5582 5583 for channels in range(9, 20): 5584 self.__test_undefined_mask_blank__(audiotools.VorbisAudio, 5585 channels, 5586 True) 5587 5588 for channels in [1, 2]: 5589 self.__test_undefined_mask_blank__(audiotools.AiffAudio, 5590 channels, 5591 False) 5592 5593 for channels in [3, 4, 5, 6, 7, 8, 9, 10]: 5594 self.__test_undefined_mask_blank__(audiotools.AiffAudio, 5595 channels, 5596 True) 5597 5598 for channels in [1, 2]: 5599 self.__test_undefined_mask_blank__(audiotools.AuAudio, 5600 channels, 5601 False) 5602 for channels in range(3, 11): 5603 self.__test_undefined_mask_blank__(audiotools.AuAudio, 5604 channels, 5605 True) 5606 5607 5608class Test_FreeDB(unittest.TestCase): 5609 def __test_disc_id_tracks__(self, disc_id_obj, 5610 track_lengths, 5611 cuesheet, 5612 disc_id): 5613 from audiotools.cdio import CDDAReader 5614 from shutil import rmtree 5615 from sys import version_info 5616 5617 self.assertTrue(isinstance(cuesheet, bytes)) 5618 5619 dir = tempfile.mkdtemp() 5620 try: 5621 # dump cuesheet to temporary directory 5622 with open(os.path.join(dir, "CDImage.cue"), "wb") as f: 5623 f.write(cuesheet) 5624 5625 # build CD image from track lengths 5626 with open(os.path.join(dir, "CDImage.bin"), "wb") as f: 
class Test_FreeDB(unittest.TestCase):
    """Disc ID tests against cuesheet-described CD images of silence.

    The helper methods take the DiscID class under test as
    "disc_id_obj", so subclasses can reuse them for other services.
    """

    @staticmethod
    def _write_disc_image(directory, cuesheet, total_pcm_frames):
        """Writes "CDImage.cue" and a zero-filled "CDImage.bin"
        to the given directory and returns the cuesheet's path.

        cuesheet is the cuesheet's contents as bytes.
        The image holds 2 * 2 bytes per PCM frame and is written
        in 1 MB pieces so its whole contents (hundreds of megabytes
        for some test discs) are never built in memory at once.
        """

        cue_path = os.path.join(directory, "CDImage.cue")
        with open(cue_path, "wb") as f:
            f.write(cuesheet)

        remaining = 2 * 2 * total_pcm_frames
        zeroes = b"\x00" * (1 << 20)
        with open(os.path.join(directory, "CDImage.bin"), "wb") as f:
            while remaining > 0:
                piece = zeroes[:remaining]
                f.write(piece)
                remaining -= len(piece)

        return cue_path

    def __test_disc_id_tracks__(self, disc_id_obj,
                                track_lengths,
                                cuesheet,
                                disc_id):
        """Ensures disc_id_obj's from_cddareader(), from_tracks()
        and from_sheet() all yield "disc_id" for the given cuesheet.

        track_lengths is a list of per-track lengths in PCM frames,
        cuesheet is the cuesheet's contents as bytes.
        """

        from audiotools.cdio import CDDAReader
        from shutil import rmtree

        self.assertTrue(isinstance(cuesheet, bytes))

        temp_dir = tempfile.mkdtemp()
        try:
            # dump cuesheet and CD image to temporary directory
            cue_path = self._write_disc_image(temp_dir,
                                              cuesheet,
                                              sum(track_lengths))

            # open disc image with CDDAReader
            cddareader = CDDAReader(cue_path)

            # ensure DiscID from CDDAReader matches
            self.assertEqual(str(disc_id_obj.from_cddareader(cddareader)),
                             disc_id)

            # dump contents of CDDAReader to individual tracks
            tracks = []
            for i in sorted(cddareader.track_offsets.keys()):
                offset = cddareader.track_offsets[i]
                length = cddareader.track_lengths[i]
                self.assertEqual(length, track_lengths[i - 1])
                self.assertEqual(cddareader.seek(offset), offset)
                # the track number must be interpolated into the name,
                # otherwise every track lands in the same file
                tracks.append(audiotools.WaveAudio.from_pcm(
                    os.path.join(temp_dir, "track%d.wav" % (i)),
                    audiotools.PCMReaderHead(cddareader, length, False),
                    total_pcm_frames=length))

            # ensure DiscID from tracks matches
            self.assertEqual(str(disc_id_obj.from_tracks(tracks)),
                             str(disc_id_obj.from_cddareader(cddareader)))

            # open cuesheet as a Sheet object
            sheet = audiotools.read_sheet(cue_path)

            # ensure DiscID from sheet matches
            self.assertEqual(str(disc_id_obj.from_sheet(sheet,
                                                        sum(track_lengths),
                                                        44100)),
                             str(disc_id_obj.from_cddareader(cddareader)))
        finally:
            rmtree(temp_dir)

    def __test_disc_id__(self, disc_id_obj,
                         total_length,
                         cuesheet,
                         disc_id):
        """Ensures disc_id_obj's from_cddareader() and from_sheet()
        both yield "disc_id" for the given cuesheet.

        total_length is the whole disc's length in PCM frames,
        cuesheet is the cuesheet's contents as bytes.
        """

        from audiotools.cdio import CDDAReader
        from shutil import rmtree

        self.assertTrue(isinstance(cuesheet, bytes))

        temp_dir = tempfile.mkdtemp()
        try:
            # dump cuesheet and CD image to temporary directory
            cue_path = self._write_disc_image(temp_dir,
                                              cuesheet,
                                              total_length)

            # open disc image with CDDAReader
            cddareader = CDDAReader(cue_path)

            # ensure DiscID from CDDAReader matches
            self.assertEqual(str(disc_id_obj.from_cddareader(cddareader)),
                             disc_id)

            # open cuesheet as a Sheet object
            sheet = audiotools.read_sheet(cue_path)

            # ensure DiscID from sheet matches
            self.assertEqual(str(disc_id_obj.from_sheet(sheet,
                                                        total_length,
                                                        44100)),
                             str(disc_id_obj.from_cddareader(cddareader)))
        finally:
            rmtree(temp_dir)

    @LIB_FREEDB
    def test_testcd(self):
        from audiotools import read_sheet
        from audiotools.freedb import DiscID
        from audiotools.cdio import CDDAReader
        from shutil import rmtree

        # use the official FreeDB test disc to verify ID calculation

        image_size = 60328800  # CD image size, in bytes

        def check(discid):
            self.assertEqual(discid.__unicode__(), u"03015501")
            self.assertEqual(discid.track_count, 1)
            self.assertEqual(discid.offsets, [296])
            self.assertEqual(discid.playable_length, 344)

        testdir = tempfile.mkdtemp()
        try:
            # dump CD image to temporary directory
            cue_path = self._write_disc_image(
                testdir,
                b"FILE \"CDImage.bin\" BINARY\r\n" +
                b"TRACK 01 AUDIO\r\nINDEX 00 00:00:00\r\n" +
                b"INDEX 01 00:01:71\r\n",
                image_size // 4)

            # ensure DiscID works from CDDAReader
            check(DiscID.from_cddareader(CDDAReader(cue_path)))

            # ensure DiscID works from cuesheet + length
            check(DiscID.from_sheet(
                sheet=read_sheet(cue_path),
                total_pcm_frames=image_size // 4,
                sample_rate=44100))
        finally:
            rmtree(testdir)

    @LIB_FREEDB
    def test_discid(self):
        from audiotools.freedb import DiscID

        # discs checked track-by-track:
        # (cuesheet file, track lengths in PCM frames, expected ID)
        for (cue_name, track_lengths, disc_id) in [
                ("freedb_test_discid-1.cue",
                 [7939176, 4799256, 6297480, 5383140,
                  5246136, 5052684, 5013876],
                 "5A038407"),
                ("freedb_test_discid-2.cue",
                 [1339464, 4048380, 692076, 10600464, 10602816,
                  1178940, 7454664, 2664816, 989604, 7008960,
                  9632616, 1070160, 6094620, 1622880, 13361124,
                  403956, 5208504, 7373520, 483336, 12012840,
                  8534820, 439824, 7626360, 1262436, 4874520,
                  398664, 11229036, 483924, 9003456, 883764,
                  5018580],
                 "BE0D9A1F")]:
            with open(cue_name, "rb") as cue:
                self.__test_disc_id_tracks__(
                    disc_id_obj=DiscID,
                    track_lengths=track_lengths,
                    cuesheet=cue.read(),
                    disc_id=disc_id)

        # discs checked as a whole:
        # (cuesheet file, total length in PCM frames, expected ID)
        for (cue_name, total_length, disc_id) in [
                ("freedb_test_discid-3.cue", 190928304, "A610E90A"),
                ("freedb_test_discid-4.cue", 127937040, "CE0AD30E"),
                ("freedb_test_discid-5.cue", 119882616, "FC0A9E14")]:
            with open(cue_name, "rb") as cue:
                self.__test_disc_id__(
                    disc_id_obj=DiscID,
                    total_length=total_length,
                    cuesheet=cue.read(),
                    disc_id=disc_id)
class Test_MusicBrainz(Test_FreeDB):
    """MusicBrainz disc ID tests, reusing Test_FreeDB's helpers."""

    @LIB_MUSICBRAINZ
    def test_testcd(self):
        # test disc constructed from MusicBrainz's documentation

        from audiotools.musicbrainz import DiscID
        from audiotools.freedb import DiscID as FDiscID
        from audiotools.cdio import CDDAReader
        from audiotools import read_sheet
        from shutil import rmtree

        image_size = 224173824  # CD image size, in bytes
        musicbrainz_id = u"49HHV7Eb8UKF3aQiNmu1GR8vKTY-"
        freedb_id = u"3404F606"

        testdir = tempfile.mkdtemp()
        try:
            cue_path = os.path.join(testdir, "CDImage.cue")

            # dump CD image to temporary directory
            with open(cue_path, "wb") as f:
                f.write(b"".join([
                    b'FILE "CDImage.bin" BINARY\n ',
                    b'TRACK 01 AUDIO\nINDEX 01 00:00:00\n ',
                    b'TRACK 02 AUDIO\n INDEX 01 03:22:63\n ',
                    b'TRACK 03 AUDIO\n INDEX 01 07:08:64\n ',
                    b'TRACK 04 AUDIO\n INDEX 01 10:19:17\n ',
                    b'TRACK 05 AUDIO\n INDEX 01 14:03:39\n ',
                    b'TRACK 06 AUDIO\n INDEX 01 17:51:14\n']))

            with open(os.path.join(testdir, "CDImage.bin"), "wb") as f:
                f.write(b"\x00" * image_size)

            # ensure DiscID works with CDDAReader
            cddareader = CDDAReader(cue_path)

            discid = DiscID.from_cddareader(cddareader)
            self.assertEqual(discid.__unicode__(), musicbrainz_id)
            self.assertEqual(
                FDiscID.from_cddareader(cddareader).__unicode__(),
                freedb_id)

            # ensure DiscID works with cuesheet + length
            discid = DiscID.from_sheet(
                sheet=read_sheet(cue_path),
                total_pcm_frames=image_size // 4,
                sample_rate=44100)
            self.assertEqual(discid.__unicode__(), musicbrainz_id)

            # NOTE(review): this repeats the from_cddareader() check
            # made above - FDiscID.from_sheet() may have been intended
            self.assertEqual(
                FDiscID.from_cddareader(cddareader).__unicode__(),
                freedb_id)
        finally:
            rmtree(testdir)

    @LIB_MUSICBRAINZ
    def test_discid(self):
        from audiotools.musicbrainz import DiscID

        # discs checked track-by-track:
        # (cuesheet file, track lengths in PCM frames, expected ID)
        per_track_discs = [
            ("freedb_test_discid-1.cue",
             [7939176, 4799256, 6297480, 5383140,
              5246136, 5052684, 5013876],
             "SJco4q4a9rzKdBw7HcFvBQugKc8-"),
            ("freedb_test_discid-2.cue",
             [1339464, 4048380, 692076, 10600464, 10602816,
              1178940, 7454664, 2664816, 989604, 7008960,
              9632616, 1070160, 6094620, 1622880, 13361124,
              403956, 5208504, 7373520, 483336, 12012840,
              8534820, 439824, 7626360, 1262436, 4874520,
              398664, 11229036, 483924, 9003456, 883764,
              5018580],
             "yrelpXuXXP2WKDpTUqrS62keIFE-")]

        # discs checked as a whole:
        # (cuesheet file, total length in PCM frames, expected ID)
        whole_discs = [
            ("freedb_test_discid-3.cue", 190928304,
             "naJ8mpfbMHx_qQnJbyRx4lE_h4E-"),
            ("freedb_test_discid-4.cue", 127937040,
             "1o5aDeltYCEwCecU1cMMi1cvees-"),
            ("freedb_test_discid-5.cue", 119882616,
             "aS0RfXDrxs718yypC2AlgpNEIE0-")]

        for (cue_name, track_lengths, disc_id) in per_track_discs:
            with open(cue_name, "rb") as cue:
                self.__test_disc_id_tracks__(
                    disc_id_obj=DiscID,
                    track_lengths=track_lengths,
                    cuesheet=cue.read(),
                    disc_id=disc_id)

        for (cue_name, total_length, disc_id) in whole_discs:
            with open(cue_name, "rb") as cue:
                self.__test_disc_id__(
                    disc_id_obj=DiscID,
                    total_length=total_length,
                    cuesheet=cue.read(),
                    disc_id=disc_id)
class Test_Accuraterip(Test_FreeDB):
    @LIB_ACCURATERIP
    def test_testcd(self):
        # not sure if there is an AccurateRip test disc

        self.assertTrue(True)

    @LIB_ACCURATERIP
    def test_discid(self):
        from audiotools.accuraterip import DiscID

        # discs checked track-by-track:
        # (cuesheet file, track lengths in PCM frames, expected ID)
        per_track_discs = [
            ("freedb_test_discid-1.cue",
             [7939176, 4799256, 6297480, 5383140,
              5246136, 5052684, 5013876],
             "dBAR-007-00045db7-0019b8d8-5a038407.bin"),
            ("freedb_test_discid-2.cue",
             [1339464, 4048380, 692076, 10600464, 10602816,
              1178940, 7454664, 2664816, 989604, 7008960,
              9632616, 1070160, 6094620, 1622880, 13361124,
              403956, 5208504, 7373520, 483336, 12012840,
              8534820, 439824, 7626360, 1262436, 4874520,
              398664, 11229036, 483924, 9003456, 883764,
              5018580],
             "dBAR-031-003fee31-058c64b9-be0d9a1f.bin")]

        # discs checked as a whole:
        # (cuesheet file, total length in PCM frames, expected ID)
        whole_discs = [
            ("freedb_test_discid-3.cue", 190928304,
             "dBAR-010-00193b54-00c9f723-a610e90a.bin"),
            ("freedb_test_discid-4.cue", 127937040,
             "dBAR-014-001977f9-01097144-ce0ad30e.bin"),
            ("freedb_test_discid-5.cue", 119882616,
             "dBAR-020-001d4d13-01bab5f6-fc0a9e14.bin")]

        for (cue_name, track_lengths, disc_id) in per_track_discs:
            with open(cue_name, "rb") as cue:
                self.__test_disc_id_tracks__(
                    disc_id_obj=DiscID,
                    track_lengths=track_lengths,
                    cuesheet=cue.read(),
                    disc_id=disc_id)

        for (cue_name, total_length, disc_id) in whole_discs:
            with open(cue_name, "rb") as cue:
                self.__test_disc_id__(
                    disc_id_obj=DiscID,
                    total_length=total_length,
                    cuesheet=cue.read(),
                    disc_id=disc_id)
disc_id="dBAR-010-00193b54-00c9f723-a610e90a.bin") 5928 5929 with open("freedb_test_discid-4.cue", "rb") as cue: 5930 self.__test_disc_id__( 5931 disc_id_obj=DiscID, 5932 total_length=127937040, 5933 cuesheet=cue.read(), 5934 disc_id="dBAR-014-001977f9-01097144-ce0ad30e.bin") 5935 5936 with open("freedb_test_discid-5.cue", "rb") as cue: 5937 self.__test_disc_id__( 5938 disc_id_obj=DiscID, 5939 total_length=119882616, 5940 cuesheet=cue.read(), 5941 disc_id="dBAR-020-001d4d13-01bab5f6-fc0a9e14.bin") 5942 5943 @LIB_ACCURATERIP 5944 def test_checksum(self): 5945 from audiotools.accuraterip import Checksum 5946 from test_streams import Simple_Sine, Generate02 5947 5948 # sanity checking for initial options 5949 self.assertRaises(ValueError, 5950 Checksum, 5951 total_pcm_frames=10, 5952 sample_rate=0, 5953 is_first=False, 5954 is_last=False, 5955 accurateripv2_offset=0) 5956 5957 self.assertRaises(ValueError, 5958 Checksum, 5959 total_pcm_frames=10, 5960 sample_rate=-1, 5961 is_first=False, 5962 is_last=False, 5963 accurateripv2_offset=0) 5964 5965 self.assertRaises(ValueError, 5966 Checksum, 5967 total_pcm_frames=0, 5968 sample_rate=44100, 5969 is_first=False, 5970 is_last=False, 5971 accurateripv2_offset=0) 5972 5973 self.assertRaises(ValueError, 5974 Checksum, 5975 total_pcm_frames=-1, 5976 sample_rate=44100, 5977 is_first=False, 5978 is_last=False, 5979 accurateripv2_offset=0) 5980 5981 self.assertRaises(ValueError, 5982 Checksum, 5983 total_pcm_frames=-1, 5984 sample_rate=-1, 5985 is_first=False, 5986 is_last=False, 5987 accurateripv2_offset=0) 5988 5989 self.assertRaises(ValueError, 5990 Checksum, 5991 total_pcm_frames=10, 5992 sample_rate=44100, 5993 is_first=False, 5994 is_last=False, 5995 pcm_frame_range=0, 5996 accurateripv2_offset=0) 5997 5998 self.assertRaises(ValueError, 5999 Checksum, 6000 total_pcm_frames=10, 6001 sample_rate=44100, 6002 is_first=False, 6003 is_last=False, 6004 pcm_frame_range=-1, 6005 accurateripv2_offset=0) 6006 6007 
self.assertRaises(ValueError, 6008 Checksum, 6009 total_pcm_frames=10, 6010 sample_rate=44100, 6011 is_first=False, 6012 is_last=False, 6013 accurateripv2_offset=-1) 6014 6015 checksum = Checksum(total_pcm_frames=200000, 6016 sample_rate=44100, 6017 is_first=False, 6018 is_last=False, 6019 pcm_frame_range=1) 6020 6021 for v in [None, 0, 1, "foo", "bar"]: 6022 self.assertRaises(TypeError, 6023 checksum.update, 6024 v) 6025 6026 # sanity checking for stream parameters 6027 for params in [[200000, 44100, 0x04, 8, (25, 10000)], # 8bps 1ch 6028 [200000, 44100, 0x03, 8, (25, 10000), 6029 (50, 20000)], # 8bps 2ch 6030 [200000, 44100, 0x07, 8, (25, 10000), 6031 (50, 20000), 6032 (120, 30000)], # 8bps 3ch 6033 [200000, 44100, 0x04, 16, (6400, 10000)], # 16bps 1ch 6034 [200000, 44100, 0x07, 16, (6400, 10000), 6035 (12800, 20000), 6036 (30720, 30000)], # 16bps 3ch 6037 [200000, 44100, 0x04, 24, 6038 (1638400, 10000)], # 24bps 1ch 6039 [200000, 44100, 0x03, 24, 6040 (1638400, 10000), 6041 (3276800, 20000)], # 24bps 2ch 6042 [200000, 44100, 0x07, 24, 6043 (1638400, 10000), 6044 (3276800, 20000), 6045 (7864320, 30000)]]: # 24bps 3ch 6046 sine = Simple_Sine(*params) 6047 checksum = Checksum(total_pcm_frames=200000, 6048 sample_rate=44100, 6049 is_first=False, 6050 is_last=False, 6051 pcm_frame_range=1) 6052 self.assertRaises(ValueError, 6053 audiotools.transfer_data, 6054 sine.read, checksum.update) 6055 6056 # ensure very short streams work correctly 6057 # whether middle, first, last or only track 6058 short_track = Checksum(total_pcm_frames=1, 6059 sample_rate=44100, 6060 is_first=False, 6061 is_last=False, 6062 pcm_frame_range=1) 6063 audiotools.transfer_data( 6064 Generate02(44100).read, short_track.update) 6065 self.assertEqual(short_track.checksums_v1(), [0x7FFF8000]) 6066 self.assertEqual(short_track.checksum_v2(), 0x7FFF8000) 6067 6068 short_track = Checksum(total_pcm_frames=1, 6069 sample_rate=44100, 6070 is_first=True, 6071 is_last=False, 6072 pcm_frame_range=1) 6073 
audiotools.transfer_data( 6074 Generate02(44100).read, short_track.update) 6075 self.assertEqual(short_track.checksums_v1(), [0]) 6076 self.assertEqual(short_track.checksum_v2(), 0) 6077 6078 short_track = Checksum(total_pcm_frames=1, 6079 sample_rate=44100, 6080 is_first=False, 6081 is_last=True, 6082 pcm_frame_range=1) 6083 audiotools.transfer_data( 6084 Generate02(44100).read, short_track.update) 6085 self.assertEqual(short_track.checksums_v1(), [0]) 6086 self.assertEqual(short_track.checksum_v2(), 0) 6087 6088 short_track = Checksum(total_pcm_frames=1, 6089 sample_rate=44100, 6090 is_first=True, 6091 is_last=True, 6092 pcm_frame_range=1) 6093 audiotools.transfer_data( 6094 Generate02(44100).read, short_track.update) 6095 self.assertEqual(short_track.checksums_v1(), [0]) 6096 self.assertEqual(short_track.checksum_v2(), 0) 6097 6098 track = audiotools.open("tone.flac") 6099 6100 # ensure various checksum range options work correctly 6101 # values taken from reference implementation 6102 6103 middle_track = Checksum(total_pcm_frames=track.total_frames(), 6104 sample_rate=track.sample_rate(), 6105 is_first=False, 6106 is_last=False) 6107 with track.to_pcm() as pcmreader: 6108 audiotools.transfer_data(pcmreader.read, middle_track.update) 6109 self.assertEqual(middle_track.checksums_v1(), [0xF6E4AD26]) 6110 self.assertEqual(middle_track.checksum_v2(), 0x4781FC37) 6111 6112 middle_track = Checksum(total_pcm_frames=track.total_frames(), 6113 sample_rate=track.sample_rate(), 6114 is_first=False, 6115 is_last=False, 6116 pcm_frame_range=3, 6117 accurateripv2_offset=1) 6118 with audiotools.PCMReaderWindow(track.to_pcm(), 6119 -1, 6120 track.total_frames() + 2) as pcmreader: 6121 audiotools.transfer_data(pcmreader.read, middle_track.update) 6122 self.assertEqual(middle_track.checksums_v1(), [0xCA705E69, 6123 0xF6E4AD26, 6124 0x951FB12F]) 6125 self.assertEqual(middle_track.checksum_v2(), 0x4781FC37) 6126 # self.assertEqual(middle_track.checksums_v2(), [0x1B0BE28C, 6127 # 
0x4781FC37, 6128 # 0xE5B9A28E]) 6129 6130 first_track = Checksum(total_pcm_frames=track.total_frames(), 6131 sample_rate=track.sample_rate(), 6132 is_first=True, 6133 is_last=False) 6134 with track.to_pcm() as pcmreader: 6135 audiotools.transfer_data(pcmreader.read, first_track.update) 6136 self.assertEqual(first_track.checksums_v1(), [0xEE4DBEB4]) 6137 self.assertEqual(first_track.checksum_v2(), 0x3ECA2C04) 6138 6139 first_track = Checksum(total_pcm_frames=track.total_frames(), 6140 sample_rate=track.sample_rate(), 6141 is_first=True, 6142 is_last=False, 6143 pcm_frame_range=3, 6144 accurateripv2_offset=1) 6145 with audiotools.PCMReaderWindow(track.to_pcm(), 6146 -1, 6147 track.total_frames() + 2) as pcmreader: 6148 audiotools.transfer_data(pcmreader.read, first_track.update) 6149 self.assertEqual(first_track.checksums_v1(), [0x7CC66A55, 6150 0xEE4DBEB4, 6151 0x9A58C7EC]) 6152 self.assertEqual(first_track.checksum_v2(), 0x3ECA2C04) 6153 # self.assertEqual(first_track.checksums_v2(), [0xCD410A97, 6154 # 0x3ECA2C04, 6155 # 0xEAD1D8AA]) 6156 6157 last_track = Checksum(total_pcm_frames=track.total_frames(), 6158 sample_rate=track.sample_rate(), 6159 is_first=False, 6160 is_last=True) 6161 with track.to_pcm() as pcmreader: 6162 audiotools.transfer_data(pcmreader.read, last_track.update) 6163 self.assertEqual(last_track.checksums_v1(), [0xF819E862]) 6164 self.assertEqual(last_track.checksum_v2(), 0x222E32FA) 6165 6166 last_track = Checksum(total_pcm_frames=track.total_frames(), 6167 sample_rate=track.sample_rate(), 6168 is_first=False, 6169 is_last=True, 6170 pcm_frame_range=3, 6171 accurateripv2_offset=1) 6172 with audiotools.PCMReaderWindow(track.to_pcm(), 6173 -1, 6174 track.total_frames() + 2) as pcmreader: 6175 audiotools.transfer_data(pcmreader.read, last_track.update) 6176 self.assertEqual(last_track.checksums_v1(), [0x682F9316, 6177 0xF819E862, 6178 0x00DBAF4E]) 6179 self.assertEqual(last_track.checksum_v2(), 0x222E32FA) 6180 # 
self.assertEqual(last_track.checksums_v2(), [0x92419BB8, 6181 # 0x222E32FA, 6182 # 0x2AF10061]) 6183 6184 only_track = Checksum(total_pcm_frames=track.total_frames(), 6185 sample_rate=track.sample_rate(), 6186 is_first=True, 6187 is_last=True) 6188 with track.to_pcm() as pcmreader: 6189 audiotools.transfer_data(pcmreader.read, only_track.update) 6190 self.assertEqual(only_track.checksums_v1(), [0xEF82F9F0]) 6191 self.assertEqual(only_track.checksum_v2(), 0x197662C7) 6192 6193 only_track = Checksum(total_pcm_frames=track.total_frames(), 6194 sample_rate=track.sample_rate(), 6195 is_first=True, 6196 is_last=True, 6197 pcm_frame_range=3, 6198 accurateripv2_offset=1) 6199 with audiotools.PCMReaderWindow(track.to_pcm(), 6200 -1, 6201 track.total_frames() + 2) as pcmreader: 6202 audiotools.transfer_data(pcmreader.read, only_track.update) 6203 self.assertEqual(only_track.checksums_v1(), [0x1A859F02, 6204 0xEF82F9F0, 6205 0x0614C60B]) 6206 self.assertEqual(only_track.checksum_v2(), 0x197662C7) 6207 # self.assertEqual(only_track.checksums_v2(), [0x4476C3C3, 6208 # 0x197662C7, 6209 # 0x3009367D]) 6210 6211 # ensure feeding checksum with not enough samples 6212 # raises ValueError at checksums()-time 6213 insufficient_samples = Checksum( 6214 total_pcm_frames=track.total_frames() + 1, 6215 sample_rate=track.sample_rate(), 6216 is_first=False, 6217 is_last=False, 6218 pcm_frame_range=1) 6219 with track.to_pcm() as pcmreader: 6220 audiotools.transfer_data(pcmreader.read, 6221 insufficient_samples.update) 6222 self.assertRaises(ValueError, insufficient_samples.checksums_v1) 6223 self.assertRaises(ValueError, insufficient_samples.checksum_v2) 6224 6225 # ensure insufficient samples also works with a range 6226 insufficient_samples = Checksum( 6227 total_pcm_frames=track.total_frames(), 6228 sample_rate=track.sample_rate(), 6229 is_first=False, 6230 is_last=False, 6231 pcm_frame_range=2, 6232 accurateripv2_offset=1) 6233 with track.to_pcm() as pcmreader: 6234 
audiotools.transfer_data(pcmreader.read, 6235 insufficient_samples.update) 6236 self.assertRaises(ValueError, insufficient_samples.checksums_v1) 6237 self.assertRaises(ValueError, insufficient_samples.checksum_v2) 6238 6239 # ensure feeding checksum with too many samples 6240 # raises ValueError at update()-time 6241 too_many_samples = Checksum( 6242 total_pcm_frames=track.total_frames() - 1, 6243 sample_rate=track.sample_rate(), 6244 is_first=False, 6245 is_last=False, 6246 pcm_frame_range=1) 6247 with track.to_pcm() as pcmreader: 6248 self.assertRaises(ValueError, 6249 audiotools.transfer_data, 6250 pcmreader.read, 6251 too_many_samples.update) 6252 6253 # ensure too many samples also works with a range 6254 too_many_samples = Checksum( 6255 total_pcm_frames=track.total_frames() - 2, 6256 sample_rate=track.sample_rate(), 6257 is_first=False, 6258 is_last=False, 6259 pcm_frame_range=2) 6260 with track.to_pcm() as pcmreader: 6261 self.assertRaises(ValueError, 6262 audiotools.transfer_data, 6263 pcmreader.read, 6264 too_many_samples.update) 6265 6266 @LIB_ACCURATERIP 6267 def test_perform_lookup(self): 6268 from audiotools.freedb import DiscID as FDiscID 6269 from audiotools.accuraterip import DiscID, perform_lookup 6270 import time 6271 6272 offsets = [0, 2278, 9163, 10340, 28368, 46400, 48405, 6273 61083, 65615, 67298, 79218, 95600, 97420, 6274 107785, 110545, 133268, 133955, 142813, 6275 155353, 156175, 176605, 191120, 191868, 6276 204838, 206985, 215275, 215953, 235050, 6277 235873, 251185, 252688] 6278 6279 freedb_disc_id = FDiscID( 6280 offsets=offsets, 6281 total_length=3482, 6282 track_count=31, 6283 playable_length=3482) 6284 6285 disc_id = DiscID( 6286 track_numbers=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 6287 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 6288 23, 24, 25, 26, 27, 28, 29, 30, 31], 6289 track_offsets=offsets, 6290 lead_out_offset=261223, 6291 freedb_disc_id=freedb_disc_id) 6292 6293 matches = perform_lookup(disc_id) 6294 time.sleep(1) 6295 6296 
@LIB_ACCURATERIP
def test_accuraterip_lookup(self):
    # Encodes 31 silent FLAC tracks of fixed lengths into temporary files
    # and checks that accuraterip_lookup() returns an entry for every
    # track number.
    import time

    # PCM frame count of each track
    lengths = [1339464, 4048380, 692076, 10600464, 10602816,
               1178940, 7454664, 2664816, 989604, 7008960,
               9632616, 1070160, 6094620, 1622880, 13361124,
               403956, 5208504, 7373520, 483336, 12012840,
               8534820, 439824, 7626360, 1262436, 4874520,
               398664, 11229036, 483924, 9003456, 883764, 5018580]
    tempfiles = [tempfile.NamedTemporaryFile(suffix=".flac")
                 for _ in lengths]

    # the temporary files must be released even if encoding
    # or the lookup itself raises
    try:
        tracks = [audiotools.FlacAudio.from_pcm(
                  t.name,
                  EXACT_SILENCE_PCM_Reader(pcm_frames=length,
                                           sample_rate=44100,
                                           channels=2,
                                           channel_mask=0x3,
                                           bits_per_sample=16),
                  total_pcm_frames=length)
                  for (t, length) in zip(tempfiles, lengths)]

        matches = audiotools.accuraterip_lookup(tracks)
        time.sleep(1)
    finally:
        for t in tempfiles:
            t.close()

    self.assertEqual(set(matches.keys()), set(range(1, 32)))


@LIB_ACCURATERIP
def test_accuraterip_sheet_lookup(self):
    # Reads the bundled cuesheet through a temporary ".cue" file and checks
    # that accuraterip_sheet_lookup() returns an entry for every track
    # number.
    import time

    # the context manager closes (and removes) the temporary file
    # even if reading the sheet fails
    with tempfile.NamedTemporaryFile(suffix=".cue") as cuesheet:
        with open("freedb_test_discid-2.cue", "rb") as f:
            cuesheet.write(f.read())
        cuesheet.flush()
        sheet = audiotools.read_sheet(cuesheet.name)

    matches = audiotools.accuraterip_sheet_lookup(sheet, 153599124, 44100)
    time.sleep(1)

    self.assertEqual(set(matches.keys()), set(range(1, 32)))
@LIB_ACCURATERIP
def test_match_offset(self):
    # Exercises match_offset() over combinations of AccurateRip matches
    # and candidate checksums, checking the returned
    # (checksum, confidence, offset) triple each time.
    from audiotools.accuraterip import match_offset

    # no checksums, raise Exception
    self.assertRaises(ValueError,
                      match_offset,
                      [(1, 1, 0)],
                      [],
                      0)

    single = ((1, 1, 0),)
    double = ((1, 1, 0), (2, 2, 0))

    # (AR matches, checksums, third argument, expected result)
    scenarios = [
        # no AR matches, one checksum, no checksum found
        ((), (1,), 0, (1, None, 0)),
        # one AR match, one checksum, no checksum found in matches
        (single, (10,), 0, (10, None, 0)),
        # one AR match, one checksum, one checksum found in matches
        (single, (1,), 0, (1, 1, 0)),
        # one AR match, multiple checksums, no checksum found in matches
        (single, (2, 3, 4), -1, (3, None, 0)),
        # one AR match, multiple checksums, one checksum found in matches
        (single, (99, 0, 1), -1, (1, 1, 1)),
        # multiple AR matches, one checksum, no checksum found in matches
        (double, (3,), 0, (3, None, 0)),
        # multiple AR matches, one checksum, one checksum found in matches
        (double, (2,), 0, (2, 2, 0)),
        # multiple AR matches, multiple checksums, no checksums found
        (double, (3, 4, 5), -1, (4, None, 0)),
        # multiple AR matches, multiple checksums, one match found
        (double, (98, 99, 2), -1, (2, 2, 1)),
        # multiple AR matches, multiple checksums, multiple matches found
        (double, (0, 1, 2), -1, (2, 2, 1)),
    ]

    for (ar_matches, candidates, start, expected) in scenarios:
        # fresh lists per call, as with the literal arguments
        # each call originally received
        (checksum,
         confidence,
         offset) = match_offset(list(ar_matches), list(candidates), start)
        self.assertEqual(checksum, expected[0])
        self.assertEqual(confidence, expected[1])
        self.assertEqual(offset, expected[2])
class Test_Lookup(unittest.TestCase):
    # Every test below builds FreeDB and MusicBrainz disc IDs for the same
    # 31-track disc from a different source (raw offsets, a CD image,
    # track files, a cuesheet) and expects identical IDs and lookups.
    TOTAL_TRACKS = 31
    FREEDB_ID = "BE0D9A1F"
    MUSICBRAINZ_ID = "yrelpXuXXP2WKDpTUqrS62keIFE-"

    # arguments passed to from_sheet(); the frame count also sizes the
    # blank CD image
    TOTAL_PCM_FRAMES = 153599124
    SAMPLE_RATE = 44100

    # PCM frame count of each of the 31 tracks
    TRACK_LENGTHS = [1339464, 4048380, 692076, 10600464, 10602816,
                     1178940, 7454664, 2664816, 989604, 7008960,
                     9632616, 1070160, 6094620, 1622880, 13361124,
                     403956, 5208504, 7373520, 483336, 12012840,
                     8534820, 439824, 7626360, 1262436, 4874520,
                     398664, 11229036, 483924, 9003456, 883764, 5018580]

    def _assert_lookup(self, freedb_disc_id, musicbrainz_disc_id):
        """Check both disc IDs and the results of a metadata lookup.

        Asserts the string form of each ID, then asserts that every
        choice returned by audiotools.metadata_lookup() has one entry
        per track.
        """

        import time

        self.assertEqual(str(freedb_disc_id), self.FREEDB_ID)
        self.assertEqual(str(musicbrainz_disc_id), self.MUSICBRAINZ_ID)

        # since the contents of lookup services
        # can change over time, all we can really verify
        # is the track count
        for choice in audiotools.metadata_lookup(
                musicbrainz_disc_id=musicbrainz_disc_id,
                freedb_disc_id=freedb_disc_id):
            self.assertEqual(len(choice), self.TOTAL_TRACKS)
            # NOTE(review): pause assumed to sit inside the loop - confirm
            time.sleep(1)

    @LIB_FREEDB
    @LIB_MUSICBRAINZ
    def test_metadata_lookup(self):
        from audiotools.freedb import DiscID as FDiscID
        from audiotools.musicbrainz import DiscID as MDiscID

        offsets = [150, 2428, 9313, 10490, 28518, 46550, 48555, 61233,
                   65765, 67448, 79368, 95750, 97570, 107935, 110695,
                   133418, 134105, 142963, 155503, 156325, 176755,
                   191270, 192018, 204988, 207135, 215425, 216103,
                   235200, 236023, 251335, 252838]

        freedb_disc_id = FDiscID(
            offsets=offsets,
            total_length=3482,
            track_count=31,
            playable_length=3482)
        # each constructor gets its own list object
        musicbrainz_disc_id = MDiscID(
            first_track_number=1,
            last_track_number=31,
            lead_out_offset=261373,
            offsets=list(offsets))

        self._assert_lookup(freedb_disc_id, musicbrainz_disc_id)

    @LIB_FREEDB
    @LIB_MUSICBRAINZ
    def test_cddareader_lookup(self):
        from shutil import rmtree
        from audiotools.cdio import CDDAReader
        from audiotools.freedb import DiscID as FDiscID
        from audiotools.musicbrainz import DiscID as MDiscID

        temp_dir = tempfile.mkdtemp()
        try:
            cue_path = os.path.join(temp_dir, "CDImage.cue")

            # dump cuesheet to file
            with open(cue_path, "wb") as w:
                with open("freedb_test_discid-2.cue", "rb") as r:
                    w.write(r.read())

            # dump CD image to file
            # written in 1 MiB pieces so the ~614 MB of zeroes
            # is never held in memory at once
            remaining = 2 * 2 * self.TOTAL_PCM_FRAMES
            zeroes = b"\x00" * (1 << 20)
            with open(os.path.join(temp_dir, "CDImage.bin"), "wb") as w:
                while remaining > 0:
                    chunk = zeroes[:remaining]
                    w.write(chunk)
                    remaining -= len(chunk)

            reader = CDDAReader(cue_path)

            self._assert_lookup(FDiscID.from_cddareader(reader),
                                MDiscID.from_cddareader(reader))
        finally:
            rmtree(temp_dir)

    @LIB_FREEDB
    @LIB_MUSICBRAINZ
    def test_track_lookup(self):
        from audiotools.freedb import DiscID as FDiscID
        from audiotools.musicbrainz import DiscID as MDiscID

        tempfiles = [tempfile.NamedTemporaryFile(suffix=".flac")
                     for _ in self.TRACK_LENGTHS]

        # release the temporary files even if encoding,
        # ID generation or the lookup fails
        try:
            tracks = [audiotools.FlacAudio.from_pcm(
                      t.name,
                      EXACT_SILENCE_PCM_Reader(pcm_frames=length,
                                               sample_rate=44100,
                                               channels=2,
                                               channel_mask=0x3,
                                               bits_per_sample=16),
                      total_pcm_frames=length)
                      for (t, length) in zip(tempfiles, self.TRACK_LENGTHS)]

            self._assert_lookup(FDiscID.from_tracks(tracks),
                                MDiscID.from_tracks(tracks))
        finally:
            for t in tempfiles:
                t.close()

    @LIB_FREEDB
    @LIB_MUSICBRAINZ
    def test_sheet_lookup(self):
        from audiotools.freedb import DiscID as FDiscID
        from audiotools.musicbrainz import DiscID as MDiscID

        with tempfile.NamedTemporaryFile(suffix=".cue") as cuesheet:
            with open("freedb_test_discid-2.cue", "rb") as r:
                cuesheet.write(r.read())
            cuesheet.flush()
            sheet = audiotools.read_sheet(cuesheet.name)

        self._assert_lookup(
            FDiscID.from_sheet(sheet,
                               self.TOTAL_PCM_FRAMES,
                               self.SAMPLE_RATE),
            MDiscID.from_sheet(sheet,
                               self.TOTAL_PCM_FRAMES,
                               self.SAMPLE_RATE))
class Test_Ogg(unittest.TestCase):
    @LIB_OGG
    def test_roundtrip(self):
        # Writes a random packet of every length from 0 to 999 bytes to an
        # in-memory Ogg stream and checks it reads back unchanged.
        import audiotools.ogg

        ogg = audiotools.ogg

        for size in range(1000):
            payload = os.urandom(size)
            buffer_ = BytesIO()
            self.assertEqual(size, len(payload))

            # split the packet into pages and write them all out
            writer = ogg.PageWriter(buffer_)
            for page in ogg.packet_to_pages(payload, 1234):
                writer.write(page)
            writer.flush()

            # rewind and read the packet back through the page reader
            buffer_.seek(0)
            page_reader = ogg.PageReader(buffer_)
            reader = ogg.PacketReader(page_reader)

            self.assertEqual(payload, reader.read_packet())

            # NOTE(review): source indentation was lost; both close()
            # calls are assumed to run once per iteration - confirm
            writer.close()
            reader.close()
class Test_Image(unittest.TestCase):
    @LIB_IMAGE
    def test_metrics(self):
        # Runs image_metrics() over one sample file per image format
        # and checks every reported attribute.
        from audiotools.image import image_metrics

        # (file, width, height, bits per pixel, color count, MIME type)
        samples = [
            ("image_test_metrics-1.jpg", 3, 2, 24, 0, "image/jpeg"),
            ("image_test_metrics-2.png", 3, 2, 24, 0, "image/png"),
            ("image_test_metrics-3.png", 3, 2, 8, 1, "image/png"),
            ("image_test_metrics-4.gif", 3, 2, 8, 2, "image/gif"),
            ("image_test_metrics-5.bmp", 3, 2, 24, 0, "image/x-ms-bmp"),
            ("image_test_metrics-6.tiff", 3, 2, 24, 0, "image/tiff"),
        ]

        for (path, width, height, depth, colors, mime_type) in samples:
            with open(path, "rb") as f:
                metrics = image_metrics(f.read())

            self.assertEqual(metrics.width, width)
            self.assertEqual(metrics.height, height)
            self.assertEqual(metrics.bits_per_pixel, depth)
            self.assertEqual(metrics.color_count, colors)
            self.assertEqual(metrics.mime_type, mime_type)
class Test_ExecProgressQueue(unittest.TestCase):
    @LIB_CORE
    def test_queue(self):
        # Queues 100 summing jobs on an ExecProgressQueue, runs them and
        # checks the leading results against sums computed locally.
        def range_sum(start, end, progress):
            # job body: sums range(start, end), reporting progress
            # and sleeping briefly on every step
            import time

            total = 0
            for value in range(start, end):
                progress(start, end)
                total += value
                time.sleep(0.1)
            return total

        def range_sum_output(total):
            # completion output callback: renders the job's result
            return u"%d" % (total)

        job_count = 100

        for max_processes in range(1, 21):
            queue = audiotools.ExecProgressQueue(audiotools.SilentMessenger())

            for job in range(job_count):
                # odd-numbered jobs get a fixed completion string,
                # even-numbered ones get the callback
                if job % 2:
                    completion = u"Sum %d Finished" % (job + 1)
                else:
                    completion = range_sum_output

                queue.execute(
                    function=range_sum,
                    progress_text=u"Sum %d" % (job + 1),
                    completion_output=completion,
                    start=job,
                    end=job + 10)

            # NOTE(review): run() is always given 100 here; max_processes
            # only bounds how many results are verified below - confirm
            # whether run(max_processes) was intended
            results = queue.run(100)

            for job in range(max_processes):
                self.assertEqual(results[job], sum(range(job, job + 10)))
class Test_Output_Text(unittest.TestCase):
    def _assert_colors(self, text, fg_color, bg_color, style):
        """Check all three formatting accessors of an output object."""

        self.assertEqual(text.fg_color(), fg_color)
        self.assertEqual(text.bg_color(), bg_color)
        self.assertEqual(text.style(), style)

    def _assert_rendering(self, text, plain, fg_color, bg_color, style):
        """Check the rendered forms of "text" and its formatting."""

        self.assertEqual(u"%s" % (text,), plain)
        self.assertEqual(text.format(False), plain)
        self.assertIn(plain, text.format(True))
        self._assert_colors(text, fg_color, bg_color, style)

    def _assert_set_format(self, unformatted, plain):
        """Check that set_format() returns objects with the new format.

        Covers the unformatted object, a fully formatted copy and
        a copy whose formatting has been reset to None.
        """

        self._assert_rendering(unformatted, plain, None, None, None)

        formatted = unformatted.set_format(fg_color="black",
                                           bg_color="blue",
                                           style="underline")
        self._assert_rendering(formatted, plain,
                               "black", "blue", "underline")

        reset = formatted.set_format(fg_color=None,
                                     bg_color=None,
                                     style=None)
        self._assert_rendering(reset, plain, None, None, None)

    def _assert_invalid_formats(self, constructor, **kwargs):
        """Check that unknown colors and styles raise ValueError."""

        for option in ("fg_color", "bg_color", "style"):
            arguments = dict(kwargs)
            arguments[option] = "unknown"
            self.assertRaises(ValueError, constructor, **arguments)

    def _assert_negative_sizes(self, text):
        """Check that negative head, tail and split sizes are rejected."""

        for method in (text.head, text.tail, text.split):
            self.assertRaises(ValueError, method, -1)

    def _assert_slicing(self, text, fg_color, bg_color, style):
        """Check head(), tail() and split() for every size up to len + 4.

        Each returned piece must keep the original formatting, be no
        longer than requested and, for split(), add up to the whole.
        """

        sizes = range(len(text) + 5)

        # head returns no more than "display_characters"
        for i in sizes:
            head = text.head(i)
            self.assertLessEqual(len(head), i)
            self._assert_colors(head, fg_color, bg_color, style)

        # tail returns no more than "display_characters"
        for i in sizes:
            tail = text.tail(i)
            self.assertLessEqual(len(tail), i)
            self._assert_colors(tail, fg_color, bg_color, style)

        # split returns a (head, tail) pair covering the whole text
        for i in sizes:
            (head, tail) = text.split(i)
            self.assertLessEqual(len(head), i)
            self.assertEqual(len(head) + len(tail), len(text))
            self._assert_colors(head, fg_color, bg_color, style)
            self._assert_colors(tail, fg_color, bg_color, style)

    @LIB_CORE
    def test_output_text(self):
        from audiotools import output_text

        # ensure invalid colors and styles raise an exception
        self._assert_invalid_formats(output_text, unicode_string=u"Foo")

        # ensure setting format returns new output_text with that format
        t1 = output_text(unicode_string=u"Foo")
        self._assert_set_format(t1, u"Foo")

        # ensure negative head, tail and split values raise ValueError
        self._assert_negative_sizes(t1)

        for string in [u"a",
                       u"Foo",
                       u"a" * 100,
                       u'\u1107' * 50,
                       u"a" + (u'\u1107' * 50),
                       u"a" + (u'\u1107' * 50) + u"b"]:
            for (fg_color,
                 bg_color,
                 style) in Possibilities([None, "black"],
                                         [None, "blue"],
                                         [None, "underline"]):

                t = output_text(unicode_string=string,
                                fg_color=fg_color,
                                bg_color=bg_color,
                                style=style)

                self._assert_slicing(t, fg_color, bg_color, style)

    @LIB_CORE
    def test_output_list(self):
        from audiotools import output_text, output_list

        # ensure invalid colors and styles raise an exception
        self._assert_invalid_formats(output_list,
                                     output_texts=[u"Foo", u"Bar"])

        # ensure setting format returns new output_list with that format
        t1 = output_list(output_texts=[u"Foo", output_text(u"Bar")])
        self._assert_set_format(t1, u"FooBar")

        # ensure negative head, tail and split values raise ValueError
        self._assert_negative_sizes(t1)

        for strings in [[u"a"],
                        [output_text(u"a",
                                     fg_color="white",
                                     bg_color="blue",
                                     style="underline")],
                        [u"Foo"],
                        [u"a" * 100],
                        [output_text(u"Foo")],
                        [u"Foo", output_text(u"Bar")],
                        [output_text(u'\u1107' * 10)],
                        [u"a", output_text(u'\u1107' * 20,
                                           fg_color="white",
                                           bg_color="blue",
                                           style="underline"), u"b"]]:
            for (fg_color,
                 bg_color,
                 style) in Possibilities([None, "black"],
                                         [None, "blue"],
                                         [None, "underline"]):

                t = output_list(output_texts=strings,
                                fg_color=fg_color,
                                bg_color=bg_color,
                                style=style)

                self._assert_slicing(t, fg_color, bg_color, style)

    @LIB_CORE
    def test_output_table(self):
        from audiotools import output_table, output_text, output_list

        # check a table with mismatched columns
        err = output_table()
        row1 = err.row()
        row1.add_column(u"Foo")
        self.assertEqual(len(list(err.format(False))), 1)
        self.assertEqual(len(list(err.format(True))), 1)
        row2 = err.row()
        row2.add_column(u"Foo")
        row2.add_column(u"Bar")
        self.assertRaises(ValueError,
                          list,
                          err.format(False))
        self.assertRaises(ValueError,
                          list,
                          err.format(True))

        # check a table with nothing but dividers
        dividers = output_table()
        dividers.divider_row([u"-", u"-"])
        dividers.divider_row([u"*", u"*"])
        dividers.divider_row([u"x", u"x"])
        self.assertEqual(len(list(dividers.format(False))), 3)
        self.assertEqual(len(list(dividers.format(True))), 3)
        dividers.divider_row([u"_"])
        self.assertRaises(ValueError,
                          list,
                          dividers.format(False))
        self.assertRaises(ValueError,
                          list,
                          dividers.format(True))

        # check a table with nothing but blank rows
        blanks = output_table()
        blanks.blank_row()
        blanks.blank_row()
        blanks.blank_row()
        blanks.blank_row()
        self.assertEqual(len(list(blanks.format(False))), 4)
        self.assertEqual(len(list(blanks.format(True))), 4)

        # check a typical table with blank rows and dividers
        table = output_table()

        table.blank_row()
        table.divider_row([u"-", u"-", u"-"])

        letters = table.row()
        letters.add_column(u"a", "left")
        letters.add_column(u"b", "center")
        letters.add_column(u"c", "right")
        self.assertRaises(ValueError,
                          letters.add_column,
                          u"d", "unknown")

        styled = table.row()
        styled.add_column(output_text(u"Foo",
                                      fg_color="black",
                                      bg_color="white"))
        styled.add_column(output_list([u"Bar", u'\u1107' * 5],
                                      fg_color="blue",
                                      bg_color="red"), "center")
        styled.add_column(output_text(u"Blah"), "right")

        table.divider_row([u"-", u"-", u"-"])
        table.blank_row()

        self.assertEqual(len(list(table.format(False))), 6)
        self.assertEqual(len(list(table.format(True))), 6)
class Test_Most_Numerous(unittest.TestCase):
    @LIB_CORE
    def test_most_numerous(self):
        # Checks most_numerous() on lists with a clear winner, on an
        # empty list and on a list whose items all differ.
        from audiotools import most_numerous

        # a single repeated item is the one returned
        for (items, winner) in [([1, 1, 2, 3], 1),
                                ([1, 2, 2, 3], 2),
                                ([1, 2, 3, 3], 3)]:
            self.assertEqual(most_numerous(items), winner)

        # an empty list yields the "empty_list" value
        on_empty = most_numerous([], empty_list=-1)
        self.assertEqual(on_empty, -1)

        # a list with no repeats yields the "all_differ" value
        on_tie = most_numerous([1, 2, 3],
                               empty_list=-1,
                               all_differ=-2)
        self.assertEqual(on_tie, -2)
class Test_Test_Iter(unittest.TestCase):
    @LIB_CORE
    def test_iter(self):
        # Checks that iter_first() and iter_last() pair each item with
        # a flag that is True only for the first / last item.
        from audiotools import iter_first, iter_last

        # (function, input, expected (flag, item) pairs)
        expectations = [
            (iter_first, [], []),
            (iter_first, [1], [(True, 1)]),
            (iter_first, [1, 2], [(True, 1), (False, 2)]),
            (iter_first, [1, 2, 3], [(True, 1), (False, 2), (False, 3)]),
            (iter_last, [], []),
            (iter_last, [1], [(True, 1)]),
            (iter_last, [1, 2], [(False, 1), (True, 2)]),
            (iter_last, [1, 2, 3], [(False, 1), (False, 2), (True, 3)]),
        ]

        for (flagger, items, flagged) in expectations:
            self.assertEqual(list(flagger(items)), flagged)