1#!/usr/bin/python
2
3# Audio Tools, a module and set of tools for manipulating audio data
4# Copyright (C) 2007-2014  Brian Langenberger
5
6# This program is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15
16# You should have received a copy of the GNU General Public License
17# along with this program; if not, write to the Free Software
18# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
19
20import sys
21import unittest
22import audiotools
23import struct
24import random
25import tempfile
26import decimal
27import os
28import os.path
29import test_streams
30from io import BytesIO
31from hashlib import md5
32
33from test import (parser, Variable_Reader, BLANK_PCM_Reader,
34                  RANDOM_PCM_Reader, EXACT_SILENCE_PCM_Reader,
35                  EXACT_BLANK_PCM_Reader, SHORT_PCM_COMBINATIONS,
36                  MD5_Reader, FrameCounter, Join_Reader,
37                  Combinations, Possibilities,
38                  TEST_COVER1, TEST_COVER2, TEST_COVER3, HUGE_BMP)
39
40
41def do_nothing(self):
42    pass
43
44
45# add a bunch of decorator metafunctions like LIB_CORE
46# which can be wrapped around individual tests as needed
47for section in parser.sections():
48    for option in parser.options(section):
49        if parser.getboolean(section, option):
50            vars()["%s_%s" % (section.upper(),
51                              option.upper())] = lambda function: function
52        else:
53            vars()["%s_%s" % (section.upper(),
54                              option.upper())] = lambda function: do_nothing
55
56
57if sys.version_info[0] >= 3:
58    def ints_to_bytes(l):
59        return bytes(l)
60
61    def bytes_to_ints(b):
62        return list(b)
63else:
64    def ints_to_bytes(l):
65        return b"".join(map(chr, l))
66
67    def bytes_to_ints(b):
68        return map(ord, b)
69
70
71class PCMReader(unittest.TestCase):
72    @LIB_PCM
73    def test_pcm(self):
74        from audiotools.pcm import from_list
75
76        # try reading lots of bps/signed/endianness combinations
77        for bps in [8, 16, 24]:
78            for big_endian in [True, False]:
79                for signed in [True, False]:
80                    reader = audiotools.PCMFileReader(
81                        BytesIO(
82                            from_list(list(range(-5, 5)),
83                                      1,
84                                      bps,
85                                      True).to_bytes(big_endian, signed)),
86                        sample_rate=44100,
87                        channels=1,
88                        channel_mask=0x4,
89                        bits_per_sample=bps,
90                        signed=signed,
91                        big_endian=big_endian)
92
93                    self.assertEqual(reader.sample_rate, 44100)
94                    self.assertEqual(reader.channels, 1)
95                    self.assertEqual(reader.channel_mask, 0x4)
96                    self.assertEqual(reader.bits_per_sample, bps)
97
98                    # ensure the FrameList is read correctly
99                    f = reader.read((bps // 8) * 10)
100                    self.assertEqual(len(f), 10)
101                    self.assertEqual(list(f), list(range(-5, 5)))
102
103                    # ensure subsequent reads return empty FrameLists
104                    for i in range(10):
105                        f = reader.read((bps // 8) * 10)
106                        self.assertEqual(len(f), 0)
107
108                    # ensure closing the stream raises ValueErrors
109                    # on subsequent reads
110                    reader.close()
111
112                    self.assertRaises(ValueError, reader.read, (bps // 8) * 10)
113
114
115class PCMCat(unittest.TestCase):
116    @LIB_PCM
117    def test_pcm(self):
118        from audiotools.pcm import from_list
119
120        # ensure mismatched streams raise ValueError at init time
121        audiotools.PCMCat([audiotools.PCMFileReader(BytesIO(b""),
122                                                    sample_rate=44100,
123                                                    channels=1,
124                                                    channel_mask=0x4,
125                                                    bits_per_sample=16)])
126
127        self.assertRaises(ValueError,
128                          audiotools.PCMCat,
129                          [audiotools.PCMFileReader(BytesIO(b""),
130                                                    sample_rate=96000,
131                                                    channels=1,
132                                                    channel_mask=0x4,
133                                                    bits_per_sample=16),
134                           audiotools.PCMFileReader(BytesIO(b""),
135                                                    sample_rate=44100,
136                                                    channels=1,
137                                                    channel_mask=0x4,
138                                                    bits_per_sample=16)])
139
140        self.assertRaises(ValueError,
141                          audiotools.PCMCat,
142                          [audiotools.PCMFileReader(BytesIO(b""),
143                                                    sample_rate=44100,
144                                                    channels=2,
145                                                    channel_mask=0x3,
146                                                    bits_per_sample=16),
147                           audiotools.PCMFileReader(BytesIO(b""),
148                                                    sample_rate=44100,
149                                                    channels=1,
150                                                    channel_mask=0x4,
151                                                    bits_per_sample=16)])
152
153        self.assertRaises(ValueError,
154                          audiotools.PCMCat,
155                          [audiotools.PCMFileReader(BytesIO(b""),
156                                                    sample_rate=44100,
157                                                    channels=1,
158                                                    channel_mask=0x4,
159                                                    bits_per_sample=24),
160                           audiotools.PCMFileReader(BytesIO(b""),
161                                                    sample_rate=44100,
162                                                    channels=1,
163                                                    channel_mask=0x4,
164                                                    bits_per_sample=16)])
165
166        main_readers = [
167            audiotools.PCMFileReader(
168                BytesIO(
169                    from_list(samples, 1, 16, True).to_bytes(True,
170                                                             True)),
171                sample_rate=44100,
172                channels=1,
173                channel_mask=0x4,
174                bits_per_sample=16,
175                signed=True,
176                big_endian=True)
177            for samples in [range(-15, -5), range(-5, 5), range(5, 15)]]
178
179        reader = audiotools.PCMCat(main_readers)
180
181        # ensure PCMCat's stream attributes match first reader's
182        self.assertEqual(reader.sample_rate, 44100)
183        self.assertEqual(reader.channels, 1)
184        self.assertEqual(reader.channel_mask, 0x4)
185        self.assertEqual(reader.bits_per_sample, 16)
186
187        # ensure all the substreams are read correctly
188        samples = []
189        f = reader.read(2)
190        while len(f) > 0:
191            samples.extend(list(f))
192            f = reader.read(2)
193
194        self.assertEqual(samples, list(range(-15, 15)))
195
196        # ensure subsequent reads return empty FrameLists
197        for i in range(10):
198            self.assertEqual(len(reader.read(2)), 0)
199
200        # main readers should not yet be closed
201        for r in main_readers:
202            for i in range(10):
203                self.assertEqual(len(r.read(2)), 0)
204
205        # ensure closing the stream raises ValueErrors
206        # on subsequent reads
207        reader.close()
208
209        self.assertRaises(ValueError, reader.read, 2)
210
211        # sub readers should also be closed by PCMCat's close()
212        for r in main_readers:
213            self.assertRaises(ValueError, r.read, 2)
214
215
216class BufferedPCMReader(unittest.TestCase):
217    @LIB_PCM
218    def test_pcm(self):
219        def frame_lengths(reader, pcm_frames):
220            frame = reader.read(pcm_frames)
221            while len(frame) > 0:
222                yield frame.frames
223                frame = reader.read(pcm_frames)
224            else:
225                reader.close()
226
227        # ensure our reader is generating randomly-sized frames
228        reader = Variable_Reader(EXACT_BLANK_PCM_Reader(4096 * 100))
229        self.assertGreater(len(set(frame_lengths(reader, 4096))), 1)
230
231        # then, ensure that wrapped our reader in a BufferedPCMReader
232        # results in equal-sized frames
233        reader = audiotools.BufferedPCMReader(
234            Variable_Reader(EXACT_BLANK_PCM_Reader(4096 * 100)))
235        # (make sure to account for bps/channels in frame_lengths())
236        self.assertEqual(set(frame_lengths(reader, 4096)), set([4096]))
237
238        # check that sample_rate, bits_per_sample, channel_mask and channels
239        # pass-through properly
240        for sample_rate in [32000, 44100, 48000, 192000]:
241            for bits_per_sample in [8, 16, 24]:
242                for (channels, channel_mask) in [(1, 0x4),
243                                                 (2, 0x3),
244                                                 (4, 0x33),
245                                                 (6, 0x3F)]:
246                    reader = BLANK_PCM_Reader(1,
247                                              sample_rate=sample_rate,
248                                              channels=channels,
249                                              bits_per_sample=bits_per_sample,
250                                              channel_mask=channel_mask)
251                    reader2 = audiotools.BufferedPCMReader(reader)
252                    self.assertEqual(reader.sample_rate, sample_rate)
253                    self.assertEqual(reader.channels, channels)
254                    self.assertEqual(reader.bits_per_sample, bits_per_sample)
255                    self.assertEqual(reader.channel_mask, channel_mask)
256
257                    self.assertEqual(reader2.sample_rate, sample_rate)
258                    self.assertEqual(reader2.channels, channels)
259                    self.assertEqual(reader2.bits_per_sample, bits_per_sample)
260                    self.assertEqual(reader2.channel_mask, channel_mask)
261
262        # ensure that random-sized reads also work okay
263        total_frames = 4096 * 1000
264        reader = audiotools.BufferedPCMReader(
265            Variable_Reader(EXACT_BLANK_PCM_Reader(total_frames)))
266        while total_frames > 0:
267            frames = min(total_frames, random.choice(range(1, 1000)))
268            frame = reader.read(frames)
269            self.assertEqual(frame.frames, frames)
270            total_frames -= frame.frames
271
272        # ensure reading after the stream has been exhausted
273        # results in empty FrameLists
274        reader = audiotools.BufferedPCMReader(
275            EXACT_BLANK_PCM_Reader(44100))
276        f = reader.read(4096)
277        while len(f) > 0:
278            f = reader.read(4096)
279
280        self.assertEqual(len(f), 0)
281
282        for i in range(10):
283            f = reader.read(4096)
284            self.assertEqual(len(f), 0)
285
286        # and ensure reading after the stream is closed
287        # raises a ValueError
288        reader.close()
289
290        self.assertRaises(ValueError,
291                          reader.read,
292                          4096)
293
294
295class LimitedPCMReader(unittest.TestCase):
296    @LIB_PCM
297    def test_pcm(self):
298        from audiotools.pcm import from_list
299
300        main_reader = audiotools.PCMFileReader(
301            BytesIO(
302                from_list(list(range(-50, 50)),
303                          1,
304                          16,
305                          True).to_bytes(True, True)),
306            sample_rate=44100,
307            channels=1,
308            channel_mask=0x4,
309            bits_per_sample=16,
310            signed=True,
311            big_endian=True)
312
313        total_samples = []
314        for pcm_frames in [10, 20, 30, 40]:
315            reader_samples = []
316            reader = audiotools.LimitedPCMReader(main_reader, pcm_frames)
317            self.assertEqual(reader.sample_rate, 44100)
318            self.assertEqual(reader.channels, 1)
319            self.assertEqual(reader.channel_mask, 0x4)
320            self.assertEqual(reader.bits_per_sample, 16)
321
322            f = reader.read(2)
323            while len(f) > 0:
324                reader_samples.extend(list(f))
325                f = reader.read(2)
326
327            self.assertEqual(len(reader_samples), pcm_frames)
328
329            total_samples.extend(reader_samples)
330
331            # ensure subsequent reads return empty FrameLists
332            for i in range(10):
333                self.assertEqual(len(reader.read(2)), 0)
334
335            # ensure closing the substream raises ValueErrors
336            # on subsequent reads
337            # (note that this doesn't close the main reader)
338            reader.close()
339
340            self.assertRaises(ValueError, reader.read, 2)
341
342        self.assertEqual(total_samples, list(range(-50, 50)))
343
344        # ensure subsequent reads of main reader return empty FrameLists
345        for i in range(10):
346            self.assertEqual(len(main_reader.read(2)), 0)
347
348        # ensure closing the substream raises ValueErrors
349        # on subsequent reads
350        main_reader.close()
351
352        self.assertRaises(ValueError, main_reader.read, 2)
353
354
355class PCMReaderWindow(unittest.TestCase):
356    @LIB_PCM
357    def test_pcm(self):
358        from audiotools.pcm import from_list
359
360        for initial_offset in range(-5, 5):
361            for pcm_frames in range(5, 15):
362                main_reader = audiotools.PCMFileReader(
363                    BytesIO(
364                        from_list(list(range(1, 11)),
365                                  1,
366                                  16,
367                                  True).to_bytes(True, True)),
368                    sample_rate=44100,
369                    channels=1,
370                    channel_mask=0x4,
371                    bits_per_sample=16,
372                    signed=True,
373                    big_endian=True)
374
375                reader = audiotools.PCMReaderWindow(main_reader,
376                                                    initial_offset,
377                                                    pcm_frames)
378
379                self.assertEqual(reader.sample_rate,
380                                 main_reader.sample_rate)
381                self.assertEqual(reader.channels,
382                                 main_reader.channels)
383                self.assertEqual(reader.channel_mask,
384                                 main_reader.channel_mask)
385                self.assertEqual(reader.bits_per_sample,
386                                 main_reader.bits_per_sample)
387
388                # ensure reads generate the proper window of samples
389                samples = []
390                f = reader.read(2)
391                while len(f) > 0:
392                    samples.extend(list(f))
393                    f = reader.read(2)
394
395                self.assertEqual(len(samples), pcm_frames)
396
397                target_samples = list(range(1, 11))
398                if initial_offset < 0:
399                    # negative offsets pad window with 0s
400                    target_samples = (([0] * abs(initial_offset)) +
401                                      target_samples)
402                elif initial_offset > 0:
403                    # positive offsets remove samples from window
404                    target_samples = target_samples[initial_offset:]
405
406                if len(target_samples) < pcm_frames:
407                    # window longer than samples gets padded with 0s
408                    target_samples += [0] * (pcm_frames - len(target_samples))
409                elif len(target_samples) > pcm_frames:
410                    # window shorder than samples truncates samples
411                    target_samples = target_samples[0:pcm_frames]
412
413                self.assertEqual(samples, target_samples)
414
415                # ensure subsequent reads return empty FrameLists
416                for i in range(10):
417                    self.assertEqual(len(reader.read(2)), 0)
418
419                # ensure closing the PCMReaderWindow
420                # generates ValueErrors on subsequent reads
421                reader.close()
422
423                self.assertRaises(ValueError, reader.read, 2)
424
425                # ensure closing the PCMReaderWindow
426                # closes the main PCMReader also
427                self.assertRaises(ValueError, main_reader.read, 2)
428
429
430class Sines(unittest.TestCase):
431    @LIB_PCM
432    def test_pcm(self):
433        for stream in [
434            test_streams.Generate01(44100),
435            test_streams.Generate02(44100),
436            test_streams.Generate03(44100),
437            test_streams.Generate04(44100),
438
439            test_streams.Sine8_Mono(200000, 48000,
440                                    441.0, 0.50, 441.0, 0.49),
441            test_streams.Sine8_Stereo(200000, 48000,
442                                      441.0, 0.50, 441.0, 0.49, 1.0),
443            test_streams.Sine16_Mono(200000, 48000,
444                                     441.0, 0.50, 441.0, 0.49),
445            test_streams.Sine16_Stereo(200000, 48000,
446                                       441.0, 0.50, 441.0, 0.49, 1.0),
447            test_streams.Sine24_Mono(200000, 48000,
448                                     441.0, 0.50, 441.0, 0.49),
449            test_streams.Sine24_Stereo(200000, 48000,
450                                       441.0, 0.50, 441.0, 0.49, 1.0),
451            test_streams.Simple_Sine(200000, 44100, 0x3F, 16,
452                                     (6400, 10000),
453                                     (11520, 15000),
454                                     (16640, 20000),
455                                     (21760, 25000),
456                                     (26880, 30000),
457                                     (30720, 35000)),
458
459            test_streams.fsd16([1, -1], 100),
460
461            test_streams.WastedBPS16(1000)]:
462
463            # read the base data from the stream
464            f = stream.read(4096)
465            while len(f) > 0:
466                f = stream.read(4096)
467
468            # ensure subsequent reads return empty FrameLists
469            for i in range(10):
470                self.assertEqual(len(stream.read(4096)), 0)
471
472            # ensure subsequent reads on a closed stream
473            # raises ValueError
474            stream.close()
475
476            self.assertRaises(ValueError, stream.read, 4096)
477
478
479class CDDA(unittest.TestCase):
480    @LIB_CDIO
481    def setUp(self):
482        self.temp_dir = tempfile.mkdtemp()
483        self.bin = os.path.join(self.temp_dir, "Test.BIN")
484        self.cue = os.path.join(self.temp_dir, "Test.CUE")
485
486        with open(self.bin, "wb") as bin_file:
487            self.reader = test_streams.Sine16_Stereo(43646652, 44100,
488                                                     441.0, 0.50,
489                                                     4410.0, 0.49, 1.0)
490            audiotools.transfer_framelist_data(self.reader, bin_file.write)
491
492        with open(self.cue, "wb") as f1:
493            with open("cdda_test.cue", "rb") as f2:
494                f1.write(f2.read())
495
496    @LIB_CDIO
497    def tearDown(self):
498        os.unlink(self.bin)
499        os.unlink(self.cue)
500        os.rmdir(self.temp_dir)
501
502    @LIB_CDIO
503    def test_cdda(self):
504        from audiotools.cdio import CDDAReader
505
506        cdda = CDDAReader(self.cue)
507
508        self.assertEqual(cdda.is_cd_image, True)
509        self.assertEqual(cdda.sample_rate, 44100)
510        self.assertEqual(cdda.channels, 2)
511        self.assertEqual(cdda.channel_mask, 0x3)
512        self.assertEqual(cdda.bits_per_sample, 16)
513        self.assertEqual(cdda.first_sector, 0)
514        self.assertEqual(cdda.last_sector, (43646652 // 588) - 1)
515        self.assertEqual(cdda.track_lengths,
516                         {1: 8038548,
517                          2: 7932120,
518                          3: 6318648,
519                          4: 10765104,
520                          5: 5491920,
521                          6: 5100312})
522        self.assertEqual(cdda.track_offsets,
523                         {1: 0,
524                          2: 8038548,
525                          3: 15970668,
526                          4: 22289316,
527                          5: 33054420,
528                          6: 38546340})
529
530        # verify whole disc
531        checksum = md5()
532        frame = cdda.read(44100)
533        while len(frame) > 0:
534            checksum.update(frame.to_bytes(False, True))
535            frame = cdda.read(44100)
536        self.assertEqual(self.reader.hexdigest(),
537                         checksum.hexdigest())
538
539        # ensure subsequent reads keep generating empty framelists
540        for i in range(10):
541            self.assertEqual(cdda.read(44100).frames, 0)
542
543        # verify individual track sections
544        for track_num in sorted(cdda.track_offsets.keys()):
545            offset = cdda.track_offsets[track_num]
546            length = cdda.track_lengths[track_num]
547            remaining_offset = offset - cdda.seek(offset)
548            self.reader.reset()
549            self.assertTrue(audiotools.pcm_cmp(
550                audiotools.PCMReaderWindow(cdda,
551                                           remaining_offset,
552                                           length,
553                                           False),
554                audiotools.PCMReaderWindow(self.reader, offset, length)))
555
556        # verify close raises exceptions when reading/seeking
557        cdda.close()
558        self.assertRaises(ValueError, cdda.read, 10)
559
560        self.assertRaises(ValueError, cdda.seek, 10)
561
562
563class ChannelMask(unittest.TestCase):
564    @LIB_CORE
565    def test_mask(self):
566        mask = audiotools.ChannelMask.from_fields()
567        self.assertFalse(mask.defined())
568        self.assertTrue(mask.undefined())
569        self.assertEqual(len(mask), 0)
570        self.assertEqual(set([]), set(mask.channels()))
571        mask2 = audiotools.ChannelMask(int(mask))
572        self.assertEqual(mask, mask2)
573
574        mask_fields = list(audiotools.ChannelMask.SPEAKER_TO_MASK.keys())
575        for count in range(1, len(mask_fields) + 1):
576            for fields in Combinations(mask_fields, count):
577                # build a mask from fields
578                mask = audiotools.ChannelMask.from_fields(
579                    **dict([(field, True) for field in fields]))
580                self.assertTrue(mask.defined())
581                self.assertFalse(mask.undefined())
582                self.assertEqual(len(mask), len(fields))
583                self.assertEqual(set(fields), set(mask.channels()))
584                mask2 = audiotools.ChannelMask(int(mask))
585                self.assertEqual(mask, mask2)
586
587
588class Filename(unittest.TestCase):
589    def setUp(self):
590        self.temp_dir = tempfile.mkdtemp()
591        self.temp_file1 = os.path.join(self.temp_dir, "file1")
592        self.temp_file2 = os.path.join(self.temp_dir, "file2")
593        f = open(self.temp_file1, "w")
594        f.write("hello world")
595        f.close()
596        os.link(self.temp_file1, self.temp_file2)
597
598    def tearDown(self):
599        os.unlink(self.temp_file1)
600        os.unlink(self.temp_file2)
601        os.rmdir(self.temp_dir)
602
603    @LIB_CORE
604    def test_filename(self):
605        file1 = audiotools.Filename(self.temp_file1)
606        file2 = audiotools.Filename(self.temp_file2)
607        file3 = audiotools.Filename(os.path.join(self.temp_dir, "file3"))
608        file4 = audiotools.Filename(os.path.join(self.temp_dir, "./file3"))
609        file5 = audiotools.Filename(os.path.join(self.temp_dir, "file4"))
610
611        self.assertTrue(file1.disk_file())
612        self.assertTrue(file2.disk_file())
613        self.assertNotEqual(str(file1), str(file2))
614        self.assertNotEqual(u"%s" % (file1,), u"%s" % (file2,))
615        self.assertEqual(file1, file2)
616        self.assertEqual(hash(file1), hash(file2))
617
618        self.assertFalse(file3.disk_file())
619        self.assertNotEqual(str(file1), str(file3))
620        self.assertNotEqual(u"%s" % (file1,), u"%s" % (file3,))
621        self.assertNotEqual(file1, file3)
622        self.assertNotEqual(hash(file1), hash(file3))
623
624        self.assertFalse(file4.disk_file())
625        self.assertEqual(str(file3), str(file4))
626        self.assertEqual(u"%s" % (file3,), u"%s" % (file4,))
627        self.assertEqual(file3, file4)
628        self.assertEqual(hash(file3), hash(file4))
629
630        self.assertFalse(file5.disk_file())
631        self.assertNotEqual(str(file3), str(file5))
632        self.assertNotEqual(u"%s" % (file3,), u"%s" % (file5,))
633        self.assertNotEqual(file3, file5)
634        self.assertNotEqual(hash(file3), hash(file5))
635
636
637class ImageJPEG(unittest.TestCase):
638    @LIB_CORE
639    def setUp(self):
640        with open("imagejpeg_setup.jpg", "rb") as f:
641            self.image = f.read()
642        self.md5sum = "f8c43ff52c53aff1625979de47a04cec"
643        self.width = 12
644        self.height = 21
645        self.bpp = 24
646        self.colors = 0
647        self.mime_type = "image/jpeg"
648
649    @LIB_CORE
650    def test_checksum(self):
651        self.assertEqual(md5(self.image).hexdigest(), self.md5sum)
652
653    @LIB_CORE
654    def test_image(self):
655        img = audiotools.Image.new(self.image, u"Description", 1)
656        self.assertEqual(img.data, self.image)
657        self.assertEqual(img.mime_type, self.mime_type)
658        self.assertEqual(img.width, self.width)
659        self.assertEqual(img.height, self.height)
660        self.assertEqual(img.color_depth, self.bpp)
661        self.assertEqual(img.color_count, self.colors)
662        self.assertEqual(img.description, u"Description")
663        self.assertEqual(img.type, 1)
664
665
666class ImagePNG(ImageJPEG):
667    @LIB_CORE
668    def setUp(self):
669        with open("imagepng_setup.png", "rb") as f:
670            self.image = f.read()
671        self.md5sum = "31c4c5224327d5869aa6059bcda84d2e"
672        self.width = 12
673        self.height = 21
674        self.bpp = 24
675        self.colors = 0
676        self.mime_type = "image/png"
677
678
679class ImageCover1(ImageJPEG):
680    @LIB_CORE
681    def setUp(self):
682        self.image = TEST_COVER1
683        self.md5sum = "dbb6a01eca6336381754346de71e052e"
684        self.width = 500
685        self.height = 500
686        self.bpp = 24
687        self.colors = 0
688        self.mime_type = "image/jpeg"
689
690
691class ImageCover2(ImageJPEG):
692    @LIB_CORE
693    def setUp(self):
694        self.image = TEST_COVER2
695        self.md5sum = "2d348cf729c840893d672dd69476955c"
696        self.width = 500
697        self.height = 500
698        self.bpp = 24
699        self.colors = 0
700        self.mime_type = "image/png"
701
702
703class ImageCover3(ImageJPEG):
704    @LIB_CORE
705    def setUp(self):
706        self.image = TEST_COVER3
707        self.md5sum = "534b107e88d3830eac7ce814fc5d0279"
708        self.width = 100
709        self.height = 100
710        self.bpp = 24
711        self.colors = 0
712        self.mime_type = "image/jpeg"
713
714
715class ImageGIF(ImageJPEG):
716    @LIB_CORE
717    def setUp(self):
718        with open("imagegif_setup.gif", "rb") as f:
719            self.image = f.read()
720        self.md5sum = "1d4d36801b53c41d01086cbf9d0cb471"
721        self.width = 12
722        self.height = 21
723        self.bpp = 8
724        self.colors = 32
725        self.mime_type = "image/gif"
726
727
728class ImageBMP(ImageJPEG):
729    @LIB_CORE
730    def setUp(self):
731        with open("imagebmp_setup.bmp", "rb") as f:
732            self.image = f.read()
733        self.md5sum = "cb6ef2f7a458ab1d315c329f72ec9898"
734        self.width = 12
735        self.height = 21
736        self.bpp = 24
737        self.colors = 0
738        self.mime_type = "image/x-ms-bmp"
739
740
741class ImageTIFF(ImageJPEG):
742    @LIB_CORE
743    def setUp(self):
744        with open("imagetiff_setup.tiff", "rb") as f:
745            self.image = f.read()
746        self.md5sum = "192ceb086d217421a5f151cc0afa3f05"
747        self.width = 12
748        self.height = 21
749        self.bpp = 24
750        self.colors = 0
751        self.mime_type = "image/tiff"
752
753
754class ImageHugeBMP(ImageJPEG):
755    @LIB_CORE
756    def setUp(self):
757        from bz2 import decompress
758        self.image = decompress(HUGE_BMP)
759        self.md5sum = "558d875195829de829059fd4952fed46"
760        self.width = 2366
761        self.height = 2366
762        self.bpp = 24
763        self.colors = 0
764        self.mime_type = "image/x-ms-bmp"
765
766
767class PCMConverter(unittest.TestCase):
768    @LIB_PCM
769    def setUp(self):
770        self.tempwav = tempfile.NamedTemporaryFile(suffix=".wav")
771
772    @LIB_PCM
773    def tearDown(self):
774        self.tempwav.close()
775
776    @LIB_PCM
777    def test_conversions(self):
778        for ((i_sample_rate,
779              i_channels,
780              i_channel_mask,
781              i_bits_per_sample),
782             (o_sample_rate,
783              o_channels,
784              o_channel_mask,
785              o_bits_per_sample)) in Combinations(SHORT_PCM_COMBINATIONS, 2):
786
787            reader = BLANK_PCM_Reader(5,
788                                      sample_rate=i_sample_rate,
789                                      channels=i_channels,
790                                      bits_per_sample=i_bits_per_sample,
791                                      channel_mask=i_channel_mask)
792
793            converter = audiotools.PCMConverter(
794                reader,
795                sample_rate=o_sample_rate,
796                channels=o_channels,
797                bits_per_sample=o_bits_per_sample,
798                channel_mask=o_channel_mask)
799
800            wave = audiotools.WaveAudio.from_pcm(self.tempwav.name, converter)
801            converter.close()
802
803            self.assertEqual(wave.sample_rate(), o_sample_rate)
804            self.assertEqual(wave.channels(), o_channels)
805            self.assertEqual(wave.bits_per_sample(), o_bits_per_sample)
806            self.assertEqual(wave.channel_mask(), o_channel_mask)
807            self.assertEqual(
808                (decimal.Decimal(wave.cd_frames()) / 75).to_integral(),
809                5)
810
811    @LIB_PCM
812    def test_pcm(self):
813        for (in_sample_rate,
814             (in_channels,
815              in_channel_mask),
816             in_bits_per_sample) in Possibilities([44100, 96000],
817                                                  [(1, 0x4),
818                                                   (2, 0x3),
819                                                   (4, 0x33)],
820                                                  [16, 24]):
821            for (out_sample_rate,
822                 (out_channels,
823                  out_channel_mask),
824                 out_bits_per_sample) in Possibilities([44100, 96000],
825                                                       [(1, 0x4),
826                                                        (2, 0x3),
827                                                        (4, 0x33)],
828                                                       [16, 24]):
829
830                main_reader = BLANK_PCM_Reader(
831                    length=1,
832                    sample_rate=in_sample_rate,
833                    channels=in_channels,
834                    bits_per_sample=in_bits_per_sample,
835                    channel_mask=in_channel_mask)
836
837                reader = audiotools.PCMConverter(
838                    pcmreader=main_reader,
839                    sample_rate=out_sample_rate,
840                    channels=out_channels,
841                    channel_mask=out_channel_mask,
842                    bits_per_sample=out_bits_per_sample)
843
844                # read contents of converted stream
845                f = reader.read(4096)
846                while len(f) > 0:
847                    f = reader.read(4096)
848
849                # ensure subsequent reads return empty FrameLists
850                for i in range(10):
851                    self.assertEqual(len(reader.read(4096)), 0)
852
853                # ensure closing stream raises ValueErrors
854                # on subsequent reads
855                reader.close()
856
857                self.assertRaises(ValueError, reader.read, 4096)
858
859                # ensure main reader is also closed
860                # when converter is closed
861                self.assertRaises(ValueError, main_reader.read, 4096)
862
863
864class Test_ReplayGain(unittest.TestCase):
865    @LIB_CORE
866    def test_replaygain(self):
867        # a trivial test of the ReplayGain container
868
869        self.assertEqual(audiotools.ReplayGain(0.5, 1.0, 0.5, 1.0),
870                         audiotools.ReplayGain(0.5, 1.0, 0.5, 1.0))
871        self.assertNotEqual(audiotools.ReplayGain(0.5, 1.0, 0.5, 1.0),
872                            audiotools.ReplayGain(0.25, 1.0, 0.5, 1.0))
873        self.assertNotEqual(audiotools.ReplayGain(0.5, 1.0, 0.5, 1.0),
874                            audiotools.ReplayGain(0.5, 0.5, 0.5, 1.0))
875        self.assertNotEqual(audiotools.ReplayGain(0.5, 1.0, 0.5, 1.0),
876                            audiotools.ReplayGain(0.5, 1.0, 0.25, 1.0))
877        self.assertNotEqual(audiotools.ReplayGain(0.5, 1.0, 0.5, 1.0),
878                            audiotools.ReplayGain(0.5, 1.0, 0.5, 0.5))
879
880
881class Test_filename_to_type(unittest.TestCase):
882    @LIB_CORE
883    def test_filename_to_type(self):
884        type_group = {}
885        for audio_type in audiotools.AVAILABLE_TYPES:
886            type_group.setdefault(audio_type.SUFFIX, []).append(audio_type)
887
888        for suffix in type_group.keys():
889            temp = tempfile.NamedTemporaryFile(suffix="." + suffix)
890            try:
891                if len(type_group[suffix]) == 1:
892                    self.assertEqual(audiotools.filename_to_type(temp.name),
893                                     type_group[suffix][0])
894                else:
895                    self.assertRaises(audiotools.AmbiguousAudioType,
896                                      audiotools.filename_to_type,
897                                      temp.name)
898            finally:
899                temp.close()
900
901        temp = tempfile.NamedTemporaryFile(suffix=".foo")
902        try:
903            self.assertRaises(audiotools.UnknownAudioType,
904                              audiotools.filename_to_type,
905                              temp.name)
906        finally:
907            temp.close()
908
909        temp = tempfile.NamedTemporaryFile()
910        try:
911            self.assertRaises(audiotools.UnknownAudioType,
912                              audiotools.filename_to_type,
913                              temp.name)
914        finally:
915            temp.close()
916
917
918class Test_group_tracks(unittest.TestCase):
919    @LIB_CORE
920    def setUp(self):
921        self.output_format = audiotools.FlacAudio
922        self.track_files = [
923            tempfile.NamedTemporaryFile(
924                suffix="." + self.output_format.SUFFIX)
925            for i in range(5)]
926        self.tracks = [
927            self.output_format.from_pcm(
928                track.name,
929                BLANK_PCM_Reader(1)) for track in self.track_files]
930        self.tracks[0].set_metadata(audiotools.MetaData(
931            album_name=u"Album 1",
932            album_number=1,
933            track_number=1))
934        self.tracks[1].set_metadata(audiotools.MetaData(
935            album_name=u"Album 2",
936            album_number=1,
937            track_number=1))
938        self.tracks[2].set_metadata(audiotools.MetaData(
939            album_name=u"Album 1",
940            album_number=1,
941            track_number=2))
942        self.tracks[3].set_metadata(audiotools.MetaData(
943            album_name=u"Album 2",
944            album_number=2,
945            track_number=1))
946        self.tracks[4].set_metadata(audiotools.MetaData(
947            album_name=u"Album 3",
948            album_number=1,
949            track_number=1))
950
951    @LIB_CORE
952    def tearDown(self):
953        for track in self.track_files:
954            track.close()
955
956    @LIB_CORE
957    def test_grouping(self):
958        groupings = list(audiotools.group_tracks(self.tracks))
959        groupings.sort(key=lambda x: x[0].get_metadata().album_name)
960        self.assertEqual(groupings[0], [self.tracks[0], self.tracks[2]])
961        self.assertEqual(groupings[1], [self.tracks[1]])
962        self.assertEqual(groupings[2], [self.tracks[3]])
963        self.assertEqual(groupings[3], [self.tracks[4]])
964
965
966class Test_open(unittest.TestCase):
967    @LIB_CORE
968    def setUp(self):
969        self.dummy1 = tempfile.NamedTemporaryFile()
970        self.dummy2 = tempfile.NamedTemporaryFile()
971        self.dummy3 = tempfile.NamedTemporaryFile()
972        self.dummy1.write(b"12345" * 1000)
973        self.dummy1.flush()
974        self.dummy2.write(b"54321" * 1000)
975        self.dummy2.flush()
976
977        with open("flac-allframes.flac", "rb") as f:
978            data = f.read()
979        self.dummy3.write(data[0:0x4] + b"\xFF" + data[0x5:])
980        self.dummy3.flush()
981
982    @LIB_CORE
983    def tearDown(self):
984        self.dummy1.close()
985        self.dummy2.close()
986        self.dummy3.close()
987
988    @LIB_CORE
989    def test_open(self):
990        # ensure open on dummy file raises UnsupportedFile
991        self.assertRaises(audiotools.UnsupportedFile,
992                          audiotools.open,
993                          self.dummy1.name)
994
995        # ensure open on nonexistent file raises IOError
996        self.assertRaises(IOError,
997                          audiotools.open,
998                          "/dev/null/foo")
999
1000        # ensure open on directory raises IOError
1001        self.assertRaises(IOError,
1002                          audiotools.open,
1003                          "/")
1004
1005        # ensure open on unreadable file raises IOError
1006        os.chmod(self.dummy1.name, 0)
1007        try:
1008            self.assertRaises(IOError,
1009                              audiotools.open,
1010                              self.dummy1.name)
1011        finally:
1012            os.chmod(self.dummy1.name, 0o600)
1013
1014        self.assertRaises(audiotools.InvalidFile,
1015                          audiotools.open,
1016                          self.dummy3.name)
1017
1018
1019class Test_open_directory(unittest.TestCase):
1020    @LIB_CORE
1021    def setUp(self):
1022        self.output_type = audiotools.FlacAudio
1023        self.suffix = "." + self.output_type.SUFFIX
1024        self.dir = tempfile.mkdtemp()
1025
1026    def make_track(self, directory, track_number):
1027        track = self.output_type.from_pcm(
1028            os.path.join(directory, str(track_number) + self.suffix),
1029            BLANK_PCM_Reader(1))
1030        track.set_metadata(audiotools.MetaData(track_name=u"Track Name",
1031                                               track_number=track_number))
1032        return track
1033
1034    @LIB_CORE
1035    def tearDown(self):
1036        import shutil
1037
1038        shutil.rmtree(self.dir)
1039
1040    @LIB_CORE
1041    def test_open_directory(self):
1042        subdir1 = os.path.join(self.dir, "dir1")
1043        subdir2 = os.path.join(self.dir, "dir2")
1044        subdir3 = os.path.join(subdir1, "dir3")
1045
1046        os.mkdir(subdir1)
1047        os.mkdir(subdir2)
1048        os.mkdir(subdir3)
1049
1050        track0_1 = self.make_track(self.dir, 1)
1051        track0_2 = self.make_track(self.dir, 2)
1052        track0_3 = self.make_track(self.dir, 3)
1053        track1_1 = self.make_track(subdir1, 1)
1054        track1_2 = self.make_track(subdir1, 2)
1055        track1_3 = self.make_track(subdir1, 3)
1056        track2_1 = self.make_track(subdir2, 1)
1057        track2_2 = self.make_track(subdir2, 2)
1058        track2_3 = self.make_track(subdir2, 3)
1059        track3_1 = self.make_track(subdir3, 1)
1060        track3_2 = self.make_track(subdir3, 2)
1061        track3_3 = self.make_track(subdir3, 3)
1062
1063        tracks = list(audiotools.open_directory(self.dir))
1064        self.assertEqual([t.filename for t in tracks],
1065                         [t.filename for t in
1066                          [track0_1, track0_2, track0_3,
1067                           track1_1, track1_2, track1_3,
1068                           track3_1, track3_2, track3_3,
1069                           track2_1, track2_2, track2_3]])
1070
1071
1072class Test_open_files(unittest.TestCase):
1073    @LIB_CORE
1074    def setUp(self):
1075        self.output_type = audiotools.FlacAudio
1076        self.suffix = "." + self.output_type.SUFFIX
1077        self.dir = tempfile.mkdtemp()
1078
1079    def make_track(self, directory, track_number):
1080        track = self.output_type.from_pcm(
1081            os.path.join(directory, str(track_number) + self.suffix),
1082            BLANK_PCM_Reader(1))
1083        track.set_metadata(audiotools.MetaData(track_name=u"Track Name",
1084                                               track_number=track_number))
1085        return track
1086
1087    @LIB_CORE
1088    def tearDown(self):
1089        import shutil
1090
1091        shutil.rmtree(self.dir)
1092
1093    @LIB_CORE
1094    def test_open_files(self):
1095        track1 = self.make_track(self.dir, 1)
1096        track2 = self.make_track(self.dir, 2)
1097        track3 = self.make_track(self.dir, 3)
1098        dummy1_name = os.path.join(self.dir, "4" + self.suffix)
1099        dummy1 = open(dummy1_name, "wb")
1100        dummy1.write(b"Hello World")
1101        dummy1.close()
1102
1103        tracks = list(audiotools.open_files([track1.filename, track2.filename,
1104                                             dummy1_name, track3.filename]))
1105        self.assertEqual([t.filename for t in tracks],
1106                         [t.filename for t in [track1, track2, track3]])
1107
1108
1109class Test_sorted_tracks(unittest.TestCase):
1110    @LIB_CORE
1111    def setUp(self):
1112        self.dir = tempfile.mkdtemp()
1113
1114    @LIB_CORE
1115    def tearDown(self):
1116        import shutil
1117
1118        shutil.rmtree(self.dir)
1119
1120    def __clean__(self):
1121        for f in os.listdir(self.dir):
1122            os.unlink(os.path.join(self.dir, f))
1123
1124    def __no_metadata__(self, filename_number):
1125        return audiotools.WaveAudio.from_pcm(
1126            os.path.join(self.dir, "%2.2d.wav" % (filename_number)),
1127            BLANK_PCM_Reader(1))
1128
1129    def __metadata__(self, filename_number,
1130                     track_number=None, album_number=None):
1131        track = audiotools.FlacAudio.from_pcm(
1132            os.path.join(self.dir, "%2.2d.flac" % (filename_number)),
1133            BLANK_PCM_Reader(1))
1134        track.set_metadata(audiotools.MetaData(track_number=track_number,
1135                                               album_number=album_number))
1136        return track
1137
1138    def __test_order__(self, sorted_tracks):
1139        from random import shuffle
1140
1141        self.assertGreater(len(sorted_tracks), 1)
1142
1143        shuffled_tracks = sorted_tracks[:]
1144        while ([f.filename for f in shuffled_tracks] ==
1145               [f.filename for f in sorted_tracks]):
1146            shuffle(shuffled_tracks)
1147
1148        self.assertNotEqual([f.filename for f in shuffled_tracks],
1149                            [f.filename for f in sorted_tracks])
1150
1151        reordered_tracks = audiotools.sorted_tracks(shuffled_tracks)
1152
1153        self.assertEqual([f.filename for f in reordered_tracks],
1154                         [f.filename for f in sorted_tracks])
1155
1156    @LIB_CORE
1157    def test_sort(self):
1158        # tracks without metadata come before tracks with metadata
1159        self.__clean__()
1160        self.__test_order__([self.__no_metadata__(1),
1161                             self.__metadata__(2, 2),
1162                             self.__metadata__(3, 3)])
1163
1164        self.__clean__()
1165        self.__test_order__([self.__no_metadata__(3),
1166                             self.__metadata__(1, 1),
1167                             self.__metadata__(2, 2)])
1168
1169        self.__clean__()
1170        self.__test_order__([self.__no_metadata__(3),
1171                             self.__no_metadata__(10),
1172                             self.__metadata__(1, 1),
1173                             self.__metadata__(2, 2)])
1174
1175        # tracks without metadata are sorted by filename
1176        self.__clean__()
1177        self.__test_order__([self.__no_metadata__(1),
1178                             self.__no_metadata__(2),
1179                             self.__no_metadata__(3)])
1180
1181        self.__clean__()
1182        self.__test_order__([self.__no_metadata__(1),
1183                             self.__no_metadata__(2),
1184                             self.__no_metadata__(10),
1185                             self.__no_metadata__(11)])
1186
1187        # tracks without album numbers come before tracks with album numbers
1188        self.__clean__()
1189        self.__test_order__([self.__metadata__(3, 3),
1190                             self.__metadata__(2, 1, 1),
1191                             self.__metadata__(1, 2, 1)])
1192
1193        self.__clean__()
1194        self.__test_order__([self.__metadata__(4, 3),
1195                             self.__metadata__(3, 4),
1196                             self.__metadata__(2, 1, 1),
1197                             self.__metadata__(1, 2, 1)])
1198
1199        # tracks without album numbers are sorted by track number (if any)
1200        self.__clean__()
1201        self.__test_order__([self.__metadata__(3),
1202                             self.__metadata__(2, 1),
1203                             self.__metadata__(1, 2)])
1204
1205        self.__clean__()
1206        self.__test_order__([self.__metadata__(3, 1),
1207                             self.__metadata__(2, 2),
1208                             self.__metadata__(1, 3)])
1209
1210        # tracks with album numbers are sorted by album number
1211        # and then by track number (if any)
1212        self.__clean__()
1213        self.__test_order__([self.__metadata__(5),
1214                             self.__metadata__(4, 1, 1),
1215                             self.__metadata__(3, 2, 1),
1216                             self.__metadata__(2, 1, 2),
1217                             self.__metadata__(1, 2, 2)])
1218
1219
1220class Test_pcm_frame_cmp(unittest.TestCase):
1221    @LIB_CORE
1222    def test_pcm_frame_cmp(self):
1223        from test_formats import CLOSE_PCM_Reader
1224
1225        reader1 = CLOSE_PCM_Reader(
1226            test_streams.Sine16_Stereo(44100, 44100,
1227                                       441.0, 0.50,
1228                                       4410.0, 0.49, 1.0))
1229        reader2 = CLOSE_PCM_Reader(
1230            test_streams.Sine16_Stereo(44100, 44100,
1231                                       441.0, 0.50,
1232                                       4410.0, 0.49, 1.0))
1233
1234        self.assertEqual(reader1.closes_called, 0)
1235        self.assertEqual(reader2.closes_called, 0)
1236        self.assertIsNone(audiotools.pcm_frame_cmp(reader1, reader2))
1237        self.assertEqual(reader1.closes_called, 1)
1238        self.assertEqual(reader2.closes_called, 1)
1239
1240        reader1 = CLOSE_PCM_Reader(BLANK_PCM_Reader(1))
1241        reader2 = CLOSE_PCM_Reader(RANDOM_PCM_Reader(1))
1242
1243        self.assertEqual(reader1.closes_called, 0)
1244        self.assertEqual(reader2.closes_called, 0)
1245        self.assertEqual(audiotools.pcm_frame_cmp(reader1, reader2), 0)
1246        self.assertEqual(reader1.closes_called, 1)
1247        self.assertEqual(reader2.closes_called, 1)
1248
1249        reader1 = CLOSE_PCM_Reader(BLANK_PCM_Reader(1))
1250        reader2 = CLOSE_PCM_Reader(BLANK_PCM_Reader(1, sample_rate=48000))
1251
1252        self.assertEqual(reader1.closes_called, 0)
1253        self.assertEqual(reader2.closes_called, 0)
1254        self.assertEqual(audiotools.pcm_frame_cmp(reader1, reader2), 0)
1255        self.assertEqual(reader1.closes_called, 1)
1256        self.assertEqual(reader2.closes_called, 1)
1257
1258        reader1 = CLOSE_PCM_Reader(BLANK_PCM_Reader(1))
1259        reader2 = CLOSE_PCM_Reader(BLANK_PCM_Reader(1, channels=1))
1260
1261        self.assertEqual(reader1.closes_called, 0)
1262        self.assertEqual(reader2.closes_called, 0)
1263        self.assertEqual(audiotools.pcm_frame_cmp(reader1, reader2), 0)
1264        self.assertEqual(reader1.closes_called, 1)
1265        self.assertEqual(reader2.closes_called, 1)
1266
1267        reader1 = CLOSE_PCM_Reader(BLANK_PCM_Reader(1))
1268        reader2 = CLOSE_PCM_Reader(BLANK_PCM_Reader(1, bits_per_sample=24))
1269
1270        self.assertEqual(reader1.closes_called, 0)
1271        self.assertEqual(reader2.closes_called, 0)
1272        self.assertEqual(audiotools.pcm_frame_cmp(reader1, reader2), 0)
1273        self.assertEqual(reader1.closes_called, 1)
1274        self.assertEqual(reader2.closes_called, 1)
1275
1276        reader1 = CLOSE_PCM_Reader(BLANK_PCM_Reader(1))
1277        reader2 = CLOSE_PCM_Reader(BLANK_PCM_Reader(1, channel_mask=0x30))
1278
1279        self.assertEqual(reader1.closes_called, 0)
1280        self.assertEqual(reader2.closes_called, 0)
1281        self.assertEqual(audiotools.pcm_frame_cmp(reader1, reader2), 0)
1282        self.assertEqual(reader1.closes_called, 1)
1283        self.assertEqual(reader2.closes_called, 1)
1284
1285        reader1 = CLOSE_PCM_Reader(BLANK_PCM_Reader(2))
1286        reader2 = CLOSE_PCM_Reader(
1287            audiotools.PCMCat([BLANK_PCM_Reader(1), RANDOM_PCM_Reader(1)]))
1288
1289        self.assertEqual(reader1.closes_called, 0)
1290        self.assertEqual(reader2.closes_called, 0)
1291        self.assertEqual(audiotools.pcm_frame_cmp(reader1, reader2), 44100)
1292        self.assertEqual(reader1.closes_called, 1)
1293        self.assertEqual(reader2.closes_called, 1)
1294
1295
1296class TestFrameList(unittest.TestCase):
1297    if sys.version_info[0] >= 3:
1298        @classmethod
1299        def Bits8(cls):
1300            for i in range(0, 0xFF + 1):
1301                yield bytes([i])
1302
1303        @classmethod
1304        def Bits16(cls):
1305            for i in range(0, 0xFF + 1):
1306                for j in range(0, 0xFF + 1):
1307                    yield bytes([i, j])
1308
1309        @classmethod
1310        def Bits24(cls):
1311            for i in range(0, 0xFF + 1):
1312                for j in range(0, 0xFF + 1):
1313                    for k in range(0, 0xFF + 1):
1314                        yield bytes([i, j, k])
1315    else:
1316        @classmethod
1317        def Bits8(cls):
1318            for i in range(0, 0xFF + 1):
1319                yield chr(i)
1320
1321        @classmethod
1322        def Bits16(cls):
1323            for i in range(0, 0xFF + 1):
1324                for j in range(0, 0xFF + 1):
1325                    yield chr(i) + chr(j)
1326
1327        @classmethod
1328        def Bits24(cls):
1329            for i in range(0, 0xFF + 1):
1330                for j in range(0, 0xFF + 1):
1331                    for k in range(0, 0xFF + 1):
1332                        yield chr(i) + chr(j) + chr(k)
1333
1334    @LIB_CORE
1335    def test_basics(self):
1336        import audiotools.pcm
1337
1338        self.assertRaises(TypeError,
1339                          audiotools.pcm.FrameList,
1340                          0, 2, 16, 0, 1)
1341
1342        self.assertRaises(TypeError,
1343                          audiotools.pcm.FrameList,
1344                          [1, 2, 3], 2, 16, 0, 1)
1345
1346        self.assertRaises(ValueError,
1347                          audiotools.pcm.FrameList,
1348                          b"abc", 2, 16, 0, 1)
1349
1350        self.assertRaises(ValueError,
1351                          audiotools.pcm.FrameList,
1352                          b"abc", 4, 8, 0, 1)
1353
1354        self.assertRaises(ValueError,
1355                          audiotools.pcm.FrameList,
1356                          b"abcd", 1, 15, 0, 1)
1357
1358        f = audiotools.pcm.FrameList(ints_to_bytes(range(16)),
1359                                     2, 16, True, True)
1360        self.assertEqual(len(f), 8)
1361        self.assertEqual(f.channels, 2)
1362        self.assertEqual(f.frames, 4)
1363        self.assertEqual(f.bits_per_sample, 16)
1364        self.assertRaises(IndexError, f.__getitem__, 9)
1365
1366        self.assertEqual(list(f.frame(0)),
1367                         [0x0001, 0x0203])
1368        self.assertEqual(list(f.frame(1)),
1369                         [0x0405, 0x0607])
1370        self.assertEqual(list(f.frame(2)),
1371                         [0x0809, 0x0A0B])
1372        self.assertEqual(list(f.frame(3)),
1373                         [0x0C0D, 0x0E0F])
1374        self.assertRaises(IndexError, f.frame, 4)
1375        self.assertRaises(IndexError, f.frame, -1)
1376
1377        self.assertEqual(list(f.channel(0)),
1378                         [0x0001, 0x0405, 0x0809, 0x0C0D])
1379        self.assertEqual(list(f.channel(1)),
1380                         [0x0203, 0x0607, 0x0A0B, 0x0E0F])
1381        self.assertRaises(IndexError, f.channel, 2)
1382        self.assertRaises(IndexError, f.channel, -1)
1383
1384        for bps in [8, 16, 24]:
1385            self.assertEqual(list(audiotools.pcm.from_list(
1386                range(-40, 40), 1, bps, True)),
1387                list(range(-40, 40)))
1388
1389        for bps in [8, 16, 24]:
1390            self.assertEqual(list(audiotools.pcm.from_list(
1391                range((1 << (bps - 1)) - 40,
1392                      (1 << (bps - 1)) + 40), 1, bps, False)),
1393                list(range(-40, 40)))
1394
1395        for channels in range(1, 9):
1396            for bps in [8, 16, 24]:
1397                for signed in [True, False]:
1398                    if signed:
1399                        l = [random.choice(range(-40, 40)) for i in
1400                             range(16 * channels)]
1401                    else:
1402                        l = [random.choice(range(0, 80)) for i in
1403                             range(16 * channels)]
1404                    f2 = audiotools.pcm.from_list(l, channels, bps, signed)
1405                    if signed:
1406                        self.assertEqual(list(f2), l)
1407                        for channel in range(channels):
1408                            self.assertEqual(list(f2.channel(channel)),
1409                                             l[channel::channels])
1410                    else:
1411                        self.assertEqual(list(f2),
1412                                         [i - (1 << (bps - 1))
1413                                          for i in l])
1414                        for channel in range(channels):
1415                            self.assertEqual(list(f2.channel(channel)),
1416                                             [i - (1 << (bps - 1))
1417                                              for i in l[channel::channels]])
1418
1419        self.assertEqual(
1420            f.to_bytes(True, True),
1421            b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f')
1422        self.assertEqual(
1423            f.to_bytes(False, True),
1424            b'\x01\x00\x03\x02\x05\x04\x07\x06\t\x08\x0b\n\r\x0c\x0f\x0e')
1425        # FIXME - check signed
1426
1427        self.assertEqual(list(f),
1428                         list(audiotools.pcm.from_frames([f.frame(0),
1429                                                          f.frame(1),
1430                                                          f.frame(2),
1431                                                          f.frame(3)])))
1432        self.assertEqual(list(f),
1433                         list(audiotools.pcm.from_channels([f.channel(0),
1434                                                            f.channel(1)])))
1435
1436        self.assertEqual(list(audiotools.pcm.from_list(
1437            [0x0001, 0x0203, 0x0405, 0x0607,
1438             0x0809, 0x0A0B, 0x0C0D, 0x0E0F], 2, 16, True)),
1439            list(f))
1440
1441        self.assertRaises(ValueError,
1442                          audiotools.pcm.from_list,
1443                          [0x0001, 0x0203, 0x0405, 0x0607,
1444                           0x0809, 0x0A0B, 0x0C0D], 2, 16, True)
1445
1446        self.assertRaises(ValueError,
1447                          audiotools.pcm.from_list,
1448                          [0x0001, 0x0203, 0x0405, 0x0607,
1449                           0x0809, 0x0A0B, 0x0C0D, 0x0E0F], 2, 15, True)
1450
1451        self.assertRaises(
1452            TypeError,
1453            audiotools.pcm.from_frames,
1454            [audiotools.pcm.from_list(list(range(2)), 2, 16, False),
1455             list(range(2))])
1456
1457        self.assertRaises(
1458            ValueError,
1459            audiotools.pcm.from_frames,
1460            [audiotools.pcm.from_list(list(range(2)), 2, 16, False),
1461             audiotools.pcm.from_list(list(range(4)), 2, 16, False)])
1462
1463        self.assertRaises(
1464            ValueError,
1465            audiotools.pcm.from_frames,
1466            [audiotools.pcm.from_list(list(range(2)), 2, 16, False),
1467             audiotools.pcm.from_list(list(range(2)), 1, 16, False)])
1468
1469        self.assertRaises(
1470            ValueError,
1471            audiotools.pcm.from_frames,
1472            [audiotools.pcm.from_list(list(range(2)), 2, 16, False),
1473             audiotools.pcm.from_list(list(range(2)), 2, 8, False)])
1474
1475        self.assertEqual(list(audiotools.pcm.from_frames(
1476            [audiotools.pcm.from_list(list(range(2)), 2, 16, True),
1477             audiotools.pcm.from_list(list(range(2, 4)), 2, 16, True)])),
1478            list(range(4)))
1479
1480        self.assertRaises(
1481            TypeError,
1482            audiotools.pcm.from_channels,
1483            [audiotools.pcm.from_list(list(range(2)), 1, 16, False),
1484             list(range(2))])
1485
1486        self.assertRaises(
1487            ValueError,
1488            audiotools.pcm.from_channels,
1489            [audiotools.pcm.from_list(list(range(1)), 1, 16, False),
1490             audiotools.pcm.from_list(list(range(2)), 2, 16, False)])
1491
1492        self.assertRaises(
1493            ValueError,
1494            audiotools.pcm.from_channels,
1495            [audiotools.pcm.from_list(list(range(2)), 1, 16, False),
1496             audiotools.pcm.from_list(list(range(3)), 1, 16, False)])
1497
1498        self.assertRaises(
1499            ValueError,
1500            audiotools.pcm.from_channels,
1501            [audiotools.pcm.from_list(list(range(2)), 1, 16, False),
1502             audiotools.pcm.from_list(list(range(2)), 1, 8, False)])
1503
1504        self.assertEqual(list(audiotools.pcm.from_channels(
1505            [audiotools.pcm.from_list(list(range(2)), 1, 16, True),
1506             audiotools.pcm.from_list(list(range(2, 4)), 1, 16, True)])),
1507            [0, 2, 1, 3])
1508
1509        self.assertRaises(IndexError, f.split, -1)
1510
1511        (f1, f2) = f.split(2)
1512        self.assertEqual(list(f1),
1513                         [0x0001, 0x0203,
1514                          0x0405, 0x0607])
1515        self.assertEqual(list(f2),
1516                         [0x0809, 0x0A0B,
1517                          0x0C0D, 0x0E0F])
1518
1519        (f1, f2) = f.split(0)
1520        self.assertEqual(list(f1),
1521                         [])
1522        self.assertEqual(list(f2),
1523                         [0x0001, 0x0203,
1524                          0x0405, 0x0607,
1525                          0x0809, 0x0A0B,
1526                          0x0C0D, 0x0E0F])
1527
1528        (f1, f2) = f.split(20)
1529        self.assertEqual(list(f1),
1530                         [0x0001, 0x0203,
1531                          0x0405, 0x0607,
1532                          0x0809, 0x0A0B,
1533                          0x0C0D, 0x0E0F])
1534        self.assertEqual(list(f2),
1535                         [])
1536
1537        for i in range(f.frames):
1538            (f1, f2) = f.split(i)
1539            self.assertEqual(len(f1), i * f.channels)
1540            self.assertEqual(len(f2), (len(f) - (i * f.channels)))
1541            self.assertEqual(list(f1 + f2), list(f))
1542
1543        import operator
1544
1545        f1 = audiotools.pcm.from_list(list(range(10)), 2, 16, False)
1546        self.assertRaises(TypeError, operator.concat, f1, [1, 2, 3])
1547        f2 = audiotools.pcm.from_list(list(range(10, 20)), 1, 16, False)
1548        self.assertRaises(ValueError, operator.concat, f1, f2)
1549        f2 = audiotools.pcm.from_list(list(range(10, 20)), 2, 8, False)
1550        self.assertRaises(ValueError, operator.concat, f1, f2)
1551
1552        f1 = audiotools.pcm.from_list(list(range(10)), 2, 16, False)
1553        self.assertEqual(f1, audiotools.pcm.from_list(range(10), 2, 16, False))
1554        self.assertNotEqual(f1, 10)
1555        self.assertNotEqual(f1, list(range(10)))
1556        self.assertNotEqual(
1557            f1,
1558            audiotools.pcm.from_list(list(range(10)), 1, 16, False))
1559        self.assertNotEqual(
1560            f1,
1561            audiotools.pcm.from_list(list(range(10)), 2, 8, False))
1562        self.assertNotEqual(
1563            f1,
1564            audiotools.pcm.from_list(list(range(10)), 1, 8, False))
1565        self.assertNotEqual(
1566            f1,
1567            audiotools.pcm.from_list(list(range(8)), 2, 16, False))
1568        self.assertNotEqual(
1569            f1,
1570            audiotools.pcm.from_list(list(range(12)), 2, 8, False))
1571
1572    @LIB_CORE
1573    def test_8bit_roundtrip(self):
1574        import audiotools.pcm
1575
1576        unsigned_ints = list(range(0, 0xFF + 1))
1577        signed_ints = list(range(-0x80, 0x7F + 1))
1578
1579        # unsigned, big-endian
1580        self.assertEqual(
1581            [i - (1 << 7) for i in unsigned_ints],
1582            list(audiotools.pcm.FrameList(
1583                struct.pack(">%dB" % (len(unsigned_ints)), *unsigned_ints),
1584                1, 8, True, False)))
1585
1586        # unsigned, little-endian
1587        self.assertEqual(
1588            [i - (1 << 7) for i in unsigned_ints],
1589            list(audiotools.pcm.FrameList(
1590                struct.pack("<%dB" % (len(unsigned_ints)), *unsigned_ints),
1591                1, 8, False, False)))
1592
1593        # signed, big-endian
1594        self.assertEqual(
1595            signed_ints,
1596            list(audiotools.pcm.FrameList(
1597                struct.pack(">%db" % (len(signed_ints)), *signed_ints),
1598                1, 8, True, True)))
1599
1600        # signed, little-endian
1601        self.assertEqual(
1602            signed_ints,
1603            list(audiotools.pcm.FrameList(
1604                struct.pack("<%db" % (len(signed_ints)), *signed_ints),
1605                1, 8, 0, 1)))
1606
1607    @LIB_CORE
1608    def test_8bit_roundtrip_str(self):
1609        import audiotools.pcm
1610
1611        s = b"".join(TestFrameList.Bits8())
1612
1613        # big endian, unsigned
1614        self.assertEqual(
1615            audiotools.pcm.FrameList(s, 1, 8,
1616                                     True, False).to_bytes(True, False), s)
1617
1618        # big-endian, signed
1619        self.assertEqual(
1620            audiotools.pcm.FrameList(s, 1, 8,
1621                                     True, True).to_bytes(True, True), s)
1622
1623        # little-endian, unsigned
1624        self.assertEqual(
1625            audiotools.pcm.FrameList(s, 1, 8,
1626                                     False, False).to_bytes(False, False), s)
1627
1628        # little-endian, signed
1629        self.assertEqual(
1630            audiotools.pcm.FrameList(s, 1, 8,
1631                                     False, True).to_bytes(False, True), s)
1632
1633    @LIB_CORE
1634    def test_16bit_roundtrip(self):
1635        import audiotools.pcm
1636
1637        unsigned_ints = list(range(0, 0xFFFF + 1))
1638        signed_ints = list(range(-0x8000, 0x7FFF + 1))
1639
1640        # unsigned, big-endian
1641        self.assertEqual(
1642            [i - (1 << 15) for i in unsigned_ints],
1643            list(audiotools.pcm.FrameList(
1644                struct.pack(">%dH" % (len(unsigned_ints)), *unsigned_ints),
1645                1, 16, True, False)))
1646
1647        # unsigned, little-endian
1648        self.assertEqual(
1649            [i - (1 << 15) for i in unsigned_ints],
1650            list(audiotools.pcm.FrameList(
1651                struct.pack("<%dH" % (len(unsigned_ints)), *unsigned_ints),
1652                1, 16, False, False)))
1653
1654        # signed, big-endian
1655        self.assertEqual(
1656            signed_ints,
1657            list(audiotools.pcm.FrameList(
1658                struct.pack(">%dh" % (len(signed_ints)), *signed_ints),
1659                1, 16, True, True)))
1660
1661        # signed, little-endian
1662        self.assertEqual(
1663            signed_ints,
1664            list(audiotools.pcm.FrameList(
1665                struct.pack("<%dh" % (len(signed_ints)), *signed_ints),
1666                1, 16, False, True)))
1667
1668    @LIB_CORE
1669    def test_16bit_roundtrip_str(self):
1670        import audiotools.pcm
1671
1672        s = b"".join(TestFrameList.Bits16())
1673
1674        # big-endian, unsigned
1675        self.assertEqual(
1676            audiotools.pcm.FrameList(s, 1, 16,
1677                                     True, False).to_bytes(True, False),
1678            s,
1679            "data mismatch converting UBInt16 through string")
1680
1681        # big-endian, signed
1682        self.assertEqual(
1683            audiotools.pcm.FrameList(s, 1, 16,
1684                                     True, True).to_bytes(True, True),
1685            s,
1686            "data mismatch converting SBInt16 through string")
1687
1688        # little-endian, unsigned
1689        self.assertEqual(
1690            audiotools.pcm.FrameList(s, 1, 16,
1691                                     False, False).to_bytes(False, False),
1692            s,
1693            "data mismatch converting ULInt16 through string")
1694
1695        # little-endian, signed
1696        self.assertEqual(
1697            audiotools.pcm.FrameList(s, 1, 16,
1698                                     False, True).to_bytes(False, True),
1699            s,
1700            "data mismatch converting USInt16 through string")
1701
1702    @LIB_CORE
1703    def test_24bit_roundtrip(self):
1704        import audiotools.pcm
1705        from audiotools.bitstream import BitstreamRecorder
1706
1707        unsigned_values = list(range(0, 2 ** 24, 1024))
1708        unsigned_values.append(2 ** 24 - 1)
1709
1710        #unsigned, big-endian
1711        rec = BitstreamRecorder(0)
1712        rec.build("%d*24u" % (len(unsigned_values)), unsigned_values)
1713        framelist = audiotools.pcm.FrameList(rec.data(), 1, 24, True, False)
1714        self.assertEqual(len(unsigned_values), framelist.frames)
1715        self.assertEqual(
1716            [i - (1 << 23) for i in unsigned_values],
1717            list(framelist))
1718
1719        #unsigned, little-endian
1720        rec = BitstreamRecorder(1)
1721        rec.build("%d*24u" % (len(unsigned_values)), unsigned_values)
1722        framelist = audiotools.pcm.FrameList(rec.data(), 1, 24, False, False)
1723        self.assertEqual(len(unsigned_values), framelist.frames)
1724        self.assertEqual(
1725            [i - (1 << 23) for i in unsigned_values],
1726            list(framelist))
1727
1728        signed_values = list(range(-(2 ** 23), 2 ** 23, 1024))
1729        signed_values.append(2 ** 23 - 1)
1730
1731        rec = BitstreamRecorder(0)
1732        rec.build("%d*24s" % (len(signed_values)), signed_values)
1733        framelist = audiotools.pcm.FrameList(rec.data(), 1, 24, True, True)
1734        self.assertEqual(len(signed_values), framelist.frames)
1735        self.assertEqual(signed_values, list(framelist))
1736
1737        rec = BitstreamRecorder(1)
1738        rec.build("%d*24s" % (len(signed_values)), signed_values)
1739        framelist = audiotools.pcm.FrameList(rec.data(), 1, 24, False, True)
1740        self.assertEqual(len(signed_values), framelist.frames)
1741        self.assertEqual(signed_values, list(framelist))
1742
1743    @LIB_CORE
1744    def test_24bit_roundtrip_str(self):
1745        import audiotools.pcm
1746
1747        s = b"".join(TestFrameList.Bits24())
1748        # big-endian, unsigned
1749        self.assertEqual(
1750            audiotools.pcm.FrameList(s, 1, 24,
1751                                     True, False).to_bytes(True, False), s)
1752
1753        # big-endian, signed
1754        self.assertEqual(
1755            audiotools.pcm.FrameList(s, 1, 24,
1756                                     True, True).to_bytes(True, True), s)
1757
1758        # little-endian, unsigned
1759        self.assertEqual(
1760            audiotools.pcm.FrameList(s, 1, 24,
1761                                     False, False).to_bytes(False, False), s)
1762
1763        # little-endian, signed
1764        self.assertEqual(
1765            audiotools.pcm.FrameList(s, 1, 24,
1766                                     False, True).to_bytes(False, True), s)
1767
1768    @LIB_CORE
1769    def test_conversion(self):
1770        for format in audiotools.AVAILABLE_TYPES:
1771            temp_track = tempfile.NamedTemporaryFile(
1772                suffix="." + format.SUFFIX)
1773            try:
1774                for sine_class in [test_streams.Sine8_Stereo,
1775                                   test_streams.Sine16_Stereo,
1776                                   test_streams.Sine24_Stereo]:
1777                    sine = sine_class(88200,
1778                                      44100,
1779                                      441.0,
1780                                      0.50,
1781                                      441.0,
1782                                      0.49,
1783                                      1.0)
1784                    try:
1785                        track = format.from_pcm(temp_track.name, sine)
1786                    except audiotools.UnsupportedBitsPerSample:
1787                        continue
1788                    if track.lossless():
1789                        md5sum = md5()
1790                        audiotools.transfer_framelist_data(
1791                            track.to_pcm(), md5sum.update)
1792                        self.assertEqual(
1793                            md5sum.hexdigest(), sine.hexdigest(),
1794                            "MD5 mismatch for %s using %s" % (
1795                                track.NAME, repr(sine)))
1796                        for new_format in audiotools.AVAILABLE_TYPES:
1797                            with tempfile.NamedTemporaryFile(
1798                                suffix="." + format.SUFFIX) as temp_track2:
1799                                try:
1800                                    track2 = new_format.from_pcm(
1801                                        temp_track2.name,
1802                                        track.to_pcm())
1803                                except audiotools.UnsupportedBitsPerSample:
1804                                    continue
1805                                if track2.lossless():
1806                                    md5sum2 = md5()
1807                                    audiotools.transfer_framelist_data(
1808                                        track2.to_pcm(), md5sum2.update)
1809                                    self.assertEqual(
1810                                        md5sum.hexdigest(),
1811                                        sine.hexdigest(),
1812                                        "MD5 mismatch for converting %s from %s to %s" % (repr(sine), track.NAME, track2.NAME))
1813            finally:
1814                temp_track.close()
1815
1816    @LIB_CORE
1817    def test_errors(self):
1818        # check list that's too large
1819        self.assertRaises(ValueError,
1820                          audiotools.pcm.FloatFrameList,
1821                          [0.0] * 5, 2)
1822
1823        # check list that's too small
1824        self.assertRaises(ValueError,
1825                          audiotools.pcm.FloatFrameList,
1826                          [0.0] * 3, 2)
1827
1828        # check channels <= 0
1829        self.assertRaises(ValueError,
1830                          audiotools.pcm.FloatFrameList,
1831                          [0.0] * 4, 0)
1832
1833        self.assertRaises(ValueError,
1834                          audiotools.pcm.FloatFrameList,
1835                          [0.0] * 4, -1)
1836
1837
1838class TestFloatFrameList(unittest.TestCase):
1839    @LIB_CORE
1840    def test_basics(self):
1841        import audiotools.pcm
1842
1843        self.assertRaises(ValueError,
1844                          audiotools.pcm.FloatFrameList,
1845                          [1.0, 2.0, 3.0], 2)
1846
1847        self.assertRaises(TypeError,
1848                          audiotools.pcm.FloatFrameList,
1849                          0, 1)
1850
1851        self.assertRaises(TypeError,
1852                          audiotools.pcm.FloatFrameList,
1853                          [1.0, 2.0, "a"], 1)
1854
1855        f = audiotools.pcm.FloatFrameList(list(map(float, range(8))), 2)
1856        self.assertEqual(len(f), 8)
1857        self.assertEqual(f.channels, 2)
1858        self.assertEqual(f.frames, 4)
1859        self.assertRaises(IndexError, f.__getitem__, 9)
1860
1861        self.assertEqual(list(f.frame(0)),
1862                         [0.0, 1.0])
1863        self.assertEqual(list(f.frame(1)),
1864                         [2.0, 3.0])
1865        self.assertEqual(list(f.frame(2)),
1866                         [4.0, 5.0])
1867        self.assertEqual(list(f.frame(3)),
1868                         [6.0, 7.0])
1869        self.assertRaises(IndexError, f.frame, 4)
1870        self.assertRaises(IndexError, f.frame, -1)
1871
1872        self.assertEqual(list(f.channel(0)),
1873                         [0.0, 2.0, 4.0, 6.0])
1874        self.assertEqual(list(f.channel(1)),
1875                         [1.0, 3.0, 5.0, 7.0])
1876        self.assertRaises(IndexError, f.channel, 2)
1877        self.assertRaises(IndexError, f.channel, -1)
1878
1879        self.assertEqual(list(f),
1880                         list(audiotools.pcm.from_float_frames([f.frame(0),
1881                                                                f.frame(1),
1882                                                                f.frame(2),
1883                                                                f.frame(3)])))
1884        self.assertEqual(list(f),
1885                         list(audiotools.pcm.from_float_channels([
1886                             f.channel(0),
1887                             f.channel(1)])))
1888
1889        # FIXME - check from_frames
1890        # FIXME - check from_channels
1891
1892        self.assertRaises(IndexError, f.split, -1)
1893
1894        (f1, f2) = f.split(2)
1895        self.assertEqual(list(f1),
1896                         [0.0, 1.0,
1897                          2.0, 3.0])
1898        self.assertEqual(list(f2),
1899                         [4.0, 5.0,
1900                          6.0, 7.0])
1901
1902        (f1, f2) = f.split(0)
1903        self.assertEqual(list(f1),
1904                         [])
1905        self.assertEqual(list(f2),
1906                         [0.0, 1.0,
1907                          2.0, 3.0,
1908                          4.0, 5.0,
1909                          6.0, 7.0])
1910
1911        (f1, f2) = f.split(20)
1912        self.assertEqual(list(f1),
1913                         [0.0, 1.0,
1914                          2.0, 3.0,
1915                          4.0, 5.0,
1916                          6.0, 7.0])
1917        self.assertEqual(list(f2),
1918                         [])
1919
1920        for i in range(f.frames):
1921            (f1, f2) = f.split(i)
1922            self.assertEqual(len(f1), i * f.channels)
1923            self.assertEqual(len(f2), (len(f) - (i * f.channels)))
1924            self.assertEqual(list(f1 + f2), list(f))
1925
1926        import operator
1927
1928        f1 = audiotools.pcm.FloatFrameList(list(map(float, range(10))), 2)
1929        self.assertRaises(TypeError, operator.concat, f1, [1, 2, 3])
1930
1931        # check round-trip from float->int->float
1932        l = [float(i - 128) / (1 << 7) for i in range(0, 1 << 8)]
1933        for bps in [8, 16, 24]:
1934            for signed in [True, False]:
1935                self.assertEqual(
1936                    l,
1937                    list(audiotools.pcm.FloatFrameList(l, 1).to_int(bps).to_float()))
1938
1939        # check round-trip from int->float->int
1940        for bps in [8, 16, 24]:
1941            l = list(range(0, 1 << bps, 4))
1942            self.assertEqual(
1943                [i - (1 << (bps - 1)) for i in l],
1944                list(audiotools.pcm.from_list(l, 1, bps, False).to_float().to_int(bps)))
1945
1946            l = list(range(-(1 << (bps - 1)), (1 << (bps - 1)) - 1, 4))
1947            self.assertEqual(
1948                l,
1949                list(audiotools.pcm.from_list(l, 1, bps, True).to_float().to_int(bps)))
1950
1951    @LIB_CORE
1952    def test_errors(self):
1953        # check string that's too large
1954        self.assertRaises(ValueError,
1955                          audiotools.pcm.FrameList,
1956                          b"\x00" * 5, 2, 16, 1, 1)
1957
1958        # check string that's too small
1959        self.assertRaises(ValueError,
1960                          audiotools.pcm.FrameList,
1961                          b"\x00" * 3, 2, 16, 1, 1)
1962
1963        # check channels <= 0
1964        self.assertRaises(ValueError,
1965                          audiotools.pcm.FrameList,
1966                          b"\x00" * 4, 0, 16, 1, 1)
1967
1968        self.assertRaises(ValueError,
1969                          audiotools.pcm.FrameList,
1970                          b"\x00" * 4, -1, 16, 1, 1)
1971
1972        # check bps != 8,16,24
1973        for bps in [0, 7, 9, 15, 17, 23, 25, 64]:
1974            self.assertRaises(ValueError,
1975                              audiotools.pcm.FrameList,
1976                              b"\x00" * 4, 2, bps, 1, 1)
1977
1978
1979class __SimpleChunkReader__:
1980    def __init__(self, chunks):
1981        self.chunks = chunks
1982        self.i = 0
1983
1984    def read(self, bytes):
1985        try:
1986            chunk = self.chunks[self.i]
1987            self.i += 1
1988            return chunk
1989        except IndexError:
1990            return ""
1991
1992    def tell(self):
1993        return self.i
1994
1995    def seek(self, position):
1996        self.i = position
1997
1998    def close(self):
1999        pass
2000
2001
2002class ByteCounter:
2003    def __init__(self):
2004        self.bytes = 0
2005
2006    def __int__(self):
2007        return self.bytes
2008
2009    def reset(self):
2010        self.bytes = 0
2011
2012    def callback(self, i):
2013        self.bytes += 1
2014
2015
2016class Bitstream(unittest.TestCase):
2017    def __test_big_endian_reader__(self, reader, table):
2018        # check the bitstream reader
2019        # against some known big-endian values
2020
2021        pos = reader.getpos()
2022        self.assertEqual(reader.read(2), 0x2)
2023        self.assertEqual(reader.read(3), 0x6)
2024        self.assertEqual(reader.read(5), 0x07)
2025        self.assertEqual(reader.read(3), 0x5)
2026        self.assertEqual(reader.read(19), 0x53BC1)
2027
2028        reader.setpos(pos)
2029        self.assertEqual(reader.read(2), 0x2)
2030        reader.skip(3)
2031        self.assertEqual(reader.read(5), 0x07)
2032        reader.skip(3)
2033        self.assertEqual(reader.read(19), 0x53BC1)
2034
2035        reader.setpos(pos)
2036        self.assertTrue(reader.byte_aligned())
2037        for i in range(32):
2038            bit = reader.read(1)
2039            reader.unread(bit)
2040            self.assertEqual(reader.read(1), bit)
2041        self.assertTrue(reader.byte_aligned())
2042
2043        reader.setpos(pos)
2044        reader.unread(reader.read(1))
2045        self.assertTrue(reader.byte_aligned())
2046        self.assertEqual(reader.read_bytes(4), b"\xB1\xED\x3B\xC1")
2047        self.assertTrue(reader.byte_aligned())
2048
2049        reader.setpos(pos)
2050        for i in range(8):
2051            reader.unread(0)
2052        self.assertRaises(IOError, reader.unread, 0)
2053
2054        reader.setpos(pos)
2055        self.assertEqual(reader.read_signed(2), -2)
2056        self.assertEqual(reader.read_signed(3), -2)
2057        self.assertEqual(reader.read_signed(5), 7)
2058        self.assertEqual(reader.read_signed(3), -3)
2059        self.assertEqual(reader.read_signed(19), -181311)
2060
2061        reader.setpos(pos)
2062        self.assertEqual(reader.unary(0), 1)
2063        self.assertEqual(reader.unary(0), 2)
2064        self.assertEqual(reader.unary(0), 0)
2065        self.assertEqual(reader.unary(0), 0)
2066        self.assertEqual(reader.unary(0), 4)
2067
2068        reader.setpos(pos)
2069        self.assertEqual(reader.unary(1), 0)
2070        self.assertEqual(reader.unary(1), 1)
2071        self.assertEqual(reader.unary(1), 0)
2072        self.assertEqual(reader.unary(1), 3)
2073        self.assertEqual(reader.unary(1), 0)
2074
2075        reader.setpos(pos)
2076        self.assertEqual(reader.read_huffman_code(table), 1)
2077        self.assertEqual(reader.read_huffman_code(table), 0)
2078        self.assertEqual(reader.read_huffman_code(table), 4)
2079        self.assertEqual(reader.read_huffman_code(table), 0)
2080        self.assertEqual(reader.read_huffman_code(table), 0)
2081        self.assertEqual(reader.read_huffman_code(table), 2)
2082        self.assertEqual(reader.read_huffman_code(table), 1)
2083        self.assertEqual(reader.read_huffman_code(table), 1)
2084        self.assertEqual(reader.read_huffman_code(table), 2)
2085        self.assertEqual(reader.read_huffman_code(table), 0)
2086        self.assertEqual(reader.read_huffman_code(table), 2)
2087        self.assertEqual(reader.read_huffman_code(table), 0)
2088        self.assertEqual(reader.read_huffman_code(table), 1)
2089        self.assertEqual(reader.read_huffman_code(table), 4)
2090        self.assertEqual(reader.read_huffman_code(table), 2)
2091
2092        reader.setpos(pos)
2093        self.assertEqual(reader.read(3), 5)
2094        reader.byte_align()
2095        self.assertEqual(reader.read(3), 7)
2096        reader.byte_align()
2097        reader.byte_align()
2098        self.assertEqual(reader.read(8), 59)
2099        reader.byte_align()
2100        self.assertEqual(reader.read(4), 12)
2101
2102        reader.setpos(pos)
2103        self.assertEqual(reader.read_bytes(2), b"\xB1\xED")
2104        reader.setpos(pos)
2105        self.assertEqual(reader.read(4), 11)
2106        self.assertEqual(reader.read_bytes(2), b"\x1E\xD3")
2107
2108        reader.setpos(pos)
2109        self.assertEqual(reader.read(3), 5)
2110        reader.set_endianness(1)
2111        self.assertEqual(reader.read(3), 5)
2112        reader.set_endianness(0)
2113        self.assertEqual(reader.read(4), 3)
2114        reader.set_endianness(0)
2115        self.assertEqual(reader.read(4), 12)
2116
2117        reader.setpos(pos)
2118        pos2 = reader.getpos()
2119        self.assertEqual(reader.read(4), 0xB)
2120        reader.setpos(pos2)
2121        self.assertEqual(reader.read(8), 0xB1)
2122        reader.setpos(pos2)
2123        self.assertEqual(reader.read(12), 0xB1E)
2124        del(pos2)
2125        pos3 = reader.getpos()
2126        self.assertEqual(reader.read(4), 0xD)
2127        reader.setpos(pos3)
2128        self.assertEqual(reader.read(8), 0xD3)
2129        reader.setpos(pos3)
2130        self.assertEqual(reader.read(12), 0xD3B)
2131        del(pos3)
2132
2133        reader.seek(3, 0)
2134        self.assertEqual(reader.read(8), 0xC1)
2135        reader.seek(2, 0)
2136        self.assertEqual(reader.read(8), 0x3B)
2137        reader.seek(1, 0)
2138        self.assertEqual(reader.read(8), 0xED)
2139        reader.seek(0, 0)
2140        self.assertEqual(reader.read(8), 0xB1)
2141        try:
2142            reader.seek(4, 0)
2143            reader.read(8)
2144            self.assertTrue(False)
2145        except IOError:
2146            self.assertTrue(True)
2147        try:
2148            reader.seek(-1, 0)
2149            reader.read(8)
2150            self.assertTrue(False)
2151        except IOError:
2152            self.assertTrue(True)
2153
2154        reader.seek(-1, 2)
2155        self.assertEqual(reader.read(8), 0xC1)
2156        reader.seek(-2, 2)
2157        self.assertEqual(reader.read(8), 0x3B)
2158        reader.seek(-3, 2)
2159        self.assertEqual(reader.read(8), 0xED)
2160        reader.seek(-4, 2)
2161        self.assertEqual(reader.read(8), 0xB1)
2162        # BytesIO objects allow seeking to before the beginning
2163        # of the stream, placing the cursor at the beginning
2164        # try:
2165        #     reader.seek(-5, 2)
2166        #     reader.read(8)
2167        #     self.assertTrue(False)
2168        # except IOError:
2169        #     self.assertTrue(True)
2170        try:
2171            reader.seek(1, 2)
2172            reader.read(8)
2173            self.assertTrue(False)
2174        except IOError:
2175            self.assertTrue(True)
2176
2177        reader.seek(0, 0)
2178        reader.seek(3, 1)
2179        self.assertEqual(reader.read(8), 0xC1)
2180        reader.seek(0, 0)
2181        reader.seek(2, 1)
2182        self.assertEqual(reader.read(8), 0x3B)
2183        reader.seek(0, 0)
2184        reader.seek(1, 1)
2185        self.assertEqual(reader.read(8), 0xED)
2186        reader.seek(0, 0)
2187        reader.seek(0, 1)
2188        self.assertEqual(reader.read(8), 0xB1)
2189        try:
2190            reader.seek(0, 0)
2191            reader.seek(4, 1)
2192            reader.read(8)
2193            self.assertTrue(False)
2194        except IOError:
2195            self.assertTrue(True)
2196        # BytesIO objects allow seeking to before the beginning
2197        # of the stream, placing the cursor at the beginning
2198        # try:
2199        #     reader.seek(0, 0)
2200        #     reader.seek(-1, 1)
2201        #     reader.read(8)
2202        #     self.assertTrue(False)
2203        # except IOError:
2204        #     self.assertTrue(True)
2205
2206        reader.seek(0, 2)
2207        reader.seek(-1, 1)
2208        self.assertEqual(reader.read(8), 0xC1)
2209        reader.seek(0, 2)
2210        reader.seek(-2, 1)
2211        self.assertEqual(reader.read(8), 0x3B)
2212        reader.seek(0, 2)
2213        reader.seek(-3, 1)
2214        self.assertEqual(reader.read(8), 0xED)
2215        reader.seek(0, 2)
2216        reader.seek(-4, 1)
2217        self.assertEqual(reader.read(8), 0xB1)
2218        # BytesIO objects allow seeking to before the beginning
2219        # of the stream, placing the cursor at the beginning
2220        # try:
2221        #     reader.seek(0, 2)
2222        #     reader.seek(-5, 1)
2223        #     reader.read(8)
2224        #     self.assertTrue(False)
2225        # except IOError:
2226        #     self.assertTrue(True)
2227        try:
2228            reader.seek(0, 2)
2229            reader.seek(1, 1)
2230            reader.read(8)
2231            self.assertTrue(False)
2232        except IOError:
2233            self.assertTrue(True)
2234
2235        reader.setpos(pos)
2236
2237    def __test_little_endian_reader__(self, reader, table):
2238        # check the bitstream reader
2239        # against some known little-endian values
2240
2241        pos = reader.getpos()
2242        self.assertEqual(reader.read(2), 0x1)
2243        self.assertEqual(reader.read(3), 0x4)
2244        self.assertEqual(reader.read(5), 0x0D)
2245        self.assertEqual(reader.read(3), 0x3)
2246        self.assertEqual(reader.read(19), 0x609DF)
2247
2248        reader.setpos(pos)
2249        self.assertEqual(reader.read(2), 0x1)
2250        reader.skip(3)
2251        self.assertEqual(reader.read(5), 0x0D)
2252        reader.skip(3)
2253        self.assertEqual(reader.read(19), 0x609DF)
2254
2255        reader.setpos(pos)
2256        self.assertTrue(reader.byte_aligned())
2257        for i in range(32):
2258            bit = reader.read(1)
2259            reader.unread(bit)
2260            self.assertEqual(reader.read(1), bit)
2261        self.assertTrue(reader.byte_aligned())
2262
2263        reader.setpos(pos)
2264        reader.unread(reader.read(1))
2265        self.assertTrue(reader.byte_aligned())
2266        self.assertEqual(reader.read_bytes(4), b"\xB1\xED\x3B\xC1")
2267        self.assertTrue(reader.byte_aligned())
2268
2269        reader.setpos(pos)
2270        for i in range(8):
2271            reader.unread(0)
2272        self.assertRaises(IOError, reader.unread, 0)
2273
2274        reader.setpos(pos)
2275        self.assertEqual(reader.read_signed(2), 1)
2276        self.assertEqual(reader.read_signed(3), -4)
2277        self.assertEqual(reader.read_signed(5), 13)
2278        self.assertEqual(reader.read_signed(3), 3)
2279        self.assertEqual(reader.read_signed(19), -128545)
2280
2281        reader.setpos(pos)
2282        self.assertEqual(reader.unary(0), 1)
2283        self.assertEqual(reader.unary(0), 0)
2284        self.assertEqual(reader.unary(0), 0)
2285        self.assertEqual(reader.unary(0), 2)
2286        self.assertEqual(reader.unary(0), 2)
2287
2288        reader.setpos(pos)
2289        self.assertEqual(reader.unary(1), 0)
2290        self.assertEqual(reader.unary(1), 3)
2291        self.assertEqual(reader.unary(1), 0)
2292        self.assertEqual(reader.unary(1), 1)
2293        self.assertEqual(reader.unary(1), 0)
2294
2295        reader.setpos(pos)
2296        self.assertEqual(reader.read_huffman_code(table), 1)
2297        self.assertEqual(reader.read_huffman_code(table), 3)
2298        self.assertEqual(reader.read_huffman_code(table), 1)
2299        self.assertEqual(reader.read_huffman_code(table), 0)
2300        self.assertEqual(reader.read_huffman_code(table), 2)
2301        self.assertEqual(reader.read_huffman_code(table), 1)
2302        self.assertEqual(reader.read_huffman_code(table), 0)
2303        self.assertEqual(reader.read_huffman_code(table), 0)
2304        self.assertEqual(reader.read_huffman_code(table), 1)
2305        self.assertEqual(reader.read_huffman_code(table), 0)
2306        self.assertEqual(reader.read_huffman_code(table), 1)
2307        self.assertEqual(reader.read_huffman_code(table), 2)
2308        self.assertEqual(reader.read_huffman_code(table), 4)
2309        self.assertEqual(reader.read_huffman_code(table), 3)
2310
2311        reader.setpos(pos)
2312        self.assertEqual(reader.read_bytes(2), b"\xB1\xED")
2313        reader.setpos(pos)
2314        self.assertEqual(reader.read(4), 1)
2315
2316        self.assertEqual(reader.read_bytes(2), b"\xDB\xBE")
2317
2318        reader.setpos(pos)
2319        self.assertEqual(reader.read(3), 1)
2320        reader.byte_align()
2321        self.assertEqual(reader.read(3), 5)
2322        reader.byte_align()
2323        reader.byte_align()
2324        self.assertEqual(reader.read(8), 59)
2325        reader.byte_align()
2326        self.assertEqual(reader.read(4), 1)
2327
2328        reader.setpos(pos)
2329        self.assertEqual(reader.read(3), 1)
2330        reader.set_endianness(0)
2331        self.assertEqual(reader.read(3), 7)
2332        reader.set_endianness(1)
2333        self.assertEqual(reader.read(4), 11)
2334        reader.set_endianness(1)
2335        self.assertEqual(reader.read(4), 1)
2336
2337        reader.setpos(pos)
2338        pos2 = reader.getpos()
2339        self.assertEqual(reader.read(4), 0x1)
2340        reader.setpos(pos2)
2341        self.assertEqual(reader.read(8), 0xB1)
2342        reader.setpos(pos2)
2343        self.assertEqual(reader.read(12), 0xDB1)
2344        del(pos2)
2345        pos3 = reader.getpos()
2346        self.assertEqual(reader.read(4), 0xE)
2347        reader.setpos(pos3)
2348        self.assertEqual(reader.read(8), 0xBE)
2349        reader.setpos(pos3)
2350        self.assertEqual(reader.read(12), 0x3BE)
2351        del(pos3)
2352
2353        reader.setpos(pos)
2354
2355    def __test_try__(self, reader, table):
2356        start = reader.getpos()
2357
2358        # bounce to the very end of the stream
2359        reader.skip(31)
2360        pos = reader.getpos()
2361        self.assertEqual(reader.read(1), 1)
2362        reader.setpos(pos)
2363
2364        # then test all the read methods to ensure they trigger br_abort
2365        # in the case of unary/Huffman, the stream ends on a "1" bit
2366        # whether reading it big-endian or little-endian
2367
2368        self.assertRaises(IOError, reader.read, 2)
2369        reader.setpos(pos)
2370        self.assertRaises(IOError, reader.read_signed, 2)
2371        reader.setpos(pos)
2372        self.assertRaises(IOError, reader.skip, 2)
2373        reader.setpos(pos)
2374        self.assertRaises(IOError, reader.unary, 0)
2375        reader.setpos(pos)
2376        self.assertEqual(reader.unary(1), 0)
2377        self.assertRaises(IOError, reader.unary, 1)
2378        reader.setpos(pos)
2379        self.assertRaises(IOError, reader.read_huffman_code, table)
2380        reader.setpos(pos)
2381        self.assertRaises(IOError, reader.read_bytes, 2)
2382        reader.setpos(pos)
2383        self.assertRaises(IOError, reader.substream, 1)
2384        reader.setpos(pos)
2385        self.assertRaises(ValueError, reader.read, -1)
2386        reader.setpos(pos)
2387        self.assertRaises(ValueError, reader.read_signed, -1)
2388        reader.setpos(pos)
2389        self.assertRaises(ValueError, reader.read_signed, 0)
2390        reader.setpos(pos)
2391        self.assertRaises(ValueError, reader.skip, -1)
2392        reader.setpos(pos)
2393        self.assertRaises(ValueError, reader.read_bytes, -2)
2394        reader.setpos(pos)
2395        self.assertRaises(IOError, reader.skip_bytes, 2 ** 30)
2396        reader.setpos(pos)
2397        self.assertRaises(IOError, reader.skip_bytes, 2 ** 65)
2398        reader.setpos(pos)
2399        self.assertRaises(IOError, reader.read_bytes, 2 ** 30)
2400        reader.setpos(pos)
2401        self.assertRaises(IOError, reader.read_bytes, 2 ** 65)
2402        reader.setpos(pos)
2403        self.assertRaises(IOError, reader.substream, 2 ** 30)
2404        reader.setpos(pos)
2405
2406        reader.setpos(start)
2407
2408    def __test_callbacks_reader__(self,
2409                                  reader,
2410                                  unary_0_reads,
2411                                  unary_1_reads,
2412                                  table,
2413                                  huffman_code_count):
2414        counter = ByteCounter()
2415        start = reader.getpos()
2416        reader.add_callback(counter.callback)
2417
2418        # a single callback
2419        counter.reset()
2420        for i in range(8):
2421            reader.read(4)
2422        self.assertEqual(int(counter), 4)
2423        reader.setpos(start)
2424
2425        # calling callbacks directly
2426        counter.reset()
2427        for i in range(20):
2428            reader.call_callbacks(0)
2429        self.assertEqual(int(counter), 20)
2430
2431        # two callbacks
2432        counter.reset()
2433        reader.add_callback(counter.callback)
2434        for i in range(8):
2435            reader.read(4)
2436        self.assertEqual(int(counter), 8)
2437        reader.pop_callback()
2438        reader.setpos(start)
2439
2440        # temporarily suspending the callback
2441        counter.reset()
2442        reader.read(8)
2443        self.assertEqual(int(counter), 1)
2444        callback = reader.pop_callback()
2445        reader.read(8)
2446        reader.read(8)
2447        reader.add_callback(counter.callback)
2448        reader.read(8)
2449        self.assertEqual(int(counter), 2)
2450        reader.setpos(start)
2451
2452        # temporarily adding two callbacks
2453        counter.reset()
2454        reader.read(8)
2455        self.assertEqual(int(counter), 1)
2456        reader.add_callback(counter.callback)
2457        reader.read(8)
2458        reader.read(8)
2459        reader.pop_callback()
2460        reader.read(8)
2461        self.assertEqual(int(counter), 6)
2462        reader.setpos(start)
2463
2464        # read_signed
2465        counter.reset()
2466        for i in range(8):
2467            reader.read_signed(4)
2468        self.assertEqual(int(counter), 4)
2469        reader.setpos(start)
2470
2471        # skip
2472        counter.reset()
2473        for i in range(8):
2474            reader.skip(4)
2475        self.assertEqual(int(counter), 4)
2476        reader.setpos(start)
2477
2478        # read_unary
2479        counter.reset()
2480        for i in range(unary_0_reads):
2481            reader.unary(0)
2482        self.assertEqual(int(counter), 4)
2483        counter.reset()
2484        reader.setpos(start)
2485        for i in range(unary_1_reads):
2486            reader.unary(1)
2487        self.assertEqual(int(counter), 4)
2488        reader.setpos(start)
2489
2490        # read_huffman_code
2491        counter.reset()
2492        for i in range(huffman_code_count):
2493            reader.read_huffman_code(table)
2494        self.assertEqual(int(counter), 4)
2495        reader.setpos(start)
2496
2497        # read_bytes
2498        counter.reset()
2499        reader.read_bytes(2)
2500        reader.read_bytes(2)
2501        self.assertEqual(int(counter), 4)
2502        reader.setpos(start)
2503
2504        reader.pop_callback()
2505        del(start)
2506
2507    @LIB_BITSTREAM
2508    def test_init_error(self):
2509        from audiotools.bitstream import BitstreamReader
2510        from audiotools.bitstream import BitstreamRecorder
2511        from audiotools.bitstream import BitstreamWriter
2512
2513        self.assertRaises(TypeError, BitstreamRecorder)
2514        self.assertRaises(TypeError, BitstreamRecorder, None)
2515        self.assertRaises(TypeError, BitstreamWriter)
2516        self.assertRaises(TypeError, BitstreamReader)
2517
2518    @LIB_BITSTREAM
2519    def test_parse(self):
2520        from audiotools.bitstream import parse
2521
2522        # test basic big-endian string
2523        self.assertEqual(parse("2u3u5u3s19s",
2524                               False,
2525                               ints_to_bytes([0xB1, 0xED, 0x3B, 0xC1])),
2526                         [2, 6, 7, -3, -181311])
2527
2528        # test all the defined format fields
2529        for (fields, values) in [("2u 3u 5u 3u 19u",
2530                                  [0x2, 0x6, 0x07, 0x5, 0x53BC1]),
2531                                 ("2s 3s 5s 3s 19s",
2532                                  [-2, -2, 7, -3, -181311]),
2533                                 ("2U 3U 5U 3U 19U",
2534                                  [0x2, 0x6, 0x07, 0x5, 0x53BC1]),
2535                                 ("2S 3S 5S 3S 19S",
2536                                  [-2, -2, 7, -3, -181311]),
2537                                 ("2u 3p 5u 3p 19u",
2538                                  [0x2, 0x07, 0x53BC1]),
2539                                 ("2p 1P 3u 19u",
2540                                  [0x5, 0x53BC1]),
2541                                 ("2b 2b",
2542                                  [b"\xB1\xED", b"\x3B\xC1"]),
2543                                 ("2u a 3u a 4u a 5u",
2544                                  [2, 7, 3, 24]),
2545                                 ("3* 2u",
2546                                  [2, 3, 0]),
2547                                 ("3* 2* 2u",
2548                                  [2, 3, 0, 1, 3, 2]),
2549                                 ("2u ? 3u", [2]),
2550                                 ("2u 10? 3u", [2]),
2551                                 ("2u 10* ? 3u", [2]),
2552                                 ("2u 10* 3? 3u", [2])]:
2553            self.assertEqual(parse(fields,
2554                                   False,
2555                                   ints_to_bytes([0xB1, 0xED, 0x3B, 0xC1])),
2556                             values)
2557
2558        # test several big-endian unsigned edge cases
2559        self.assertEqual(
2560            parse("32u 32u 32u 32u 64U 64U 64U 64U",
2561                  False,
2562                  ints_to_bytes([0, 0, 0, 0, 255, 255, 255, 255,
2563                                 128, 0, 0, 0, 127, 255, 255, 255,
2564                                 0, 0, 0, 0, 0, 0, 0, 0,
2565                                 255, 255, 255, 255, 255, 255, 255, 255,
2566                                 128, 0, 0, 0, 0, 0, 0, 0,
2567                                 127, 255, 255, 255, 255, 255, 255, 255])),
2568            [0,
2569             4294967295,
2570             2147483648,
2571             2147483647,
2572             0,
2573             0xFFFFFFFFFFFFFFFF,
2574             9223372036854775808,
2575             9223372036854775807])
2576
2577        # test several big-endian signed edge cases
2578        self.assertEqual(
2579            parse("32s 32s 32s 32s 64S 64S 64S 64S",
2580                  False,
2581                  ints_to_bytes([0, 0, 0, 0, 255, 255, 255, 255,
2582                                 128, 0, 0, 0, 127, 255, 255, 255,
2583                                 0, 0, 0, 0, 0, 0, 0, 0,
2584                                 255, 255, 255, 255, 255, 255, 255, 255,
2585                                 128, 0, 0, 0, 0, 0, 0, 0,
2586                                 127, 255, 255, 255, 255, 255, 255, 255])),
2587            [0,
2588             -1,
2589             -2147483648,
2590             2147483647,
2591             0,
2592             -1,
2593             -9223372036854775808,
2594             9223372036854775807])
2595
2596        # test big-endian read errors
2597        for s in ["3u", "3s", "3U", "3S", "3p", "3P", "3b"]:
2598            self.assertRaises(IOError,
2599                              parse,
2600                              "8u" + s,
2601                              False,
2602                              "a")
2603
2604        # test basic little-endian string
2605        self.assertEqual(parse("2u3u5u3s19s",
2606                               True,
2607                               ints_to_bytes([0xB1, 0xED, 0x3B, 0xC1])),
2608                         [1, 4, 13, 3, -128545])
2609
2610        # test all the defined format fields
2611        for (fields, values) in [("2u 3u 5u 3u 19u",
2612                                  [0x1, 0x4, 0x0D, 0x3, 0x609DF]),
2613                                 ("2s 3s 5s 3s 19s",
2614                                  [1, -4, 13, 3, -128545]),
2615                                 ("2U 3U 5U 3U 19U",
2616                                  [0x1, 0x4, 0x0D, 0x3, 0x609DF]),
2617                                 ("2S 3S 5S 3S 19S",
2618                                  [1, -4, 13, 3, -128545]),
2619                                 ("2u 3p 5u 3p 19u",
2620                                  [0x1, 0x0D, 0x609DF]),
2621                                 ("2p 1P 3u 19u",
2622                                  [0x3, 0x609DF]),
2623                                 ("2b 2b",
2624                                  [b"\xB1\xED", b"\x3B\xC1"]),
2625                                 ("2u a 3u a 4u a 5u",
2626                                  [1, 5, 11, 1]),
2627                                 ("3* 2u",
2628                                  [1, 0, 3]),
2629                                 ("3* 2* 2u",
2630                                  [1, 0, 3, 2, 1, 3]),
2631                                 ("2u ? 3u", [1]),
2632                                 ("2u 10? 3u", [1]),
2633                                 ("2u 10* ? 3u", [1]),
2634                                 ("2u 10* 3? 3u", [1])]:
2635            self.assertEqual(parse(fields,
2636                                   True,
2637                                   ints_to_bytes([0xB1, 0xED, 0x3B, 0xC1])),
2638                             values)
2639
2640        # test several little-endian unsigned edge cases
2641        self.assertEqual(
2642            parse("32u 32u 32u 32u 64U 64U 64U 64U",
2643                  True,
2644                  ints_to_bytes([0, 0, 0, 0, 255, 255, 255, 255,
2645                                 0, 0, 0, 128, 255, 255, 255, 127,
2646                                 0, 0, 0, 0, 0, 0, 0, 0,
2647                                 255, 255, 255, 255, 255, 255, 255, 255,
2648                                 0, 0, 0, 0, 0, 0, 0, 128,
2649                                 255, 255, 255, 255, 255, 255, 255, 127])),
2650            [0,
2651             4294967295,
2652             2147483648,
2653             2147483647,
2654             0,
2655             0xFFFFFFFFFFFFFFFF,
2656             9223372036854775808,
2657             9223372036854775807])
2658
2659        # test several little-endian signed edge cases
2660        self.assertEqual(
2661            parse("32s 32s 32s 32s 64S 64S 64S 64S",
2662                  True,
2663                  ints_to_bytes([0, 0, 0, 0, 255, 255, 255, 255,
2664                                 0, 0, 0, 128, 255, 255, 255, 127,
2665                                 0, 0, 0, 0, 0, 0, 0, 0,
2666                                 255, 255, 255, 255, 255, 255, 255, 255,
2667                                 0, 0, 0, 0, 0, 0, 0, 128,
2668                                 255, 255, 255, 255, 255, 255, 255, 127])),
2669            [0,
2670             -1,
2671             -2147483648,
2672             2147483647,
2673             0,
2674             -1,
2675             -9223372036854775808,
2676             9223372036854775807])
2677
2678        # test little-endian read errors
2679        for s in ["3u", "3s", "3U", "3S", "3p", "3P", "3b"]:
2680            self.assertRaises(IOError,
2681                              parse,
2682                              "8u" + s,
2683                              True,
2684                              "a")
2685
2686    @LIB_BITSTREAM
2687    def test_build(self):
2688        from audiotools.bitstream import build
2689
2690        # test basic big-endian string
2691        self.assertEqual(build("2u3u5u3s19s",
2692                               False,
2693                               [2, 6, 7, -3, -181311]),
2694                         ints_to_bytes([0xB1, 0xED, 0x3B, 0xC1]))
2695
2696        # test several big-endian unsigned edge cases
2697        self.assertEqual(
2698            build("32u 32u 32u 32u 64U 64U 64U 64U",
2699                  False,
2700                  [0,
2701                   4294967295,
2702                   2147483648,
2703                   2147483647,
2704                   0,
2705                   0xFFFFFFFFFFFFFFFF,
2706                   9223372036854775808,
2707                   9223372036854775807]),
2708            ints_to_bytes([0, 0, 0, 0, 255, 255, 255, 255,
2709                           128, 0, 0, 0, 127, 255, 255, 255,
2710                           0, 0, 0, 0, 0, 0, 0, 0,
2711                           255, 255, 255, 255, 255, 255, 255, 255,
2712                           128, 0, 0, 0, 0, 0, 0, 0,
2713                           127, 255, 255, 255, 255, 255, 255, 255]))
2714
2715        # test several big-endian signed edge cases
2716        self.assertEqual(
2717            build("32s 32s 32s 32s 64S 64S 64S 64S",
2718                  False,
2719                  [0,
2720                   -1,
2721                   -2147483648,
2722                   2147483647,
2723                   0,
2724                   -1,
2725                   -9223372036854775808,
2726                   9223372036854775807]),
2727            ints_to_bytes([0, 0, 0, 0, 255, 255, 255, 255,
2728                           128, 0, 0, 0, 127, 255, 255, 255,
2729                           0, 0, 0, 0, 0, 0, 0, 0,
2730                           255, 255, 255, 255, 255, 255, 255, 255,
2731                           128, 0, 0, 0, 0, 0, 0, 0,
2732                           127, 255, 255, 255, 255, 255, 255, 255]))
2733
2734        # test big-endian write errors
2735        for l in [[2, 6, 7, -3], [2, 6, 7], [2, 6], [2], []]:
2736            self.assertRaises(IndexError,
2737                              build,
2738                              "2u3u5u3s19s",
2739                              False,
2740                              l)
2741
2742        # test basic little-endian string
2743        self.assertEqual(build("2u3u5u3s19s",
2744                               True,
2745                               [1, 4, 13, 3, -128545]),
2746                         ints_to_bytes([0xB1, 0xED, 0x3B, 0xC1]))
2747
2748        # test several little-endian unsigned edge cases
2749        self.assertEqual(
2750            build("32u 32u 32u 32u 64U 64U 64U 64U",
2751                  True,
2752                  [0,
2753                   4294967295,
2754                   2147483648,
2755                   2147483647,
2756                   0,
2757                   0xFFFFFFFFFFFFFFFF,
2758                   9223372036854775808,
2759                   9223372036854775807]),
2760            ints_to_bytes([0, 0, 0, 0, 255, 255, 255, 255,
2761                           0, 0, 0, 128, 255, 255, 255, 127,
2762                           0, 0, 0, 0, 0, 0, 0, 0,
2763                           255, 255, 255, 255, 255, 255, 255, 255,
2764                           0, 0, 0, 0, 0, 0, 0, 128,
2765                           255, 255, 255, 255, 255, 255, 255, 127]))
2766
2767        # test several little-endian signed edge cases
2768        self.assertEqual(
2769            build("32s 32s 32s 32s 64S 64S 64S 64S",
2770                  True,
2771                  [0,
2772                   -1,
2773                   -2147483648,
2774                   2147483647,
2775                   0,
2776                   -1,
2777                   -9223372036854775808,
2778                   9223372036854775807]),
2779            ints_to_bytes([0, 0, 0, 0, 255, 255, 255, 255,
2780                           0, 0, 0, 128, 255, 255, 255, 127,
2781                           0, 0, 0, 0, 0, 0, 0, 0,
2782                           255, 255, 255, 255, 255, 255, 255, 255,
2783                           0, 0, 0, 0, 0, 0, 0, 128,
2784                           255, 255, 255, 255, 255, 255, 255, 127]))
2785
2786        # test little-endian write errors
2787        for l in [[1, 4, 13, 3], [1, 4, 13], [1, 4], [1], []]:
2788            self.assertRaises(IndexError,
2789                              build,
2790                              "2u3u5u3s19s",
2791                              True,
2792                              l)
2793
2794    @LIB_BITSTREAM
2795    def test_build_parse_roundtrip(self):
2796        from audiotools.bitstream import build, parse
2797
2798        for (format_string, values) in [("1u a",   [1]),
2799                                        ("2u a",   [1]),
2800                                        ("3u a",   [1]),
2801                                        ("4u a",   [1]),
2802                                        ("5u a",   [1]),
2803                                        ("6u a",   [1]),
2804                                        ("7u a",   [1]),
2805                                        ("8u",     [1]),
2806                                        ("2s a",   [-1]),
2807                                        ("3s a",   [-1]),
2808                                        ("4s a",   [-1]),
2809                                        ("5s a",   [-1]),
2810                                        ("6s a",   [-1]),
2811                                        ("7s a",   [-1]),
2812                                        ("8s a",   [-1]),
2813                                        ("64U",    [0xFFFFFFFFFFFFFFFF]),
2814                                        ("64S",    [-9223372036854775808]),
2815                                        ("10b",    [b"\x00" * 10]),
2816                                        ("10p10b a", [b"\x01" * 10]),
2817                                        ("10P10b", [b"\x02" * 10])]:
2818            self.assertEqual(parse(format_string, False,
2819                                   build(format_string, False, values)),
2820                             values)
2821            self.assertEqual(parse(format_string, True,
2822                                   build(format_string, True, values)),
2823                             values)
2824
2825    @LIB_BITSTREAM
2826    def test_simple_reader(self):
2827        from audiotools.bitstream import BitstreamReader, HuffmanTree
2828
2829        data = b"\xB1\xED\x3B\xC1"
2830
2831        temp = tempfile.TemporaryFile()
2832
2833        temp.write(data)
2834        temp.flush()
2835        temp.seek(0, 0)
2836
2837        temp_s = BytesIO()
2838        temp_s.write(data)
2839        temp_s.seek(0, 0)
2840
2841        # test a big-endian stream
2842        for reader in [BitstreamReader(temp, False),
2843                       BitstreamReader(temp_s, False),
2844                       BitstreamReader(data, False)]:
2845            table_be = HuffmanTree([[1, 1], 0,
2846                                    [1, 0], 1,
2847                                    [0, 1], 2,
2848                                    [0, 0, 1], 3,
2849                                    [0, 0, 0], 4], 0)
2850            self.__test_big_endian_reader__(reader, table_be)
2851            self.__test_try__(reader, table_be)
2852            self.__test_callbacks_reader__(reader, 14, 18, table_be, 14)
2853
2854        temp.seek(0, 0)
2855        temp_s.seek(0, 0)
2856
2857        # test a little-endian stream
2858        for reader in [BitstreamReader(temp, True),
2859                       BitstreamReader(temp_s, True),
2860                       BitstreamReader(data, True)]:
2861            table_le = HuffmanTree([[1, 1], 0,
2862                                    [1, 0], 1,
2863                                    [0, 1], 2,
2864                                    [0, 0, 1], 3,
2865                                    [0, 0, 0], 4], 1)
2866            self.__test_little_endian_reader__(reader, table_le)
2867            self.__test_try__(reader, table_le)
2868            self.__test_callbacks_reader__(reader, 14, 18, table_le, 13)
2869
2870        # pad the stream with some additional data at both ends
2871        data = b"\xFF" + b"\xFF" + data + b"\xFF" + b"\xFF"
2872
2873        temp.seek(0, 0)
2874        temp.write(data)
2875        temp.flush()
2876        temp.seek(0, 0)
2877
2878        temp_s = BytesIO()
2879        temp_s.write(data)
2880        temp_s.seek(0, 0)
2881
2882        # check a big-endian substream
2883        for reader in [BitstreamReader(temp, False),
2884                       BitstreamReader(temp_s, False),
2885                       BitstreamReader(data, False)]:
2886            start = reader.getpos()
2887
2888            reader.skip(16)
2889            subreader = reader.substream(4)
2890            self.__test_big_endian_reader__(subreader, table_be)
2891            self.__test_try__(subreader, table_be)
2892            self.__test_callbacks_reader__(subreader, 14, 18, table_be, 13)
2893
2894            # check a big-endian substream built from another substream
2895            reader.setpos(start)
2896            reader.skip(8)
2897            subreader1 = reader.substream(6)
2898            subreader1.skip(8)
2899            subreader2 = subreader.substream(4)
2900            self.__test_big_endian_reader__(subreader2, table_be)
2901            self.__test_try__(subreader2, table_be)
2902            self.__test_callbacks_reader__(subreader2, 14, 18, table_be, 13)
2903
2904        temp.seek(0, 0)
2905        temp_s.seek(0, 0)
2906
2907        # check a little-endian substream built from a file
2908        for reader in [BitstreamReader(temp, True),
2909                       BitstreamReader(temp_s, True),
2910                       BitstreamReader(data, True)]:
2911            start = reader.getpos()
2912
2913            reader.skip(16)
2914            subreader = reader.substream(4)
2915            self.__test_little_endian_reader__(subreader, table_le)
2916            self.__test_try__(subreader, table_le)
2917            self.__test_callbacks_reader__(subreader, 14, 18, table_le, 13)
2918
2919            # check a little-endian substream built from another substream
2920            reader.setpos(start)
2921            reader.skip(8)
2922            subreader1 = reader.substream(6)
2923            subreader1.skip(8)
2924            subreader2 = subreader.substream(4)
2925            self.__test_little_endian_reader__(subreader2, table_le)
2926            self.__test_try__(subreader2, table_le)
2927            self.__test_callbacks_reader__(subreader2, 14, 18, table_le, 13)
2928
2929        temp.close()
2930
2931        # test the writer functions with each endianness
2932        self.__test_writer__(0)
2933        self.__test_writer__(1)
2934
2935    def __test_edge_reader_be__(self, reader):
2936        start = reader.getpos()
2937
2938        # try the unsigned 32 and 64 bit values
2939        reader.setpos(start)
2940        self.assertEqual(reader.read(32), 0)
2941        self.assertEqual(reader.read(32), 4294967295)
2942        self.assertEqual(reader.read(32), 2147483648)
2943        self.assertEqual(reader.read(32), 2147483647)
2944        self.assertEqual(reader.read(64), 0)
2945        self.assertEqual(reader.read(64), 0xFFFFFFFFFFFFFFFF)
2946        self.assertEqual(reader.read(64), 9223372036854775808)
2947        self.assertEqual(reader.read(64), 9223372036854775807)
2948
2949        # try the signed 32 and 64 bit values
2950        reader.setpos(start)
2951        self.assertEqual(reader.read_signed(32), 0)
2952        self.assertEqual(reader.read_signed(32), -1)
2953        self.assertEqual(reader.read_signed(32), -2147483648)
2954        self.assertEqual(reader.read_signed(32), 2147483647)
2955        self.assertEqual(reader.read_signed(64), 0)
2956        self.assertEqual(reader.read_signed(64), -1)
2957        self.assertEqual(reader.read_signed(64), -9223372036854775808)
2958        self.assertEqual(reader.read_signed(64), 9223372036854775807)
2959
2960        # try the unsigned values via parse()
2961        reader.setpos(start)
2962        (u_val_1,
2963         u_val_2,
2964         u_val_3,
2965         u_val_4,
2966         u_val64_1,
2967         u_val64_2,
2968         u_val64_3,
2969         u_val64_4) = reader.parse("32u 32u 32u 32u 64U 64U 64U 64U")
2970        self.assertEqual(u_val_1, 0)
2971        self.assertEqual(u_val_2, 4294967295)
2972        self.assertEqual(u_val_3, 2147483648)
2973        self.assertEqual(u_val_4, 2147483647)
2974        self.assertEqual(u_val64_1, 0)
2975        self.assertEqual(u_val64_2, 0xFFFFFFFFFFFFFFFF)
2976        self.assertEqual(u_val64_3, 9223372036854775808)
2977        self.assertEqual(u_val64_4, 9223372036854775807)
2978
2979        # try the signed values via parse()
2980        reader.setpos(start)
2981        (s_val_1,
2982         s_val_2,
2983         s_val_3,
2984         s_val_4,
2985         s_val64_1,
2986         s_val64_2,
2987         s_val64_3,
2988         s_val64_4) = reader.parse("32s 32s 32s 32s 64S 64S 64S 64S")
2989        self.assertEqual(s_val_1, 0)
2990        self.assertEqual(s_val_2, -1)
2991        self.assertEqual(s_val_3, -2147483648)
2992        self.assertEqual(s_val_4, 2147483647)
2993        self.assertEqual(s_val64_1, 0)
2994        self.assertEqual(s_val64_2, -1)
2995        self.assertEqual(s_val64_3, -9223372036854775808)
2996        self.assertEqual(s_val64_4, 9223372036854775807)
2997
2998    def __test_edge_reader_le__(self, reader):
2999        start = reader.getpos()
3000
3001        # try the unsigned 32 and 64 bit values
3002        self.assertEqual(reader.read(32), 0)
3003        self.assertEqual(reader.read(32), 4294967295)
3004        self.assertEqual(reader.read(32), 2147483648)
3005        self.assertEqual(reader.read(32), 2147483647)
3006        self.assertEqual(reader.read(64), 0)
3007        self.assertEqual(reader.read(64), 0xFFFFFFFFFFFFFFFF)
3008        self.assertEqual(reader.read(64), 9223372036854775808)
3009        self.assertEqual(reader.read(64), 9223372036854775807)
3010
3011        # try the signed 32 and 64 bit values
3012        reader.setpos(start)
3013        self.assertEqual(reader.read_signed(32), 0)
3014        self.assertEqual(reader.read_signed(32), -1)
3015        self.assertEqual(reader.read_signed(32), -2147483648)
3016        self.assertEqual(reader.read_signed(32), 2147483647)
3017        self.assertEqual(reader.read_signed(64), 0)
3018        self.assertEqual(reader.read_signed(64), -1)
3019        self.assertEqual(reader.read_signed(64), -9223372036854775808)
3020        self.assertEqual(reader.read_signed(64), 9223372036854775807)
3021
3022        # try the unsigned values via parse()
3023        reader.setpos(start)
3024        (u_val_1,
3025         u_val_2,
3026         u_val_3,
3027         u_val_4,
3028         u_val64_1,
3029         u_val64_2,
3030         u_val64_3,
3031         u_val64_4) = reader.parse("32u 32u 32u 32u 64U 64U 64U 64U")
3032        self.assertEqual(u_val_1, 0)
3033        self.assertEqual(u_val_2, 4294967295)
3034        self.assertEqual(u_val_3, 2147483648)
3035        self.assertEqual(u_val_4, 2147483647)
3036        self.assertEqual(u_val64_1, 0)
3037        self.assertEqual(u_val64_2, 0xFFFFFFFFFFFFFFFF)
3038        self.assertEqual(u_val64_3, 9223372036854775808)
3039        self.assertEqual(u_val64_4, 9223372036854775807)
3040
3041        # try the signed values via parse()
3042        reader.setpos(start)
3043        (s_val_1,
3044         s_val_2,
3045         s_val_3,
3046         s_val_4,
3047         s_val64_1,
3048         s_val64_2,
3049         s_val64_3,
3050         s_val64_4) = reader.parse("32s 32s 32s 32s 64S 64S 64S 64S")
3051        self.assertEqual(s_val_1, 0)
3052        self.assertEqual(s_val_2, -1)
3053        self.assertEqual(s_val_3, -2147483648)
3054        self.assertEqual(s_val_4, 2147483647)
3055        self.assertEqual(s_val64_1, 0)
3056        self.assertEqual(s_val64_2, -1)
3057        self.assertEqual(s_val64_3, -9223372036854775808)
3058        self.assertEqual(s_val64_4, 9223372036854775807)
3059
3060    def __test_edge_writer__(self, get_writer, validate_writer):
3061        # try the unsigned 32 and 64 bit values
3062        (writer, temp) = get_writer()
3063        writer.write(32, 0)
3064        writer.write(32, 4294967295)
3065        writer.write(32, 2147483648)
3066        writer.write(32, 2147483647)
3067        writer.write(64, 0)
3068        writer.write(64, 0xFFFFFFFFFFFFFFFF)
3069        writer.write(64, 9223372036854775808)
3070        writer.write(64, 9223372036854775807)
3071        validate_writer(writer, temp)
3072
3073        # try the signed 32 and 64 bit values
3074        (writer, temp) = get_writer()
3075        writer.write_signed(32, 0)
3076        writer.write_signed(32, -1)
3077        writer.write_signed(32, -2147483648)
3078        writer.write_signed(32, 2147483647)
3079        writer.write_signed(64, 0)
3080        writer.write_signed(64, -1)
3081        writer.write_signed(64, -9223372036854775808)
3082        writer.write_signed(64, 9223372036854775807)
3083        validate_writer(writer, temp)
3084
3085        # try the unsigned values via build()
3086        (writer, temp) = get_writer()
3087        u_val_1 = 0
3088        u_val_2 = 4294967295
3089        u_val_3 = 2147483648
3090        u_val_4 = 2147483647
3091        u_val64_1 = 0
3092        u_val64_2 = 0xFFFFFFFFFFFFFFFF
3093        u_val64_3 = 9223372036854775808
3094        u_val64_4 = 9223372036854775807
3095        writer.build("32u 32u 32u 32u 64u 64u 64u 64u",
3096                     [u_val_1, u_val_2, u_val_3, u_val_4,
3097                      u_val64_1, u_val64_2, u_val64_3, u_val64_4])
3098        validate_writer(writer, temp)
3099
3100        # try the signed values via build()
3101        (writer, temp) = get_writer()
3102        s_val_1 = 0
3103        s_val_2 = -1
3104        s_val_3 = -2147483648
3105        s_val_4 = 2147483647
3106        s_val64_1 = 0
3107        s_val64_2 = -1
3108        s_val64_3 = -9223372036854775808
3109        s_val64_4 = 9223372036854775807
3110        writer.build("32s 32s 32s 32s 64s 64s 64s 64s",
3111                     [s_val_1, s_val_2, s_val_3, s_val_4,
3112                      s_val64_1, s_val64_2, s_val64_3, s_val64_4])
3113        validate_writer(writer, temp)
3114
3115    def __get_edge_writer_be__(self):
3116        from audiotools.bitstream import BitstreamWriter
3117
3118        temp_file = tempfile.NamedTemporaryFile()
3119        return (BitstreamWriter(open(temp_file.name, "wb"), False), temp_file)
3120
3121    def __validate_edge_writer_be__(self, writer, temp_file):
3122        writer.close()
3123
3124        with open(temp_file.name, "rb") as f:
3125            self.assertEqual(
3126                f.read(),
3127                ints_to_bytes([0, 0, 0, 0, 255, 255, 255, 255,
3128                               128, 0, 0, 0, 127, 255, 255, 255,
3129                               0, 0, 0, 0, 0, 0, 0, 0,
3130                               255, 255, 255, 255, 255, 255, 255, 255,
3131                               128, 0, 0, 0, 0, 0, 0, 0,
3132                               127, 255, 255, 255, 255, 255, 255, 255]))
3133
3134        temp_file.close()
3135
3136    def __get_edge_recorder_be__(self):
3137        from audiotools.bitstream import BitstreamRecorder
3138
3139        return (BitstreamRecorder(0), tempfile.NamedTemporaryFile())
3140
3141    def __validate_edge_recorder_be__(self, writer, temp_file):
3142        from audiotools.bitstream import BitstreamWriter
3143
3144        writer2 = BitstreamWriter(open(temp_file.name, "wb"), False)
3145        writer.copy(writer2)
3146        writer2.close()
3147
3148        with open(temp_file.name, "rb") as f:
3149            self.assertEqual(
3150                f.read(),
3151                ints_to_bytes([0, 0, 0, 0, 255, 255, 255, 255,
3152                               128, 0, 0, 0, 127, 255, 255, 255,
3153                               0, 0, 0, 0, 0, 0, 0, 0,
3154                               255, 255, 255, 255, 255, 255, 255, 255,
3155                               128, 0, 0, 0, 0, 0, 0, 0,
3156                               127, 255, 255, 255, 255, 255, 255, 255]))
3157
3158        temp_file.close()
3159
3160    def __validate_edge_accumulator_be__(self, writer, temp_file):
3161        self.assertEqual(writer.bits(), 48 * 8)
3162
3163    def __get_edge_writer_le__(self):
3164        from audiotools.bitstream import BitstreamWriter
3165
3166        temp_file = tempfile.NamedTemporaryFile()
3167        return (BitstreamWriter(open(temp_file.name, "wb"), True), temp_file)
3168
3169    def __validate_edge_writer_le__(self, writer, temp_file):
3170        writer.close()
3171
3172        with open(temp_file.name, "rb") as f:
3173            self.assertEqual(
3174                f.read(),
3175                ints_to_bytes([0, 0, 0, 0, 255, 255, 255, 255,
3176                               0, 0, 0, 128, 255, 255, 255, 127,
3177                               0, 0, 0, 0, 0, 0, 0, 0,
3178                               255, 255, 255, 255, 255, 255, 255, 255,
3179                               0, 0, 0, 0, 0, 0, 0, 128,
3180                               255, 255, 255, 255, 255, 255, 255, 127]))
3181
3182        temp_file.close()
3183
3184    def __get_edge_recorder_le__(self):
3185        from audiotools.bitstream import BitstreamRecorder
3186
3187        return (BitstreamRecorder(1), tempfile.NamedTemporaryFile())
3188
3189    def __validate_edge_recorder_le__(self, writer, temp_file):
3190        from audiotools.bitstream import BitstreamWriter
3191
3192        writer2 = BitstreamWriter(open(temp_file.name, "wb"), True)
3193        writer.copy(writer2)
3194        writer2.close()
3195
3196        with open(temp_file.name, "rb") as f:
3197            self.assertEqual(
3198                f.read(),
3199                ints_to_bytes([0, 0, 0, 0, 255, 255, 255, 255,
3200                               0, 0, 0, 128, 255, 255, 255, 127,
3201                               0, 0, 0, 0, 0, 0, 0, 0,
3202                               255, 255, 255, 255, 255, 255, 255, 255,
3203                               0, 0, 0, 0, 0, 0, 0, 128,
3204                               255, 255, 255, 255, 255, 255, 255, 127]))
3205
3206        temp_file.close()
3207
3208    def __validate_edge_accumulator_le__(self, writer, temp_file):
3209        self.assertEqual(writer.bits(), 48 * 8)
3210
3211    def __test_writer__(self, endianness):
3212        from audiotools.bitstream import BitstreamWriter
3213        from audiotools.bitstream import BitstreamRecorder
3214
3215        checks = [self.__writer_perform_write__,
3216                  self.__writer_perform_write_signed__,
3217                  self.__writer_perform_write_unary_0__,
3218                  self.__writer_perform_write_unary_1__,
3219                  self.__writer_perform_write_huffman__]
3220
3221        # perform file-based checks
3222        for check in checks:
3223            temp = tempfile.NamedTemporaryFile()
3224            try:
3225                writer = BitstreamWriter(open(temp.name, "wb"), endianness)
3226                check(writer, endianness)
3227                writer.close()
3228                self.__check_output_file__(temp)
3229            finally:
3230                temp.close()
3231
3232            data = BytesIO()
3233            writer = BitstreamWriter(data, endianness)
3234            check(writer, endianness)
3235            del(writer)
3236            self.assertEqual(data.getvalue(), b"\xB1\xED\x3B\xC1")
3237
3238        # perform recorder-based checks
3239        for check in checks:
3240            temp = tempfile.NamedTemporaryFile()
3241            try:
3242                writer = BitstreamWriter(open(temp.name, "wb"), endianness)
3243                recorder = BitstreamRecorder(endianness)
3244                check(recorder, endianness)
3245                recorder.copy(writer)
3246                writer.close()
3247                self.__check_output_file__(temp)
3248                self.assertEqual(recorder.bits(), 32)
3249            finally:
3250                temp.close()
3251
3252        # check swap records
3253        temp = tempfile.NamedTemporaryFile()
3254        try:
3255            writer = BitstreamWriter(open(temp.name, "wb"), endianness)
3256            recorder1 = BitstreamRecorder(endianness)
3257            recorder2 = BitstreamRecorder(endianness)
3258            recorder2.write(8, 0xB1)
3259            recorder2.write(8, 0xED)
3260            recorder1.write(8, 0x3B)
3261            recorder1.write(8, 0xC1)
3262            recorder1.swap(recorder2)
3263            recorder1.copy(writer)
3264            recorder2.copy(writer)
3265            writer.close()
3266            self.__check_output_file__(temp)
3267        finally:
3268            temp.close()
3269
3270        # check recorder reset
3271        temp = tempfile.NamedTemporaryFile()
3272        try:
3273            writer = BitstreamWriter(open(temp.name, "wb"), endianness)
3274            recorder = BitstreamRecorder(endianness)
3275            recorder.write(8, 0xAA)
3276            recorder.write(8, 0xBB)
3277            recorder.write(8, 0xCC)
3278            recorder.write(8, 0xDD)
3279            recorder.write(8, 0xEE)
3280            recorder.reset()
3281            recorder.write(8, 0xB1)
3282            recorder.write(8, 0xED)
3283            recorder.write(8, 0x3B)
3284            recorder.write(8, 0xC1)
3285            recorder.copy(writer)
3286            writer.close()
3287            self.__check_output_file__(temp)
3288        finally:
3289            temp.close()
3290
3291        # check endianness setting
3292        # FIXME
3293
3294        # check a file-based byte-align
3295        # FIXME
3296
3297        # check a recorder-based byte-align
3298        # FIXME
3299
3300        # check an accumulator-based byte-align
3301        # FIXME
3302
3303        # check a partial dump
3304        # FIXME
3305
3306        # check that recorder->recorder->file works
3307        for check in checks:
3308            temp = tempfile.NamedTemporaryFile()
3309            try:
3310                writer = BitstreamWriter(open(temp.name, "wb"), endianness)
3311                recorder1 = BitstreamRecorder(endianness)
3312                recorder2 = BitstreamRecorder(endianness)
3313                self.assertEqual(recorder1.bits(), 0)
3314                self.assertEqual(recorder2.bits(), 0)
3315                check(recorder2, endianness)
3316                self.assertEqual(recorder1.bits(), 0)
3317                self.assertEqual(recorder2.bits(), 32)
3318                recorder2.copy(recorder1)
3319                self.assertEqual(recorder1.bits(), 32)
3320                self.assertEqual(recorder2.bits(), 32)
3321                recorder1.copy(writer)
3322                writer.close()
3323                self.__check_output_file__(temp)
3324            finally:
3325                temp.close()
3326
3327    def __writer_perform_write__(self, writer, endianness):
3328        if endianness == 0:
3329            writer.write(2, 2)
3330            writer.write(3, 6)
3331            writer.write(5, 7)
3332            writer.write(3, 5)
3333            writer.write(19, 342977)
3334        else:
3335            writer.write(2, 1)
3336            writer.write(3, 4)
3337            writer.write(5, 13)
3338            writer.write(3, 3)
3339            writer.write(19, 395743)
3340
3341        self.assertRaises(ValueError, writer.write, -1, 0)
3342
3343    def __writer_perform_write_signed__(self, writer, endianness):
3344        if endianness == 0:
3345            writer.write_signed(2, -2)
3346            writer.write_signed(3, -2)
3347            writer.write_signed(5, 7)
3348            writer.write_signed(3, -3)
3349            writer.write_signed(19, -181311)
3350        else:
3351            writer.write_signed(2, 1)
3352            writer.write_signed(3, -4)
3353            writer.write_signed(5, 13)
3354            writer.write_signed(3, 3)
3355            writer.write_signed(19, -128545)
3356
3357        self.assertRaises(ValueError, writer.write_signed, -1, 0)
3358
3359    def __writer_perform_write_unary_0__(self, writer, endianness):
3360        if endianness == 0:
3361            writer.unary(0, 1)
3362            writer.unary(0, 2)
3363            writer.unary(0, 0)
3364            writer.unary(0, 0)
3365            writer.unary(0, 4)
3366            writer.unary(0, 2)
3367            writer.unary(0, 1)
3368            writer.unary(0, 0)
3369            writer.unary(0, 3)
3370            writer.unary(0, 4)
3371            writer.unary(0, 0)
3372            writer.unary(0, 0)
3373            writer.unary(0, 0)
3374            writer.unary(0, 0)
3375            writer.write(1, 1)
3376        else:
3377            writer.unary(0, 1)
3378            writer.unary(0, 0)
3379            writer.unary(0, 0)
3380            writer.unary(0, 2)
3381            writer.unary(0, 2)
3382            writer.unary(0, 2)
3383            writer.unary(0, 5)
3384            writer.unary(0, 3)
3385            writer.unary(0, 0)
3386            writer.unary(0, 1)
3387            writer.unary(0, 0)
3388            writer.unary(0, 0)
3389            writer.unary(0, 0)
3390            writer.unary(0, 0)
3391            writer.write(2, 3)
3392
3393    def __writer_perform_write_unary_1__(self, writer, endianness):
3394        if endianness == 0:
3395            writer.unary(1, 0)
3396            writer.unary(1, 1)
3397            writer.unary(1, 0)
3398            writer.unary(1, 3)
3399            writer.unary(1, 0)
3400            writer.unary(1, 0)
3401            writer.unary(1, 0)
3402            writer.unary(1, 1)
3403            writer.unary(1, 0)
3404            writer.unary(1, 1)
3405            writer.unary(1, 2)
3406            writer.unary(1, 0)
3407            writer.unary(1, 0)
3408            writer.unary(1, 1)
3409            writer.unary(1, 0)
3410            writer.unary(1, 0)
3411            writer.unary(1, 0)
3412            writer.unary(1, 5)
3413        else:
3414            writer.unary(1, 0)
3415            writer.unary(1, 3)
3416            writer.unary(1, 0)
3417            writer.unary(1, 1)
3418            writer.unary(1, 0)
3419            writer.unary(1, 1)
3420            writer.unary(1, 0)
3421            writer.unary(1, 1)
3422            writer.unary(1, 0)
3423            writer.unary(1, 0)
3424            writer.unary(1, 0)
3425            writer.unary(1, 0)
3426            writer.unary(1, 1)
3427            writer.unary(1, 0)
3428            writer.unary(1, 0)
3429            writer.unary(1, 2)
3430            writer.unary(1, 5)
3431            writer.unary(1, 0)
3432
3433    def __writer_perform_write_huffman__(self, writer, endianness):
3434        from audiotools.bitstream import HuffmanTree
3435
3436        table = HuffmanTree([[1, 1], 0,
3437                             [1, 0], 1,
3438                             [0, 1], 2,
3439                             [0, 0, 1], 3,
3440                             [0, 0, 0], 4], endianness)
3441
3442        if endianness == 0:
3443            writer.write_huffman_code(table, 1)
3444            writer.write_huffman_code(table, 0)
3445            writer.write_huffman_code(table, 4)
3446            writer.write_huffman_code(table, 0)
3447            writer.write_huffman_code(table, 0)
3448            writer.write_huffman_code(table, 2)
3449            writer.write_huffman_code(table, 1)
3450            writer.write_huffman_code(table, 1)
3451            writer.write_huffman_code(table, 2)
3452            writer.write_huffman_code(table, 0)
3453            writer.write_huffman_code(table, 2)
3454            writer.write_huffman_code(table, 0)
3455            writer.write_huffman_code(table, 1)
3456            writer.write_huffman_code(table, 4)
3457            writer.write_huffman_code(table, 2)
3458        else:
3459            writer.write_huffman_code(table, 1)
3460            writer.write_huffman_code(table, 3)
3461            writer.write_huffman_code(table, 1)
3462            writer.write_huffman_code(table, 0)
3463            writer.write_huffman_code(table, 2)
3464            writer.write_huffman_code(table, 1)
3465            writer.write_huffman_code(table, 0)
3466            writer.write_huffman_code(table, 0)
3467            writer.write_huffman_code(table, 1)
3468            writer.write_huffman_code(table, 0)
3469            writer.write_huffman_code(table, 1)
3470            writer.write_huffman_code(table, 2)
3471            writer.write_huffman_code(table, 4)
3472            writer.write_huffman_code(table, 3)
3473            writer.write(1, 1)
3474
3475    def __check_output_file__(self, temp_file):
3476        with open(temp_file.name, "rb") as f:
3477            self.assertEqual(f.read(), b"\xB1\xED\x3B\xC1")
3478
3479    @LIB_BITSTREAM
3480    def test_read_errors(self):
3481        from audiotools.bitstream import BitstreamReader
3482
3483        for little_endian in [False, True]:
3484            for reader in [BitstreamReader(BytesIO(b"a" * 10),
3485                                           little_endian),
3486                           BitstreamReader(BytesIO(b"a" * 10),
3487                                           little_endian).substream(5)]:
3488                # reading negative number of bits shouldn't work
3489                self.assertRaises(ValueError,
3490                                  reader.read,
3491                                  -1)
3492
3493                self.assertRaises(ValueError,
3494                                  reader.read_signed,
3495                                  -1)
3496
3497                # reading signed value in 0 bits shouldn't work
3498                self.assertRaises(ValueError,
3499                                  reader.read_signed,
3500                                  0)
3501
3502                self.assertRaises(ValueError,
3503                                  reader.parse,
3504                                  "0s")
3505
3506                # reading unary with non 0/1 bit shouldn't work
3507                self.assertRaises(ValueError,
3508                                  reader.unary,
3509                                  3)
3510
3511                self.assertRaises(ValueError,
3512                                  reader.unary,
3513                                  -1)
3514
3515    @LIB_BITSTREAM
3516    def test_write_errors(self):
3517        from audiotools.bitstream import BitstreamWriter
3518        from audiotools.bitstream import BitstreamRecorder
3519
3520        for little_endian in [False, True]:
3521            for writer in [BitstreamWriter(BytesIO(),
3522                                           little_endian),
3523                           BitstreamRecorder(little_endian)]:
3524                # writing negative number of bits shouldn't work
3525                self.assertRaises(ValueError,
3526                                  writer.write,
3527                                  -1, 0)
3528
3529                self.assertRaises(ValueError,
3530                                  writer.write_signed,
3531                                  -1, 0)
3532
3533                # writing signed value in 0 bits shouldn't work
3534                self.assertRaises(ValueError,
3535                                  writer.write_signed,
3536                                  0, 0)
3537
3538                self.assertRaises(ValueError,
3539                                  writer.build,
3540                                  "0s", [0])
3541
3542                # writing negative value as unsigned shouldn't work
3543                self.assertRaises(ValueError,
3544                                  writer.write,
3545                                  8, -1)
3546
3547                # write unsigned value that's too large shouldn't work
3548                self.assertRaises(ValueError,
3549                                  writer.write,
3550                                  8, 2 ** 8)
3551
3552                # nor should it work from the .build method
3553                self.assertRaises(ValueError,
3554                                  writer.build,
3555                                  "8u", [-1])
3556                self.assertRaises(ValueError,
3557                                  writer.build,
3558                                  "8u", [2 ** 8])
3559
3560                # writing negative value that's too small shouldn't work
3561                self.assertRaises(ValueError,
3562                                  writer.write_signed,
3563                                  8, -(2 ** 8))
3564
3565                # writing signed value that's too large shouldn't work
3566                self.assertRaises(ValueError,
3567                                  writer.write_signed,
3568                                  8, 2 ** 8)
3569
3570                # nor should it work from the .build method
3571                self.assertRaises(ValueError,
3572                                  writer.build,
3573                                  "8s", [-(2 ** 8)])
3574                self.assertRaises(ValueError,
3575                                  writer.build,
3576                                  "8s", [2 ** 8])
3577
3578                # writing some value that's not a number shouldn't work
3579                self.assertRaises(TypeError,
3580                                  writer.write,
3581                                  8, "foo")
3582
3583                self.assertRaises(TypeError,
3584                                  writer.write_signed,
3585                                  8, "foo")
3586
3587                self.assertRaises(TypeError,
3588                                  writer.build,
3589                                  "8u", ["foo"])
3590
3591                self.assertRaises(TypeError,
3592                                  writer.build,
3593                                  "8s", ["foo"])
3594
3595                # writing unary with non 0/1 bit shouldn't work
3596                self.assertRaises(ValueError,
3597                                  writer.unary,
3598                                  3, 1)
3599
3600                self.assertRaises(ValueError,
3601                                  writer.unary,
3602                                  -1, 1)
3603
3604    @LIB_BITSTREAM
3605    def test_edge_cases(self):
3606        from audiotools.bitstream import BitstreamReader
3607
3608        with tempfile.NamedTemporaryFile() as temp:
3609            # write the temp file with a set of known big-endian data
3610            temp.write(ints_to_bytes([0, 0, 0, 0, 255, 255, 255, 255,
3611                                      128, 0, 0, 0, 127, 255, 255, 255,
3612                                      0, 0, 0, 0, 0, 0, 0, 0,
3613                                      255, 255, 255, 255, 255, 255, 255, 255,
3614                                      128, 0, 0, 0, 0, 0, 0, 0,
3615                                      127, 255, 255, 255, 255, 255, 255, 255]))
3616            temp.flush()
3617
3618            # ensure a big-endian reader reads the values correctly
3619            reader = BitstreamReader(open(temp.name, "rb"), False)
3620            self.__test_edge_reader_be__(reader)
3621            reader.close()
3622
3623            # ensure a big-endian sub-reader reads the values correctly
3624            reader = BitstreamReader(open(temp.name, "rb"), False)
3625            subreader = reader.substream(48)
3626            self.__test_edge_reader_be__(subreader)
3627            subreader.close()
3628            reader.close()
3629
3630        with tempfile.NamedTemporaryFile() as temp:
3631            # write the temp file with a collection of known little-endian data
3632            temp.write(ints_to_bytes([0, 0, 0, 0, 255, 255, 255, 255,
3633                                      0, 0, 0, 128, 255, 255, 255, 127,
3634                                      0, 0, 0, 0, 0, 0, 0, 0,
3635                                      255, 255, 255, 255, 255, 255, 255, 255,
3636                                      0, 0, 0, 0, 0, 0, 0, 128,
3637                                      255, 255, 255, 255, 255, 255, 255, 127]))
3638            temp.flush()
3639
3640            # ensure a little-endian reader reads the values correctly
3641            reader = BitstreamReader(open(temp.name, "rb"), True)
3642            self.__test_edge_reader_le__(reader)
3643            reader.close()
3644
3645            # ensure a little-endian sub-reader reads the values correctly
3646            reader = BitstreamReader(open(temp.name, "rb"), True)
3647            subreader = reader.substream(48)
3648            self.__test_edge_reader_be__(subreader)
3649            subreader.close()
3650            reader.close()
3651
3652        # test a bunch of big-endian values via the bitstream writer
3653        self.__test_edge_writer__(self.__get_edge_writer_be__,
3654                                  self.__validate_edge_writer_be__)
3655
3656        # test a bunch of big-endian values via the bitstream recorder
3657        self.__test_edge_writer__(self.__get_edge_recorder_be__,
3658                                  self.__validate_edge_recorder_be__)
3659
3660        # test a bunch of little-endian values via the bitstream writer
3661        self.__test_edge_writer__(self.__get_edge_writer_le__,
3662                                  self.__validate_edge_writer_le__)
3663
3664        # test a bunch of little-endian values via the bitstream recorder
3665        self.__test_edge_writer__(self.__get_edge_recorder_le__,
3666                                  self.__validate_edge_recorder_le__)
3667
3668    @LIB_BITSTREAM
3669    def test_huge_values(self):
3670        from audiotools.bitstream import BitstreamReader
3671        from audiotools.bitstream import BitstreamWriter
3672        from audiotools.bitstream import BitstreamRecorder
3673
3674        for b in [2, 4, 8, 16, 32, 64, 128, 256, 512]:
3675            data = os.urandom(b)
3676            bits = b * 8
3677            for little_endian in [False, True]:
3678                unsigned1 = BitstreamReader(
3679                    BytesIO(data),
3680                    little_endian).read(bits)
3681
3682                unsigned2 = BitstreamReader(
3683                    BytesIO(data),
3684                    little_endian).parse("%du" % (bits))[0]
3685
3686                signed1 = BitstreamReader(
3687                    BytesIO(data),
3688                    little_endian).read_signed(bits)
3689
3690                signed2 = BitstreamReader(
3691                    BytesIO(data),
3692                    little_endian).parse("%ds" % (bits))[0]
3693
3694                # check that reading from .read and .parse
3695                # yield the same values
3696                self.assertEqual(unsigned1, unsigned2)
3697                self.assertEqual(signed1, signed2)
3698
3699                # check that writing round-trips properly
3700                unsigned_data1 = BytesIO()
3701                unsigned_data2 = BytesIO()
3702                signed_data1 = BytesIO()
3703                signed_data2 = BytesIO()
3704
3705                BitstreamWriter(unsigned_data1,
3706                                little_endian).write(bits, unsigned1)
3707                BitstreamWriter(unsigned_data2,
3708                                little_endian).build("%du" % (bits),
3709                                                     [unsigned1])
3710                BitstreamWriter(signed_data1,
3711                                little_endian).write_signed(bits, signed1)
3712                BitstreamWriter(signed_data2,
3713                                little_endian).build("%ds" % (bits),
3714                                                     [signed1])
3715
3716                self.assertEqual(data, unsigned_data1.getvalue())
3717                self.assertEqual(data, unsigned_data2.getvalue())
3718                self.assertEqual(data, signed_data1.getvalue())
3719                self.assertEqual(data, signed_data2.getvalue())
3720
3721                unsigned_data1 = BitstreamRecorder(little_endian)
3722                unsigned_data2 = BitstreamRecorder(little_endian)
3723                signed_data1 = BitstreamRecorder(little_endian)
3724                signed_data2 = BitstreamRecorder(little_endian)
3725
3726                unsigned_data1.write(bits, unsigned1)
3727                unsigned_data2.build("%du" % (bits), [unsigned1])
3728                signed_data1.write_signed(bits, signed1)
3729                signed_data2.build("%ds" % (bits), [signed1])
3730
3731                self.assertEqual(data, unsigned_data1.data())
3732                self.assertEqual(data, unsigned_data2.data())
3733                self.assertEqual(data, signed_data1.data())
3734                self.assertEqual(data, signed_data2.data())
3735
3736                # check that endianness swapping works
3737                r = BitstreamReader(
3738                    BytesIO(data),
3739                    little_endian)
3740
3741                unsigned1 = r.read(bits // 2)
3742                r.set_endianness(not little_endian)
3743                unsigned2 = r.read(bits // 2)
3744
3745                new_data = BytesIO()
3746                w = BitstreamWriter(
3747                    new_data, little_endian)
3748                w.write(bits // 2, unsigned1)
3749                w.set_endianness(not little_endian)
3750                w.write(bits // 2, unsigned2)
3751                w.flush()
3752
3753                self.assertEqual(data, new_data.getvalue())
3754
3755                w = BitstreamRecorder(little_endian)
3756                w.write(bits // 2, unsigned1)
3757                w.set_endianness(not little_endian)
3758                w.write(bits // 2, unsigned2)
3759
3760                self.assertEqual(data, w.data())
3761
3762    @LIB_BITSTREAM
3763    def test_python_reader(self):
3764        from audiotools.bitstream import BitstreamReader
3765
3766        # Vanilla, file-based BitstreamReader uses a 1 character buffer
3767        # and relies on stdio to perform buffering which is fast enough.
3768        # Therefore, a byte-aligned file can be seek()ed at will.
3769        # However, making lots of read(1) calls on a Python object
3770        # is unacceptably slow.
3771        # Therefore, we read a 4KB string and pull individual bytes from
3772        # it as needed, which should keep performance reasonable.
3773        def new_temp1():
3774            temp = BytesIO()
3775            temp.write(b"\xB1")
3776            temp.write(b"\xED")
3777            temp.write(b"\x3B")
3778            temp.write(b"\xC1")
3779            temp.seek(0, 0)
3780            return temp
3781
3782        def new_temp2():
3783            return __SimpleChunkReader__([b"\xB1" +
3784                                          b"\xED" +
3785                                          b"\x3B" +
3786                                          b"\xC1"])
3787
3788        def new_temp3():
3789            return __SimpleChunkReader__([b"\xB1" +
3790                                          b"\xED",
3791                                          b"\x3B" +
3792                                          b"\xC1"])
3793
3794        def new_temp4():
3795            return __SimpleChunkReader__([b"\xB1",
3796                                          b"\xED",
3797                                          b"\x3B" +
3798                                          b"\xC1"])
3799
3800        def new_temp5():
3801            return __SimpleChunkReader__([b"\xB1",
3802                                          b"\xED",
3803                                          b"\x3B",
3804                                          b"\xC1"])
3805
3806        for new_temp in [new_temp1, new_temp2, new_temp3, new_temp4,
3807                         new_temp5]:
3808            # first, check the bitstream reader
3809            # against some simple known big-endian values
3810            bitstream = BitstreamReader(new_temp(), False)
3811
3812            self.assertEqual(bitstream.read(2), 2)
3813            self.assertEqual(bitstream.read(3), 6)
3814            self.assertEqual(bitstream.read(5), 7)
3815            self.assertEqual(bitstream.read(3), 5)
3816            self.assertEqual(bitstream.read(19), 342977)
3817
3818            bitstream = BitstreamReader(new_temp(), False)
3819            self.assertEqual(bitstream.read_signed(2), -2)
3820            self.assertEqual(bitstream.read_signed(3), -2)
3821            self.assertEqual(bitstream.read_signed(5), 7)
3822            self.assertEqual(bitstream.read_signed(3), -3)
3823            self.assertEqual(bitstream.read_signed(19), -181311)
3824
3825            bitstream = BitstreamReader(new_temp(), False)
3826            self.assertEqual(bitstream.unary(0), 1)
3827            self.assertEqual(bitstream.unary(0), 2)
3828            self.assertEqual(bitstream.unary(0), 0)
3829            self.assertEqual(bitstream.unary(0), 0)
3830            self.assertEqual(bitstream.unary(0), 4)
3831            bitstream.byte_align()
3832            bitstream = BitstreamReader(new_temp(), False)
3833            self.assertEqual(bitstream.unary(1), 0)
3834            self.assertEqual(bitstream.unary(1), 1)
3835            self.assertEqual(bitstream.unary(1), 0)
3836            self.assertEqual(bitstream.unary(1), 3)
3837            self.assertEqual(bitstream.unary(1), 0)
3838            bitstream.byte_align()
3839
3840            bitstream = BitstreamReader(new_temp(), False)
3841            pos = bitstream.getpos()
3842            self.assertTrue(bitstream.byte_aligned())
3843            for i in range(32):
3844                bit = bitstream.read(1)
3845                bitstream.unread(bit)
3846                re_read = bitstream.read(1)
3847                self.assertEqual(bit, re_read)
3848            self.assertTrue(bitstream.byte_aligned())
3849
3850            bitstream.setpos(pos)
3851            bitstream.unread(bitstream.read(1))
3852            self.assertTrue(bitstream.byte_aligned())
3853            self.assertEqual(bitstream.read_bytes(4), b"\xB1\xED\x3B\xC1")
3854            self.assertTrue(bitstream.byte_aligned())
3855
3856            bitstream = BitstreamReader(new_temp(), False)
3857            pos = bitstream.getpos()
3858            self.assertEqual(bitstream.read(4), 0xB)
3859            bitstream.setpos(pos)
3860            self.assertEqual(bitstream.read(8), 0xB1)
3861            bitstream.setpos(pos)
3862            self.assertEqual(bitstream.read(12), 0xB1E)
3863            pos = bitstream.getpos()
3864            self.assertEqual(bitstream.read(4), 0xD)
3865            bitstream.setpos(pos)
3866            self.assertEqual(bitstream.read(8), 0xD3)
3867            bitstream.setpos(pos)
3868            self.assertEqual(bitstream.read(12), 0xD3B)
3869
3870            del(bitstream)
3871            bitstream = BitstreamReader(new_temp(), False)
3872
3873            # then, check the bitstream reader
3874            # against some simple known little-endian values
3875            bitstream = BitstreamReader(new_temp(), True)
3876
3877            self.assertEqual(bitstream.read(2), 1)
3878            self.assertEqual(bitstream.read(3), 4)
3879            self.assertEqual(bitstream.read(5), 13)
3880            self.assertEqual(bitstream.read(3), 3)
3881            self.assertEqual(bitstream.read(19), 395743)
3882
3883            bitstream = BitstreamReader(new_temp(), True)
3884            self.assertEqual(bitstream.read_signed(2), 1)
3885            self.assertEqual(bitstream.read_signed(3), -4)
3886            self.assertEqual(bitstream.read_signed(5), 13)
3887            self.assertEqual(bitstream.read_signed(3), 3)
3888            self.assertEqual(bitstream.read_signed(19), -128545)
3889
3890            bitstream = BitstreamReader(new_temp(), True)
3891            self.assertEqual(bitstream.unary(0), 1)
3892            self.assertEqual(bitstream.unary(0), 0)
3893            self.assertEqual(bitstream.unary(0), 0)
3894            self.assertEqual(bitstream.unary(0), 2)
3895            self.assertEqual(bitstream.unary(0), 2)
3896            bitstream.byte_align()
3897            bitstream = BitstreamReader(new_temp(), True)
3898            self.assertEqual(bitstream.unary(1), 0)
3899            self.assertEqual(bitstream.unary(1), 3)
3900            self.assertEqual(bitstream.unary(1), 0)
3901            self.assertEqual(bitstream.unary(1), 1)
3902            self.assertEqual(bitstream.unary(1), 0)
3903            bitstream.byte_align()
3904
3905            bitstream = BitstreamReader(new_temp(), True)
3906            pos = bitstream.getpos()
3907            self.assertTrue(bitstream.byte_aligned())
3908            for i in range(32):
3909                bit = bitstream.read(1)
3910                bitstream.unread(bit)
3911                re_read = bitstream.read(1)
3912                self.assertEqual(bit, re_read)
3913            self.assertTrue(bitstream.byte_aligned())
3914
3915            bitstream.setpos(pos)
3916            bitstream.unread(bitstream.read(1))
3917            self.assertTrue(bitstream.byte_aligned())
3918            self.assertEqual(bitstream.read_bytes(4), b"\xB1\xED\x3B\xC1")
3919            self.assertTrue(bitstream.byte_aligned())
3920
3921            bitstream = BitstreamReader(new_temp(), True)
3922            pos = bitstream.getpos()
3923            self.assertEqual(bitstream.read(4), 0x1)
3924            bitstream.setpos(pos)
3925            self.assertEqual(bitstream.read(8), 0xB1)
3926            bitstream.setpos(pos)
3927            self.assertEqual(bitstream.read(12), 0xDB1)
3928            pos = bitstream.getpos()
3929            self.assertEqual(bitstream.read(4), 0xE)
3930            bitstream.setpos(pos)
3931            self.assertEqual(bitstream.read(8), 0xBE)
3932            bitstream.setpos(pos)
3933            self.assertEqual(bitstream.read(12), 0x3BE)
3934
3935    @LIB_BITSTREAM
3936    def test_simple_writer(self):
3937        from audiotools.bitstream import BitstreamWriter
3938
3939        with tempfile.NamedTemporaryFile() as temp:
3940            # first, have the bitstream writer generate
3941            # a set of known big-endian values
3942
3943            f = open(temp.name, "wb")
3944            bitstream = BitstreamWriter(f, False)
3945            bitstream.write(2, 2)
3946            bitstream.write(3, 6)
3947            bitstream.write(5, 7)
3948            bitstream.write(3, 5)
3949            bitstream.write(19, 342977)
3950            bitstream.flush()
3951            bitstream.close()
3952            with open(temp.name, "rb") as f:
3953                self.assertEqual(f.read(), b"\xB1\xED\x3B\xC1")
3954
3955            f = open(temp.name, "wb")
3956            bitstream = BitstreamWriter(f, False)
3957            bitstream.write_signed(2, -2)
3958            bitstream.write_signed(3, -2)
3959            bitstream.write_signed(5, 7)
3960            bitstream.write_signed(3, -3)
3961            bitstream.write_signed(19, -181311)
3962            bitstream.flush()
3963            bitstream.close()
3964            with open(temp.name, "rb") as f:
3965                self.assertEqual(f.read(), b"\xB1\xED\x3B\xC1")
3966
3967            f = open(temp.name, "wb")
3968            bitstream = BitstreamWriter(f, False)
3969            bitstream.unary(0, 1)
3970            bitstream.unary(0, 2)
3971            bitstream.unary(0, 0)
3972            bitstream.unary(0, 0)
3973            bitstream.unary(0, 4)
3974            bitstream.unary(0, 2)
3975            bitstream.unary(0, 1)
3976            bitstream.unary(0, 0)
3977            bitstream.unary(0, 3)
3978            bitstream.unary(0, 4)
3979            bitstream.unary(0, 0)
3980            bitstream.unary(0, 0)
3981            bitstream.unary(0, 0)
3982            bitstream.unary(0, 0)
3983            bitstream.write(1, 1)
3984            bitstream.flush()
3985            bitstream.close()
3986            with open(temp.name, "rb") as f:
3987                self.assertEqual(f.read(), b"\xB1\xED\x3B\xC1")
3988
3989            f = open(temp.name, "wb")
3990            bitstream = BitstreamWriter(f, False)
3991            bitstream.unary(1, 0)
3992            bitstream.unary(1, 1)
3993            bitstream.unary(1, 0)
3994            bitstream.unary(1, 3)
3995            bitstream.unary(1, 0)
3996            bitstream.unary(1, 0)
3997            bitstream.unary(1, 0)
3998            bitstream.unary(1, 1)
3999            bitstream.unary(1, 0)
4000            bitstream.unary(1, 1)
4001            bitstream.unary(1, 2)
4002            bitstream.unary(1, 0)
4003            bitstream.unary(1, 0)
4004            bitstream.unary(1, 1)
4005            bitstream.unary(1, 0)
4006            bitstream.unary(1, 0)
4007            bitstream.unary(1, 0)
4008            bitstream.unary(1, 5)
4009            bitstream.flush()
4010            bitstream.close()
4011            with open(temp.name, "rb") as f:
4012                self.assertEqual(f.read(), b"\xB1\xED\x3B\xC1")
4013
4014            # then, have the bitstream writer generate
4015            # a set of known little-endian values
4016            f = open(temp.name, "wb")
4017            bitstream = BitstreamWriter(f, True)
4018            bitstream.write(2, 1)
4019            bitstream.write(3, 4)
4020            bitstream.write(5, 13)
4021            bitstream.write(3, 3)
4022            bitstream.write(19, 395743)
4023            bitstream.flush()
4024            bitstream.close()
4025            with open(temp.name, "rb") as f:
4026                self.assertEqual(f.read(), b"\xB1\xED\x3B\xC1")
4027
4028            f = open(temp.name, "wb")
4029            bitstream = BitstreamWriter(f, True)
4030            bitstream.write_signed(2, 1)
4031            bitstream.write_signed(3, -4)
4032            bitstream.write_signed(5, 13)
4033            bitstream.write_signed(3, 3)
4034            bitstream.write_signed(19, -128545)
4035            bitstream.flush()
4036            bitstream.close()
4037            with open(temp.name, "rb") as f:
4038                self.assertEqual(f.read(), b"\xB1\xED\x3B\xC1")
4039
4040            f = open(temp.name, "wb")
4041            bitstream = BitstreamWriter(f, True)
4042            bitstream.unary(0, 1)
4043            bitstream.unary(0, 0)
4044            bitstream.unary(0, 0)
4045            bitstream.unary(0, 2)
4046            bitstream.unary(0, 2)
4047            bitstream.unary(0, 2)
4048            bitstream.unary(0, 5)
4049            bitstream.unary(0, 3)
4050            bitstream.unary(0, 0)
4051            bitstream.unary(0, 1)
4052            bitstream.unary(0, 0)
4053            bitstream.unary(0, 0)
4054            bitstream.unary(0, 0)
4055            bitstream.unary(0, 0)
4056            bitstream.write(2, 3)
4057            bitstream.flush()
4058            bitstream.close()
4059            with open(temp.name, "rb") as f:
4060                self.assertEqual(f.read(), b"\xB1\xED\x3B\xC1")
4061
4062            f = open(temp.name, "wb")
4063            bitstream = BitstreamWriter(f, True)
4064            bitstream.unary(1, 0)
4065            bitstream.unary(1, 3)
4066            bitstream.unary(1, 0)
4067            bitstream.unary(1, 1)
4068            bitstream.unary(1, 0)
4069            bitstream.unary(1, 1)
4070            bitstream.unary(1, 0)
4071            bitstream.unary(1, 1)
4072            bitstream.unary(1, 0)
4073            bitstream.unary(1, 0)
4074            bitstream.unary(1, 0)
4075            bitstream.unary(1, 0)
4076            bitstream.unary(1, 1)
4077            bitstream.unary(1, 0)
4078            bitstream.unary(1, 0)
4079            bitstream.unary(1, 2)
4080            bitstream.unary(1, 5)
4081            bitstream.unary(1, 0)
4082            bitstream.flush()
4083            bitstream.close()
4084            with open(temp.name, "rb") as f:
4085                self.assertEqual(f.read(), b"\xB1\xED\x3B\xC1")
4086
4087            f = open(temp.name, "wb")
4088            bitstream = BitstreamWriter(f, True)
4089            bitstream.write(4, 0x1)
4090            bitstream.byte_align()
4091            bitstream.write(4, 0xD)
4092            bitstream.byte_align()
4093            bitstream.flush()
4094            bitstream.close()
4095            with open(temp.name, "rb") as f:
4096                self.assertEqual(f.read(), b"\x01\x0D")
4097
4098    # and have the bitstream reader check those values are accurate
4099
4100    @LIB_BITSTREAM
4101    def test_reader_close(self):
4102        from audiotools.bitstream import BitstreamReader, HuffmanTree
4103
4104        def test_reader(reader):
4105            self.assertRaises(IOError, reader.read, 1)
4106            self.assertRaises(IOError, reader.skip, 3)
4107            self.assertRaises(IOError, reader.skip_bytes, 1)
4108            self.assertRaises(IOError, reader.read_signed, 2)
4109            self.assertRaises(IOError, reader.unary, 1)
4110            self.assertRaises(IOError, reader.read_bytes, 1)
4111            self.assertRaises(IOError, reader.parse, "1b2b3b")
4112            self.assertRaises(IOError, reader.substream, 2)
4113            self.assertRaises(IOError, reader.read_huffman_code,
4114                              HuffmanTree([(1, ),     1,
4115                                           (0, 1),    2,
4116                                           (0, 0, 1), 3,
4117                                           (0, 0, 0), 4], False))
4118
4119        def new_temp():
4120            temp = BytesIO()
4121            temp.write(b"\xB1")
4122            temp.write(b"\xED")
4123            temp.write(b"\x3B")
4124            temp.write(b"\xC1")
4125            temp.seek(0, 0)
4126            return temp
4127
4128        # test a BitstreamReader from a Python file object
4129        f = open("test_core.py", "rb")
4130
4131        reader = BitstreamReader(f, False)
4132        reader.close()
4133        test_reader(reader)
4134        reader.set_endianness(1)
4135        test_reader(reader)
4136
4137        reader = BitstreamReader(f, True)
4138        reader.close()
4139        test_reader(reader)
4140        reader.set_endianness(0)
4141        test_reader(reader)
4142
4143        f.close()
4144        del(f)
4145
4146        # test a BitstreamReader from a Python BytesIO object
4147        reader = BitstreamReader(new_temp(), False)
4148        reader.close()
4149        test_reader(reader)
4150        reader.set_endianness(1)
4151        test_reader(reader)
4152
4153        reader = BitstreamReader(new_temp(), True)
4154        reader.close()
4155        test_reader(reader)
4156        reader.set_endianness(0)
4157        test_reader(reader)
4158
4159    @LIB_BITSTREAM
4160    def test_reader_context(self):
4161        from io import BytesIO
4162        from audiotools.bitstream import BitstreamReader
4163
4164        b = BytesIO(b"\xB1\xED\x3B\xC1")
4165        self.assertFalse(b.closed)
4166        r = BitstreamReader(b, False)
4167        self.assertEqual(r.read(2), 0x2)
4168        self.assertEqual(r.read(3), 0x6)
4169        self.assertEqual(r.read(5), 0x07)
4170        self.assertEqual(r.read(3), 0x5)
4171        self.assertEqual(r.read(19), 0x53BC1)
4172        self.assertRaises(IOError, r.read, 8)
4173        self.assertFalse(b.closed)
4174        del(r)
4175        self.assertFalse(b.closed)
4176
4177        b = BytesIO(b"\xB1\xED\x3B\xC1")
4178        self.assertFalse(b.closed)
4179        r = BitstreamReader(b, True)
4180        self.assertEqual(r.read(2), 0x1)
4181        self.assertEqual(r.read(3), 0x4)
4182        self.assertEqual(r.read(5), 0x0D)
4183        self.assertEqual(r.read(3), 0x3)
4184        self.assertEqual(r.read(19), 0x609DF)
4185        self.assertRaises(IOError, r.read, 8)
4186        self.assertFalse(b.closed)
4187        del(r)
4188        self.assertFalse(b.closed)
4189
4190        b = BytesIO(b"\xB1\xED\x3B\xC1")
4191        self.assertFalse(b.closed)
4192        with BitstreamReader(b, False) as r:
4193            self.assertEqual(r.read(2), 0x2)
4194            self.assertEqual(r.read(3), 0x6)
4195            self.assertEqual(r.read(5), 0x07)
4196            self.assertEqual(r.read(3), 0x5)
4197            self.assertEqual(r.read(19), 0x53BC1)
4198            self.assertRaises(IOError, r.read, 8)
4199            self.assertFalse(b.closed)
4200        self.assertTrue(b.closed)
4201
4202        b = BytesIO(b"\xB1\xED\x3B\xC1")
4203        self.assertFalse(b.closed)
4204        with BitstreamReader(b, True) as r:
4205            self.assertEqual(r.read(2), 0x1)
4206            self.assertEqual(r.read(3), 0x4)
4207            self.assertEqual(r.read(5), 0x0D)
4208            self.assertEqual(r.read(3), 0x3)
4209            self.assertEqual(r.read(19), 0x609DF)
4210            self.assertRaises(IOError, r.read, 8)
4211            self.assertFalse(b.closed)
4212        self.assertTrue(b.closed)
4213
4214    @LIB_BITSTREAM
4215    def test_writer_close(self):
4216        from audiotools.bitstream import BitstreamWriter
4217        from audiotools.bitstream import BitstreamRecorder
4218
4219        def test_writer(writer):
4220            self.assertRaises(IOError, writer.write, 1, 1)
4221            self.assertRaises(IOError, writer.write_signed, 2, 1)
4222            self.assertRaises(IOError, writer.unary, 1, 1)
4223            self.assertRaises(IOError, writer.write_bytes, b"foo")
4224            self.assertRaises(IOError, writer.build, "1u2u3u", [0, 1, 2])
4225
4226        # test a BitstreamWriter to a Python file object
4227        f = open("test.bin", "wb")
4228        try:
4229            writer = BitstreamWriter(f, False)
4230            writer.close()
4231            test_writer(writer)
4232            writer.set_endianness(1)
4233            test_writer(writer)
4234            f.close()
4235            del(f)
4236        finally:
4237            os.unlink("test.bin")
4238
4239        f = open("test.bin", "wb")
4240        try:
4241            writer = BitstreamWriter(f, True)
4242            writer.close()
4243            test_writer(writer)
4244            writer.set_endianness(0)
4245            test_writer(writer)
4246            f.close()
4247            del(f)
4248        finally:
4249            os.unlink("test.bin")
4250
4251        # test a BitstreamWriter to a Python BytesIO object
4252        s = BytesIO()
4253        writer = BitstreamWriter(s, False)
4254        writer.close()
4255        test_writer(writer)
4256        writer.set_endianness(1)
4257        test_writer(writer)
4258        del(writer)
4259        del(s)
4260
4261        s = BytesIO()
4262        writer = BitstreamWriter(s, True)
4263        writer.close()
4264        test_writer(writer)
4265        writer.set_endianness(0)
4266        test_writer(writer)
4267        del(writer)
4268        del(s)
4269
4270        # test a BitstreamRecorder
4271        writer = BitstreamRecorder(0)
4272        writer.close()
4273        test_writer(writer)
4274        writer.set_endianness(1)
4275        test_writer(writer)
4276        del(writer)
4277
4278        writer = BitstreamRecorder(1)
4279        writer.close()
4280        test_writer(writer)
4281        writer.set_endianness(0)
4282        test_writer(writer)
4283        del(writer)
4284
4285    @LIB_BITSTREAM
4286    def test_writer_context(self):
4287        from io import BytesIO
4288        from audiotools.bitstream import BitstreamWriter
4289        from audiotools.bitstream import BitstreamRecorder
4290
4291        # if simply deallocated, writers should flush contents
4292        # but not close internal stream
4293        b = BytesIO()
4294        self.assertFalse(b.closed)
4295        w = BitstreamWriter(b, False)
4296        w.write(2, 0x2)
4297        w.write(3, 0x6)
4298        w.write(5, 0x07)
4299        w.write(3, 0x5)
4300        w.write(19, 0x53BC1)
4301        del(w)
4302        self.assertFalse(b.closed)
4303        self.assertEqual(b.getvalue(), b"\xB1\xED\x3B\xC1")
4304
4305        b = BytesIO()
4306        self.assertFalse(b.closed)
4307        w = BitstreamWriter(b, True)
4308        w.write(2, 0x1)
4309        w.write(3, 0x4)
4310        w.write(5, 0x0D)
4311        w.write(3, 0x3)
4312        w.write(19, 0x609DF)
4313        del(w)
4314        self.assertFalse(b.closed)
4315        self.assertEqual(b.getvalue(), b"\xB1\xED\x3B\xC1")
4316
4317        # if put into a context manager, writers should flush contents
4318        # but also close internal stream
4319        b = open("test.bin", "wb")
4320        try:
4321            self.assertFalse(b.closed)
4322            with BitstreamWriter(b, False) as w:
4323                w.write(2, 0x2)
4324                w.write(3, 0x6)
4325                w.write(5, 0x07)
4326                w.write(3, 0x5)
4327                w.write(19, 0x53BC1)
4328                self.assertFalse(b.closed)
4329            self.assertTrue(b.closed)
4330            with open("test.bin", "rb") as f:
4331                self.assertEqual(f.read(), b"\xB1\xED\x3B\xC1")
4332        finally:
4333            os.unlink("test.bin")
4334
4335        b = open("test.bin", "wb")
4336        try:
4337            self.assertFalse(b.closed)
4338            with BitstreamWriter(b, True) as w:
4339                w.write(2, 0x1)
4340                w.write(3, 0x4)
4341                w.write(5, 0x0D)
4342                w.write(3, 0x3)
4343                w.write(19, 0x609DF)
4344                self.assertFalse(b.closed)
4345            self.assertTrue(b.closed)
4346            with open("test.bin", "rb") as f:
4347                self.assertEqual(f.read(), b"\xB1\xED\x3B\xC1")
4348        finally:
4349            os.unlink("test.bin")
4350
4351        # recorders should work in a context manager
4352        # even if it's not particularly useful
4353        with BitstreamRecorder(False) as w:
4354            w.write(2, 0x2)
4355            w.write(3, 0x6)
4356            w.write(5, 0x07)
4357            w.write(3, 0x5)
4358            w.write(19, 0x53BC1)
4359            self.assertEqual(w.data(), b"\xB1\xED\x3B\xC1")
4360
4361        with BitstreamRecorder(True) as w:
4362            w.write(2, 0x1)
4363            w.write(3, 0x4)
4364            w.write(5, 0x0D)
4365            w.write(3, 0x3)
4366            w.write(19, 0x609DF)
4367            self.assertEqual(w.data(), b"\xB1\xED\x3B\xC1")
4368
4369    def __test_writer_marks__(self, writer):
4370        writer.write(1, 1)
4371        self.assertRaises(IOError, writer.getpos)
4372        writer.write(2, 3)
4373        self.assertRaises(IOError, writer.getpos)
4374        writer.write(3, 7)
4375        self.assertRaises(IOError, writer.getpos)
4376        writer.write(2, 3)
4377        pos = writer.getpos()
4378        writer.write(4, 15)
4379        self.assertRaises(IOError, writer.setpos, pos)
4380        writer.write(4, 15)
4381        writer.write(8, 0xFF)
4382        writer.setpos(pos)
4383        writer.write(8, 0)
4384
4385    @LIB_BITSTREAM
4386    def test_writer_marks(self):
4387        from audiotools.bitstream import BitstreamWriter
4388
4389        f = open("test.bin", "wb")
4390        try:
4391            writer = BitstreamWriter(f, False)
4392            self.__test_writer_marks__(writer)
4393            del(writer)
4394            f.close()
4395            with open("test.bin", "rb") as f:
4396                self.assertEqual(f.read(), b"\xFF\x00\xFF")
4397        finally:
4398            os.unlink("test.bin")
4399
4400        f = open("test.bin", "wb")
4401        try:
4402            writer = BitstreamWriter(f, True)
4403            self.__test_writer_marks__(writer)
4404            del(writer)
4405            f.close()
4406            with open("test.bin", "rb") as f:
4407                self.assertEqual(f.read(), b"\xFF\x00\xFF")
4408        finally:
4409            os.unlink("test.bin")
4410
4411        s = BytesIO()
4412        writer = BitstreamWriter(s, False)
4413        self.__test_writer_marks__(writer)
4414        del(writer)
4415        self.assertEqual(s.getvalue(), b"\xFF\x00\xFF")
4416
4417        s = BytesIO()
4418        writer = BitstreamWriter(s, True)
4419        self.__test_writer_marks__(writer)
4420        del(writer)
4421        self.assertEqual(s.getvalue(), b"\xFF\x00\xFF")
4422
4423
4424class TestReplayGain(unittest.TestCase):
4425    @LIB_REPLAYGAIN
4426    def test_basics(self):
4427        import audiotools.replaygain
4428        import audiotools.pcm
4429
4430        # check for invalid sample rate
4431        self.assertRaises(ValueError,
4432                          audiotools.replaygain.ReplayGain,
4433                          200000)
4434
4435        # check for no samples
4436        gain = audiotools.replaygain.ReplayGain(44100)
4437        self.assertRaises(ValueError, gain.title_gain)
4438        self.assertRaises(ValueError, gain.album_gain)
4439        self.assertEqual(gain.title_peak(), 0.0)
4440        self.assertEqual(gain.album_peak(), 0.0)
4441
4442    @LIB_REPLAYGAIN
4443    def test_valid_rates(self):
4444        import audiotools.replaygain
4445
4446        for sample_rate in [8000, 11025, 12000, 16000, 18900, 22050, 24000,
4447                            32000, 37800, 44100, 48000, 56000, 64000, 88200,
4448                            96000, 112000, 128000, 144000, 176400, 192000]:
4449            gain = audiotools.replaygain.ReplayGain(sample_rate)
4450            reader = test_streams.Simple_Sine(sample_rate * 2,
4451                                              sample_rate,
4452                                              0x4,
4453                                              16,
4454                                              (30000, sample_rate // 100))
4455            audiotools.transfer_data(reader.read, gain.update)
4456            title_gain = gain.title_gain()
4457            title_peak = gain.title_peak()
4458            gain.next_title()
4459            album_gain = gain.album_gain()
4460            album_peak = gain.album_peak()
4461            self.assertLess(title_gain, -4.0)
4462            self.assertGreater(title_peak, .90)
4463            self.assertEqual(title_gain, album_gain)
4464            self.assertEqual(title_peak, album_peak)
4465
4466    @LIB_REPLAYGAIN
4467    def test_pcm(self):
4468        import audiotools.replaygain
4469
4470        gain = audiotools.replaygain.ReplayGain(44100)
4471        audiotools.transfer_data(
4472            test_streams.Sine16_Stereo(44100, 44100,
4473                                       441.0, 0.50,
4474                                       4410.0, 0.49, 1.0).read,
4475            gain.update)
4476
4477        title_gain = gain.title_gain()
4478        title_peak = gain.title_peak()
4479        gain.next_title()
4480
4481        main_reader = test_streams.Sine16_Stereo(44100, 44100,
4482                                                 441.0, 0.50,
4483                                                 4410.0, 0.49, 1.0)
4484
4485        reader = audiotools.replaygain.ReplayGainReader(main_reader,
4486                                                        title_gain,
4487                                                        title_peak)
4488
4489        # read FrameLists from ReplayGainReader
4490        f = reader.read(4096)
4491        while len(f) > 0:
4492            f = reader.read(4096)
4493
4494        # ensure subsequent reads return empty FrameLists
4495        for i in range(10):
4496            self.assertEqual(len(reader.read(4096)), 0)
4497
4498        # ensure closing the ReplayGainReader raises ValueError
4499        # on subsequent reads
4500        reader.close()
4501
4502        self.assertRaises(ValueError, reader.read, 4096)
4503
4504        # ensure wrapped reader is also closed
4505        self.assertRaises(ValueError, main_reader.read, 4096)
4506
4507    @LIB_REPLAYGAIN
4508    def test_reader(self):
4509        import audiotools.replaygain
4510
4511        test_format = audiotools.WaveAudio
4512
4513        dummy1 = tempfile.NamedTemporaryFile(suffix="." + test_format.SUFFIX)
4514        dummy2 = tempfile.NamedTemporaryFile(suffix="." + test_format.SUFFIX)
4515        try:
4516            # build dummy file
4517            track1 = test_format.from_pcm(
4518                dummy1.name,
4519                test_streams.Sine16_Stereo(44100, 44100,
4520                                           441.0, 0.50,
4521                                           4410.0, 0.49, 1.0))
4522
4523            # calculate its ReplayGain
4524            gain = audiotools.replaygain.ReplayGain(track1.sample_rate())
4525            with track1.to_pcm() as pcm:
4526                audiotools.transfer_data(pcm.read, gain.update)
4527                title_gain = gain.title_gain()
4528                title_peak = gain.title_peak()
4529                gain.next_title()
4530
4531            # apply gain to dummy file
4532            track2 = test_format.from_pcm(
4533                dummy2.name,
4534                audiotools.replaygain.ReplayGainReader(track1.to_pcm(),
4535                                                       title_gain,
4536                                                       title_peak))
4537
4538            # ensure gain applied is quieter than without gain applied
4539            gain2 = audiotools.replaygain.ReplayGain(track1.sample_rate())
4540            with track2.to_pcm() as pcm:
4541                audiotools.transfer_data(pcm.read, gain2.update)
4542                title_gain2 = gain2.title_gain()
4543                title_peak2 = gain2.title_peak()
4544                gain2.next_title()
4545
4546            self.assertGreater(title_gain2, title_gain)
4547        finally:
4548            dummy1.close()
4549            dummy2.close()
4550
4551
4552class testcuesheet(unittest.TestCase):
4553    def setUp(self):
4554        from audiotools.cue import Cuesheet, read_cuesheet, write_cuesheet
4555        self.suffix = ".cue"
4556        self.sheet_class = Cuesheet
4557        self.read_sheet = read_cuesheet
4558
4559    def __sheets__(self):
4560        from audiotools import Sheet, SheetTrack, SheetIndex
4561
4562        def timestamp_to_frac(m, s, f):
4563            from fractions import Fraction
4564            return Fraction((m * 60 * 75) + (s * 75) + f, 75)
4565
4566        # an ordinary cuesheet with no pre-gaps
4567        yield Sheet([SheetTrack(
4568                     number=1,
4569                     track_indexes=[
4570                         SheetIndex(1, timestamp_to_frac(0, 0, 0))]),
4571                     SheetTrack(
4572                         number=2,
4573                         track_indexes=[
4574                             SheetIndex(1, timestamp_to_frac(5, 50, 65))]),
4575                     SheetTrack(
4576                         number=3,
4577                         track_indexes=[
4578                             SheetIndex(1, timestamp_to_frac(9, 47, 50))]),
4579                     SheetTrack(
4580                         number=4,
4581                         track_indexes=[
4582                             SheetIndex(1, timestamp_to_frac(15, 12, 53))]),
4583                     SheetTrack(
4584                         number=5,
4585                         track_indexes=[
4586                             SheetIndex(1, timestamp_to_frac(25, 2, 40))]),
4587                     SheetTrack(
4588                         number=6,
4589                         track_indexes=[
4590                             SheetIndex(1, timestamp_to_frac(27, 34, 5))]),
4591                     SheetTrack(
4592                         number=7,
4593                         track_indexes=[
4594                             SheetIndex(1, timestamp_to_frac(31, 58, 53))]),
4595                     SheetTrack(
4596                         number=8,
4597                         track_indexes=[
4598                             SheetIndex(1, timestamp_to_frac(35, 8, 65))])])
4599
4600        # a cuesheet spread across a couple of different files
4601        yield Sheet([SheetTrack(number=1,
4602                                track_indexes=[
4603                                    SheetIndex(1,
4604                                               timestamp_to_frac(0, 0, 0))],
4605                                filename=u"TRACK1.WAV"),
4606                     SheetTrack(number=2,
4607                                track_indexes=[
4608                                    SheetIndex(1,
4609                                               timestamp_to_frac(5, 50, 65))],
4610                                filename=u"TRACK1.WAV"),
4611                     SheetTrack(number=3,
4612                                track_indexes=[
4613                                    SheetIndex(1,
4614                                               timestamp_to_frac(9, 47, 50))],
4615                                filename=u"TRACK1.WAV"),
4616                     SheetTrack(number=4,
4617                                track_indexes=[
4618                                    SheetIndex(1,
4619                                               timestamp_to_frac(15, 12, 53))],
4620                                filename=u"TRACK1.WAV"),
4621                     SheetTrack(number=5,
4622                                track_indexes=[
4623                                    SheetIndex(1,
4624                                               timestamp_to_frac(0, 0, 0))],
4625                                filename=u"TRACK2.WAV"),
4626                     SheetTrack(number=6,
4627                                track_indexes=[
4628                                    SheetIndex(1,
4629                                               timestamp_to_frac(2, 31, 40))],
4630                                filename=u"TRACK2.WAV"),
4631                     SheetTrack(number=7,
4632                                track_indexes=[
4633                                    SheetIndex(1,
4634                                               timestamp_to_frac(6, 56, 13))],
4635                                filename=u"TRACK2.WAV"),
4636                     SheetTrack(number=8,
4637                                track_indexes=[
4638                                    SheetIndex(1,
4639                                               timestamp_to_frac(10, 6, 25))],
4640                                filename=u"TRACK2.WAV")])
4641
4642        # mix in some pre-gaps
4643        yield Sheet([SheetTrack(number=1,
4644                                track_indexes=[
4645                                    SheetIndex(1,
4646                                               timestamp_to_frac(0, 0, 0))]),
4647                     SheetTrack(number=2,
4648                                track_indexes=[
4649                                    SheetIndex(0,
4650                                               timestamp_to_frac(5, 49, 65)),
4651                                    SheetIndex(1,
4652                                               timestamp_to_frac(5, 50, 65))]),
4653                     SheetTrack(number=3,
4654                                track_indexes=[
4655                                    SheetIndex(0,
4656                                               timestamp_to_frac(9, 45, 50)),
4657                                    SheetIndex(1,
4658                                               timestamp_to_frac(9, 47, 50))]),
4659                     SheetTrack(number=4,
4660                                track_indexes=[
4661                                    SheetIndex(0,
4662                                               timestamp_to_frac(15, 9, 53)),
4663                                    SheetIndex(1,
4664                                               timestamp_to_frac(15, 12, 53))])
4665                     ])
4666
4667        # add catalog numbers, ISRCs and multiple index points
4668        yield Sheet([SheetTrack(number=1,
4669                                track_indexes=[
4670                                    SheetIndex(1,
4671                                               timestamp_to_frac(0, 0, 0)),
4672                                    SheetIndex(2,
4673                                               timestamp_to_frac(2, 0, 0)),
4674                                    SheetIndex(3,
4675                                               timestamp_to_frac(4, 0, 0))],
4676                                metadata=audiotools.MetaData(
4677                                    ISRC=u"ABCDE1234567"),
4678                                filename=u"MYAUDIO1.WAV"),
4679                     SheetTrack(number=2,
4680                                track_indexes=[
4681                                    SheetIndex(1,
4682                                               timestamp_to_frac(0, 0, 0))],
4683                                metadata=audiotools.MetaData(
4684                                    ISRC=u"XYZZY0000000"),
4685                                filename=u"MYAUDIO2.WAV"),
4686                     SheetTrack(number=3,
4687                                track_indexes=[
4688                                    SheetIndex(0,
4689                                               timestamp_to_frac(3, 0, 0)),
4690                                    SheetIndex(1,
4691                                               timestamp_to_frac(3, 2, 0)),
4692                                    SheetIndex(2,
4693                                               timestamp_to_frac(5, 34, 32)),
4694                                    SheetIndex(3,
4695                                               timestamp_to_frac(8, 12, 49)),
4696                                    SheetIndex(4,
4697                                               timestamp_to_frac(10, 1, 74))],
4698                                metadata=audiotools.MetaData(
4699                                    ISRC=u"123456789012"),
4700                                filename=u"MYAUDIO2.WAV")],
4701                    metadata=audiotools.MetaData(catalog=u"3898347789120"))
4702
4703    def __metadata_sheets__(self):
4704        from audiotools import Sheet, SheetTrack, SheetIndex
4705
4706        def timestamp_to_frac(m, s, f):
4707            from fractions import Fraction
4708            return Fraction((m * 60 * 75) + (s * 75) + f, 75)
4709
4710        # a sheet with a portable set of plain metadata
4711        yield Sheet([SheetTrack(number=1,
4712                                track_indexes=[
4713                                    SheetIndex(1,
4714                                               timestamp_to_frac(0, 0, 0))],
4715                                metadata=audiotools.MetaData(
4716                                    track_name=u"Track 1",
4717                                    performer_name=u"Performer 1",
4718                                    artist_name=u"Artist 1"),
4719                                filename=u"CDImage.wav"),
4720                     SheetTrack(number=2,
4721                                track_indexes=[
4722                                    SheetIndex(0,
4723                                               timestamp_to_frac(4, 36, 50)),
4724                                    SheetIndex(1,
4725                                               timestamp_to_frac(4, 41, 10))],
4726                                metadata=audiotools.MetaData(
4727                                    track_name=u"Track 2",
4728                                    performer_name=u"Performer 2",
4729                                    artist_name=u"Artist 2"),
4730                                filename=u"CDImage.wav")],
4731                    metadata=audiotools.MetaData(
4732                        album_name=u"Album Name",
4733                        performer_name=u"Album Performer",
4734                        artist_name=u"Album Artist"))
4735
4736        # a sheet with a lot of strings that need escaping
4737        yield Sheet([SheetTrack(number=1,
4738                                track_indexes=[
4739                                    SheetIndex(1,
4740                                               timestamp_to_frac(0, 0, 0))],
4741                                metadata=audiotools.MetaData(
4742                                    track_name=u"Track \"1\"",
4743                                    performer_name=u"Performer \"1\"",
4744                                    artist_name=u"Artist \"1\""),
4745                                filename=u"CD\"Image\".wav"),
4746                     SheetTrack(number=2,
4747                                track_indexes=[
4748                                    SheetIndex(0,
4749                                               timestamp_to_frac(4, 36, 50)),
4750                                    SheetIndex(1,
4751                                               timestamp_to_frac(4, 41, 10))],
4752                                metadata=audiotools.MetaData(
4753                                    track_name=u"Track \"2\"",
4754                                    performer_name=u"Performer \"2\"",
4755                                    artist_name=u"Artist \"2\""),
4756                                filename=u"CD\"Image\".wav")],
4757                    metadata=audiotools.MetaData(
4758                        album_name=u"Album \"Name\"",
4759                        performer_name=u"Album \"Performer\"",
4760                        artist_name=u"Album \"Artist\""))
4761
4762        # a sheet with lots of backslashes that need escaping
4763        yield Sheet([SheetTrack(number=1,
4764                                track_indexes=[
4765                                    SheetIndex(1,
4766                                               timestamp_to_frac(0, 0, 0))],
4767                                metadata=audiotools.MetaData(
4768                                    track_name=u"Track \\ 1",
4769                                    performer_name=u"Performer \\ 1",
4770                                    artist_name=u"Artist \\ 1"),
4771                                filename=u"CD\\Image.wav"),
4772                     SheetTrack(number=2,
4773                                track_indexes=[
4774                                    SheetIndex(0,
4775                                               timestamp_to_frac(4, 36, 50)),
4776                                    SheetIndex(1,
4777                                               timestamp_to_frac(4, 41, 10))],
4778                                metadata=audiotools.MetaData(
4779                                    track_name=u"Track \\ 2",
4780                                    performer_name=u"Performer \\ 2",
4781                                    artist_name=u"Artist \\ 2"),
4782                                filename=u"CD\\Image.wav")],
4783                    metadata=audiotools.MetaData(
4784                        album_name=u"Album \\ Name",
4785                        performer_name=u"Album \\ Performer",
4786                        artist_name=u"Album \\ Artist"))
4787
4788    @LIB_CUESHEET
4789    def test_attributes(self):
4790        from audiotools import Sheet, SheetTrack, SheetIndex
4791
4792        def timestamp_to_frac(m, s, f):
4793            from fractions import Fraction
4794            return Fraction((m * 60 * 75) + (s * 75) + f, 75)
4795
4796        raw_sheet = \
4797            Sheet([SheetTrack(number=1,
4798                              track_indexes=[
4799                                  SheetIndex(0,
4800                                             timestamp_to_frac(0, 0, 0)),
4801                                  SheetIndex(1,
4802                                             timestamp_to_frac(0, 1, 0))],
4803                              metadata=audiotools.MetaData(
4804                                  track_name=u"Track 1"),
4805                              filename=u"CDImage.wav"),
4806                   SheetTrack(number=2,
4807                              track_indexes=[
4808                                  SheetIndex(1,
4809                                             timestamp_to_frac(0, 5, 0))],
4810                              metadata=audiotools.MetaData(
4811                                  track_name=u"Track 2"),
4812                              filename=u"CDImage.wav"),
4813                   SheetTrack(number=3,
4814                              track_indexes=[
4815                                  SheetIndex(0,
4816                                             timestamp_to_frac(0, 9, 0)),
4817                                  SheetIndex(1,
4818                                             timestamp_to_frac(0, 11, 0)),
4819                                  SheetIndex(2,
4820                                             timestamp_to_frac(0, 12, 0))],
4821                              metadata=audiotools.MetaData(
4822                                  track_name=u"Track 3"),
4823                              filename=u"CDImage.wav")])
4824
4825        sheet = self.sheet_class.converted(raw_sheet)
4826
4827        self.assertEqual(sheet, raw_sheet)
4828        for other_sheet in self.__sheets__():
4829            self.assertNotEqual(sheet, other_sheet)
4830
4831        self.assertEqual(sheet.track_numbers(), [1, 2, 3])
4832        self.assertRaises(KeyError, sheet.track, 0)
4833        self.assertRaises(KeyError, sheet.track, 4)
4834        self.assertEqual(sheet.pre_gap(), 1)
4835        self.assertEqual(sheet.track_offset(1), 1)
4836        self.assertEqual(sheet.track_length(1), 4)
4837        self.assertEqual(sheet.track_offset(2), 5)
4838        self.assertEqual(sheet.track_length(2), 6)
4839        self.assertEqual(sheet.track_offset(3), 11)
4840        self.assertEqual(sheet.track_length(3), None)
4841
4842        self.assertEqual(sheet.track(1).indexes(), [0, 1])
4843        self.assertRaises(KeyError, sheet.track(1).index, 2)
4844        self.assertEqual(sheet.track(2).indexes(), [1])
4845        self.assertRaises(KeyError, sheet.track(2).index, 0)
4846        self.assertRaises(KeyError, sheet.track(2).index, 2)
4847        self.assertEqual(sheet.track(3).indexes(), [0, 1, 2])
4848        self.assertRaises(KeyError, sheet.track(3).index, 3)
4849
4850        round_trip_sheet = Sheet.converted(sheet)
4851        self.assertEqual(round_trip_sheet, sheet)
4852
4853    @LIB_CUESHEET
4854    def test_round_trip(self):
4855        for sheet in self.__sheets__():
4856            converted = self.sheet_class.converted(sheet)
4857            self.assertEqual(converted, sheet)
4858            temp_sheet = tempfile.NamedTemporaryFile(suffix=self.suffix)
4859            temp_sheet.write(converted.build().encode("UTF-8"))
4860            temp_sheet.flush()
4861            re_read = self.read_sheet(temp_sheet.name)
4862            temp_sheet.close()
4863            self.assertEqual(re_read, sheet)
4864
4865    @LIB_CUESHEET
4866    def test_metadata(self):
4867        for sheet in self.__metadata_sheets__():
4868            converted = self.sheet_class.converted(sheet)
4869            self.assertEqual(converted, sheet)
4870            temp_sheet = tempfile.NamedTemporaryFile(suffix=self.suffix)
4871            temp_sheet.write(converted.build().encode("UTF-8"))
4872            temp_sheet.flush()
4873            re_read = self.read_sheet(temp_sheet.name)
4874            temp_sheet.close()
4875            self.assertEqual(re_read, sheet)
4876
4877    @LIB_CUESHEET
4878    def test_flags(self):
4879        from audiotools import Sheet, SheetTrack, SheetIndex
4880        from fractions import Fraction
4881
4882        raw_sheet = \
4883            Sheet([SheetTrack(number=1,
4884                              track_indexes=[SheetIndex(1, Fraction(0, 1))],
4885                              filename=u"track1.wav",
4886                              pre_emphasis=False,
4887                              copy_permitted=False),
4888                   SheetTrack(number=2,
4889                              track_indexes=[SheetIndex(1, Fraction(0, 1))],
4890                              filename=u"track2.wav",
4891                              pre_emphasis=True,
4892                              copy_permitted=False),
4893                   SheetTrack(number=3,
4894                              track_indexes=[SheetIndex(1, Fraction(0, 1))],
4895                              filename=u"track3.wav",
4896                              pre_emphasis=False,
4897                              copy_permitted=True),
4898                   SheetTrack(number=4,
4899                              track_indexes=[SheetIndex(1, Fraction(0, 1))],
4900                              filename=u"track4.wav",
4901                              pre_emphasis=True,
4902                              copy_permitted=True)])
4903
4904        self.assertEqual(raw_sheet.track(1).filename(), u"track1.wav")
4905        self.assertEqual(raw_sheet.track(1).pre_emphasis(), False)
4906        self.assertEqual(raw_sheet.track(1).copy_permitted(), False)
4907        self.assertEqual(raw_sheet.track(2).filename(), u"track2.wav")
4908        self.assertEqual(raw_sheet.track(2).pre_emphasis(), True)
4909        self.assertEqual(raw_sheet.track(2).copy_permitted(), False)
4910        self.assertEqual(raw_sheet.track(3).filename(), u"track3.wav")
4911        self.assertEqual(raw_sheet.track(3).pre_emphasis(), False)
4912        self.assertEqual(raw_sheet.track(3).copy_permitted(), True)
4913        self.assertEqual(raw_sheet.track(4).filename(), u"track4.wav")
4914        self.assertEqual(raw_sheet.track(4).pre_emphasis(), True)
4915        self.assertEqual(raw_sheet.track(4).copy_permitted(), True)
4916
4917        sheet = self.sheet_class.converted(raw_sheet)
4918
4919        self.assertEqual(sheet.track(1).filename(), u"track1.wav")
4920        self.assertEqual(sheet.track(1).pre_emphasis(), False)
4921        self.assertEqual(sheet.track(1).copy_permitted(), False)
4922        self.assertEqual(sheet.track(2).filename(), u"track2.wav")
4923        self.assertEqual(sheet.track(2).pre_emphasis(), True)
4924        self.assertEqual(sheet.track(2).copy_permitted(), False)
4925        self.assertEqual(sheet.track(3).filename(), u"track3.wav")
4926        self.assertEqual(sheet.track(3).pre_emphasis(), False)
4927        self.assertEqual(sheet.track(3).copy_permitted(), True)
4928        self.assertEqual(sheet.track(4).filename(), u"track4.wav")
4929        self.assertEqual(sheet.track(4).pre_emphasis(), True)
4930        self.assertEqual(sheet.track(4).copy_permitted(), True)
4931
4932        self.assertEqual(sheet, raw_sheet)
4933
4934        for other_sheet in self.__sheets__():
4935            self.assertNotEqual(sheet, other_sheet)
4936
4937        # round-trip sheet to disk to ensure it still works
4938        temp_sheet = tempfile.NamedTemporaryFile(suffix=self.suffix)
4939        temp_sheet.write(sheet.build().encode("UTF-8"))
4940        temp_sheet.flush()
4941        re_read = self.read_sheet(temp_sheet.name)
4942        temp_sheet.close()
4943        self.assertEqual(re_read, sheet)
4944
4945
4946class testtocfile(testcuesheet):
4947    def setUp(self):
4948        from audiotools.toc import TOCFile, read_tocfile, write_tocfile
4949        self.suffix = ".toc"
4950        self.sheet_class = TOCFile
4951        self.read_sheet = read_tocfile
4952
4953
4954class test_flac_cuesheet(testcuesheet):
4955    def setUp(self):
4956        self.audio_class = audiotools.FlacAudio
4957
4958    def __sheets__(self):
4959        # unlike the regular testcuesheet files
4960        # these only contain CD images
4961
4962        for sheet in testcuesheet.__sheets__(self):
4963            if sheet.image_formatted():
4964                yield sheet
4965
4966    @LIB_CUESHEET
4967    def test_attributes(self):
4968        from audiotools import Sheet, SheetTrack, SheetIndex
4969        from audiotools.flac import Flac_CUESHEET
4970
4971        def timestamp_to_frac(m, s, f):
4972            from fractions import Fraction
4973            return Fraction((m * 60 * 75) + (s * 75) + f, 75)
4974
4975        raw_sheet = \
4976            Sheet([SheetTrack(number=1,
4977                              track_indexes=[
4978                                  SheetIndex(0,
4979                                             timestamp_to_frac(0, 0, 0)),
4980                                  SheetIndex(1,
4981                                             timestamp_to_frac(0, 1, 0))],
4982                              metadata=audiotools.MetaData(
4983                                  track_name=u"Track 1"),
4984                              filename=u"CDImage.wav"),
4985                   SheetTrack(number=2,
4986                              track_indexes=[
4987                                  SheetIndex(1,
4988                                             timestamp_to_frac(0, 5, 0))],
4989                              metadata=audiotools.MetaData(
4990                                  track_name=u"Track 2"),
4991                              filename=u"CDImage.wav"),
4992                   SheetTrack(number=3,
4993                              track_indexes=[
4994                                  SheetIndex(0,
4995                                             timestamp_to_frac(0, 9, 0)),
4996                                  SheetIndex(1,
4997                                             timestamp_to_frac(0, 11, 0)),
4998                                  SheetIndex(2,
4999                                             timestamp_to_frac(0, 12, 0))],
5000                              metadata=audiotools.MetaData(
5001                                  track_name=u"Track 3"),
5002                              filename=u"CDImage.wav")])
5003
5004        sheet = Flac_CUESHEET.converted(raw_sheet, 882000, 44100)
5005
5006        self.assertEqual(sheet, raw_sheet)
5007        for other_sheet in self.__sheets__():
5008            self.assertNotEqual(sheet, other_sheet)
5009
5010        self.assertEqual(sheet.track_numbers(), [1, 2, 3])
5011        self.assertRaises(KeyError, sheet.track, 0)
5012        self.assertRaises(KeyError, sheet.track, 4)
5013        self.assertEqual(sheet.pre_gap(), 1)
5014        self.assertEqual(sheet.track_offset(1), 1)
5015        self.assertEqual(sheet.track_length(1), 4)
5016        self.assertEqual(sheet.track_offset(2), 5)
5017        self.assertEqual(sheet.track_length(2), 6)
5018        self.assertEqual(sheet.track_offset(3), 11)
5019        self.assertEqual(sheet.track_length(3), 9)
5020
5021        self.assertEqual(sheet.track(1).indexes(), [0, 1])
5022        self.assertRaises(KeyError, sheet.track(1).index, 2)
5023        self.assertEqual(sheet.track(2).indexes(), [1])
5024        self.assertRaises(KeyError, sheet.track(2).index, 0)
5025        self.assertRaises(KeyError, sheet.track(2).index, 2)
5026        self.assertEqual(sheet.track(3).indexes(), [0, 1, 2])
5027        self.assertRaises(KeyError, sheet.track(3).index, 3)
5028
5029        round_trip_sheet = Sheet.converted(sheet)
5030        self.assertEqual(round_trip_sheet, sheet)
5031
5032    @LIB_CUESHEET
5033    def test_round_trip(self):
5034        sample_rate = 44100
5035
5036        for sheet in self.__sheets__():
5037            # tack on 1 minute to cuesheet's last index for total size
5038            total_length = int((sheet[-1][-1].offset() + 60) * sample_rate)
5039
5040            # create dummy file
5041            temp_file = tempfile.NamedTemporaryFile(
5042                suffix="." + self.audio_class.SUFFIX)
5043            temp_track = self.audio_class.from_pcm(
5044                temp_file.name,
5045                EXACT_SILENCE_PCM_Reader(pcm_frames=total_length,
5046                                         sample_rate=sample_rate,
5047                                         channels=2,
5048                                         bits_per_sample=16,
5049                                         channel_mask=0x3),
5050                total_pcm_frames=total_length)
5051
5052            # set cuesheet
5053            temp_track.set_cuesheet(sheet)
5054
5055            # get cuesheet
5056            track_sheet = audiotools.open(temp_file.name).get_cuesheet()
5057            self.assertIsNot(track_sheet, None)
5058
5059            # ensure they match
5060            self.assertEqual(track_sheet, sheet)
5061
5062            # clean out dummy file
5063            temp_file.close()
5064
5065    @LIB_CUESHEET
5066    def test_metadata(self):
5067        # FLAC cuesheets don't support meaningful metadata
5068        # outside of catalog and ISRC
5069        self.assertTrue(True)
5070
5071    @LIB_CUESHEET
5072    def test_flags(self):
5073        # FLAC cuesheets only support pre-emphasis flag
5074        #FIXME
5075
5076        self.assertTrue(True)
5077
5078
5079class test_oggflac_cuesheet(test_flac_cuesheet):
5080    def setUp(self):
5081        self.audio_class = audiotools.OggFlacAudio
5082
5083    @LIB_CUESHEET
5084    def test_attributes(self):
5085        pass
5086
5087
5088class test_tta_cuesheet(test_flac_cuesheet):
5089    def setUp(self):
5090        self.audio_class = audiotools.TrueAudio
5091
5092    @LIB_CUESHEET
5093    def test_attributes(self):
5094        pass
5095
5096
5097class test_wavpack_cuesheet(test_flac_cuesheet):
5098    def setUp(self):
5099        self.audio_class = audiotools.WavPackAudio
5100
5101    @LIB_CUESHEET
5102    def test_attributes(self):
5103        pass
5104
5105
5106class TestCDTOC(unittest.TestCase):
5107    @LIB_CUESHEET
5108    def test_round_trip(self):
5109        from audiotools.cdtoc import CDTOC
5110        from audiotools import Sheet
5111        from fractions import Fraction
5112
5113        tocs = [u"A+96+4975+99F4+E600+130AB+16D98+1A538+" +
5114                u"1EC41+22DE4+27579+2C6D2",
5115                u"A+2373+85AC+D381+124BC+15A95+1A7CA+" +
5116                u"2112E+25BC7+2A3B6+3001B+39DC8",
5117                u"D+96+49F3+8A23+C240+10991+1491C+18F27+1DBD8+" +
5118                u"216CC+25C00+2ACA3+2F41F+33D59+382D4+44C3D",
5119                u"B+908C+F7F7+14708+1A8BC+206F8+27294+2C51B+" +
5120                u"31A48+3862F+3D89D+44661+4A29C+X96"]
5121
5122        lead_outs = [181820, 236850, 281511, 303622]
5123
5124        for (toc, lead_out) in zip(tocs, lead_outs):
5125            cdtoc = CDTOC.from_unicode(toc)
5126            self.assertEqual(cdtoc.__unicode__(), toc)
5127            sheet = Sheet.converted(cdtoc)
5128            self.assertEqual(cdtoc, sheet)
5129            cdtoc2 = CDTOC.converted(sheet, Fraction(lead_out, 75))
5130            self.assertEqual(cdtoc2, cdtoc)
5131            self.assertEqual(cdtoc2.__unicode__(), toc)
5132
5133    @LIB_CUESHEET
5134    def test_flac(self):
5135        from audiotools import Sheet, SheetTrack, SheetIndex
5136        from audiotools.flac import Flac_CUESHEET
5137        from fractions import Fraction
5138
5139        with tempfile.NamedTemporaryFile(suffix=".flac") as temp_file:
5140            temp_track = audiotools.FlacAudio.from_pcm(
5141                temp_file.name,
5142                EXACT_SILENCE_PCM_Reader(44100 * 13),
5143                total_pcm_frames=44100 * 13)
5144
5145            # check malformed CDTOC tag
5146            self.assertIsNone(temp_track.get_cuesheet())
5147            metadata = temp_track.get_metadata()
5148            comment = metadata.get_block(4)
5149            comment[u"CDTOC"] = [u"INVALID"]
5150            temp_track.update_metadata(metadata)
5151            self.assertEqual(temp_track.get_metadata().get_block(4)[u"CDTOC"],
5152                             [u"INVALID"])
5153            self.assertIsNone(temp_track.get_cuesheet())
5154
5155            # check CDTOC tag
5156            metadata = temp_track.get_metadata()
5157            comment = metadata.get_block(4)
5158            comment[u"CDTOC"] = [u"3+E1+177+2A3+465"]
5159            temp_track.update_metadata(metadata)
5160            cuesheet = temp_track.get_cuesheet()
5161            self.assertIsNotNone(cuesheet)
5162            self.assertEqual(cuesheet.pre_gap(), 1)
5163            self.assertEqual(cuesheet.track(1).index(1).offset(), 1)
5164            self.assertEqual(cuesheet.track_length(1), 2)
5165            self.assertEqual(cuesheet.track(2).index(1).offset(), 3)
5166            self.assertEqual(cuesheet.track_length(2), 4)
5167            self.assertEqual(cuesheet.track(3).index(1).offset(), 7)
5168            self.assertEqual(cuesheet.track_length(3), 6)
5169
5170            # ensure CUESHEET block takes precedence over tag in get_cuesheet
5171            new_sheet = Sheet([SheetTrack(1, [SheetIndex(0, Fraction(0, 1)),
5172                                              SheetIndex(1, Fraction(1, 1))]),
5173                               SheetTrack(2, [SheetIndex(1, Fraction(4, 1))]),
5174                               SheetTrack(3, [SheetIndex(1, Fraction(6, 1))])])
5175
5176            metadata = temp_track.get_metadata()
5177            metadata.replace_blocks(
5178                5,
5179                [Flac_CUESHEET.converted(new_sheet,
5180                                         temp_track.total_frames(),
5181                                         temp_track.sample_rate())])
5182            temp_track.update_metadata(metadata)
5183            metadata = temp_track.get_metadata()
5184            self.assertTrue(metadata.has_block(5))
5185            self.assertEqual(metadata.get_block(4)[u"CDTOC"],
5186                             [u"3+E1+177+2A3+465"])
5187            cuesheet = temp_track.get_cuesheet()
5188            self.assertIsNotNone(cuesheet)
5189            self.assertEqual(cuesheet.pre_gap(), 1)
5190            self.assertEqual(cuesheet.track(1).index(1).offset(), 1)
5191            self.assertEqual(cuesheet.track_length(1), 3)
5192            self.assertEqual(cuesheet.track(2).index(1).offset(), 4)
5193            self.assertEqual(cuesheet.track_length(2), 2)
5194            self.assertEqual(cuesheet.track(3).index(1).offset(), 6)
5195            self.assertEqual(cuesheet.track_length(3), 7)
5196
5197            # set_cuesheet overwrites block and wipes out tag
5198            new_sheet2 = Sheet([SheetTrack(1, [SheetIndex(1, Fraction(0, 1))]),
5199                                SheetTrack(2, [SheetIndex(1, Fraction(5, 1))])])
5200            temp_track.set_cuesheet(new_sheet2)
5201            metadata = temp_track.get_metadata()
5202            self.assertEqual(len(metadata.get_blocks(5)), 1)
5203            vorbiscomment = metadata.get_block(4)
5204            self.assertRaises(KeyError,
5205                              vorbiscomment.__getitem__,
5206                              u"CDTOC")
5207            cuesheet = temp_track.get_cuesheet()
5208            self.assertIsNotNone(cuesheet)
5209            self.assertEqual(cuesheet.pre_gap(), 0)
5210            self.assertEqual(cuesheet.track(1).index(1).offset(), 0)
5211            self.assertEqual(cuesheet.track_length(1), 5)
5212            self.assertEqual(cuesheet.track(2).index(1).offset(), 5)
5213            self.assertEqual(cuesheet.track_length(2), 8)
5214
5215            # delete_cuesheet wipes out both block and tag
5216            metadata = temp_track.get_metadata()
5217            metadata.get_block(4)[u"CDTOC"] = [u"3+E1+177+2A3+465"]
5218            metadata.replace_blocks(
5219                5,
5220                [Flac_CUESHEET.converted(new_sheet,
5221                                         temp_track.total_frames(),
5222                                         temp_track.sample_rate())])
5223            temp_track.update_metadata(metadata)
5224            temp_track.delete_cuesheet()
5225            self.assertIsNone(temp_track.get_cuesheet())
5226            metadata = temp_track.get_metadata()
5227            self.assertFalse(metadata.has_block(5))
5228            self.assertRaises(KeyError,
5229                              metadata.get_block(4).__getitem__,
5230                              u"CDTOC")
5231
5232
5233class TestMultiChannel(unittest.TestCase):
5234    def setUp(self):
5235        # these support the full range of ChannelMasks
5236        self.wav_channel_masks = [audiotools.WaveAudio,
5237                                  audiotools.WavPackAudio]
5238
5239        # these support a subset of ChannelMasks up to 6 channels
5240        self.flac_channel_masks = [audiotools.FlacAudio,
5241                                   audiotools.OggFlacAudio]
5242
5243        # these support a reordered subset of ChannelMasks up to 8 channels
5244        self.vorbis_channel_masks = [audiotools.VorbisAudio,
5245                                     audiotools.OpusAudio]
5246
5247    def __test_mask_blank__(self, audio_class, channel_mask):
5248        with tempfile.NamedTemporaryFile(
5249            suffix="." + audio_class.SUFFIX) as temp_file:
5250            temp_track = audio_class.from_pcm(
5251                temp_file.name,
5252                Join_Reader(
5253                    [BLANK_PCM_Reader(2, channels=1)
5254                     for i in range(len(channel_mask))],
5255                    int(channel_mask)))
5256            self.assertEqual(temp_track.channel_mask(), channel_mask,
5257                             "%s != %s for format %s" %
5258                             (temp_track.channel_mask(),
5259                              channel_mask,
5260                              audio_class.NAME))
5261
5262            pcm = temp_track.to_pcm()
5263            self.assertEqual(int(pcm.channel_mask), int(channel_mask))
5264            audiotools.transfer_framelist_data(pcm, lambda x: None)
5265
5266    def __test_undefined_mask_blank__(self, audio_class, channels,
5267                                      should_be_blank):
5268        temp_file = tempfile.NamedTemporaryFile(
5269            suffix="." + audio_class.SUFFIX)
5270        try:
5271            temp_track = audio_class.from_pcm(
5272                temp_file.name,
5273                Join_Reader(
5274                    [BLANK_PCM_Reader(2, channels=1)
5275                     for i in range(channels)],
5276                    int(audiotools.ChannelMask(0))))
5277            self.assertEqual(temp_track.channels(), channels)
5278            if should_be_blank:
5279                self.assertEqual(int(temp_track.channel_mask()), 0)
5280                pcm = temp_track.to_pcm()
5281                self.assertEqual(int(pcm.channel_mask), 0)
5282                audiotools.transfer_framelist_data(pcm, lambda x: None)
5283            else:
5284                self.assertNotEqual(int(temp_track.channel_mask()), 0,
5285                                    "mask = %s for format %s at %d channels" %
5286                                    (temp_track.channel_mask(),
5287                                     audio_class,
5288                                     channels))
5289                pcm = temp_track.to_pcm()
5290                self.assertEqual(int(pcm.channel_mask),
5291                                 int(temp_track.channel_mask()))
5292                audiotools.transfer_framelist_data(pcm, lambda x: None)
5293        finally:
5294            temp_file.close()
5295
5296    def __test_error_mask_blank__(self, audio_class, channels,
5297                                  channel_mask):
5298        temp_file = tempfile.NamedTemporaryFile(
5299            suffix="." + audio_class.SUFFIX)
5300        try:
5301            self.assertRaises(
5302                audiotools.UnsupportedChannelMask,
5303                audio_class.from_pcm,
5304                temp_file.name,
5305                Join_Reader([BLANK_PCM_Reader(2, channels=1)
5306                             for i in range(channels)],
5307                            int(channel_mask)))
5308        finally:
5309            temp_file.close()
5310
5311    def __test_error_channel_count__(self, audio_class, channels,
5312                                     channel_mask):
5313        temp_file = tempfile.NamedTemporaryFile(
5314            suffix="." + audio_class.SUFFIX)
5315        try:
5316            self.assertRaises(
5317                audiotools.UnsupportedChannelCount,
5318                audio_class.from_pcm,
5319                temp_file.name,
5320                Join_Reader([BLANK_PCM_Reader(2, channels=1)
5321                             for i in range(channels)],
5322                            int(channel_mask)))
5323        finally:
5324            temp_file.close()
5325
5326    def __test_pcm_conversion__(self,
5327                                source_audio_class,
5328                                target_audio_class,
5329                                channel_mask):
5330        source_file = tempfile.NamedTemporaryFile(
5331            suffix="." + source_audio_class.SUFFIX)
5332        target_file = tempfile.NamedTemporaryFile(
5333            suffix="." + target_audio_class.SUFFIX)
5334        wav_file = tempfile.NamedTemporaryFile(suffix=".wav")
5335        try:
5336            source_track = source_audio_class.from_pcm(
5337                source_file.name,
5338                Join_Reader(
5339                    [BLANK_PCM_Reader(2, channels=1)
5340                     for i in range(len(channel_mask))],
5341                    int(channel_mask)))
5342            self.assertEqual(source_track.channel_mask(), channel_mask)
5343
5344            source_pcm = source_track.to_pcm()
5345
5346            self.assertEqual(isinstance(source_pcm.channel_mask, int),
5347                             True,
5348                             "%s's to_pcm() PCMReader is not an int" %
5349                             (source_audio_class.NAME))
5350
5351            target_track = target_audio_class.from_pcm(
5352                target_file.name,
5353                source_pcm)
5354
5355            self.assertEqual(target_track.channel_mask(), channel_mask)
5356            self.assertEqual(source_track.channel_mask(),
5357                             target_track.channel_mask())
5358
5359            source_track.convert(wav_file.name, audiotools.WaveAudio)
5360            wav = audiotools.open(wav_file.name)
5361            wav.verify()
5362            self.assertEqual(source_track.channel_mask(),
5363                             wav.channel_mask())
5364            target_track = wav.convert(target_file.name,
5365                                       audiotools.WaveAudio)
5366            self.assertEqual(target_track.channel_mask(), channel_mask)
5367            self.assertEqual(source_track.channel_mask(),
5368                             target_track.channel_mask())
5369        finally:
5370            source_file.close()
5371            target_file.close()
5372            wav_file.close()
5373
5374    @LIB_CORE
5375    def test_channel_mask(self):
5376        from_fields = audiotools.ChannelMask.from_fields
5377
5378        for audio_class in (self.wav_channel_masks +
5379                            self.flac_channel_masks +
5380                            self.vorbis_channel_masks):
5381            for mask in [from_fields(front_center=True),
5382                         from_fields(front_left=True,
5383                                     front_right=True),
5384                         from_fields(front_left=True,
5385                                     front_right=True,
5386                                     front_center=True),
5387                         from_fields(front_right=True,
5388                                     front_left=True,
5389                                     back_right=True,
5390                                     back_left=True),
5391                         from_fields(front_right=True,
5392                                     front_center=True,
5393                                     front_left=True,
5394                                     back_right=True,
5395                                     back_left=True),
5396                         from_fields(front_right=True,
5397                                     front_center=True,
5398                                     low_frequency=True,
5399                                     front_left=True,
5400                                     back_right=True,
5401                                     back_left=True)]:
5402                self.__test_mask_blank__(audio_class, mask)
5403
5404        for audio_class in (self.wav_channel_masks +
5405                            self.vorbis_channel_masks):
5406            for mask in [from_fields(front_left=True, front_right=True,
5407                                     front_center=True,
5408                                     side_left=True, side_right=True,
5409                                     back_center=True, low_frequency=True),
5410                         from_fields(front_left=True, front_right=True,
5411                                     side_left=True, side_right=True,
5412                                     back_left=True, back_right=True,
5413                                     front_center=True, low_frequency=True)]:
5414                self.__test_mask_blank__(audio_class, mask)
5415
5416        for audio_class in self.wav_channel_masks:
5417            for mask in [from_fields(front_left=True, front_right=True,
5418                                     side_left=True, side_right=True,
5419                                     back_left=True, back_right=True,
5420                                     front_center=True, back_center=True,
5421                                     low_frequency=True),
5422                         from_fields(front_left=True, front_right=True,
5423                                     side_left=True, side_right=True,
5424                                     back_left=True, back_right=True,
5425                                     front_center=True, back_center=True)]:
5426                self.__test_mask_blank__(audio_class, mask)
5427
5428    @LIB_CORE
5429    def test_channel_mask_conversion(self):
5430        from_fields = audiotools.ChannelMask.from_fields
5431
5432        for source_audio_class in audiotools.AVAILABLE_TYPES:
5433            for target_audio_class in audiotools.AVAILABLE_TYPES:
5434                self.__test_pcm_conversion__(source_audio_class,
5435                                             target_audio_class,
5436                                             from_fields(front_left=True,
5437                                                         front_right=True))
5438
5439        for source_audio_class in (self.wav_channel_masks +
5440                                   self.flac_channel_masks +
5441                                   self.vorbis_channel_masks):
5442            for target_audio_class in (self.wav_channel_masks +
5443                                       self.flac_channel_masks +
5444                                       self.vorbis_channel_masks):
5445                for mask in [from_fields(front_center=True),
5446                             from_fields(front_left=True,
5447                                         front_right=True),
5448                             from_fields(front_left=True,
5449                                         front_right=True,
5450                                         front_center=True),
5451                             from_fields(front_right=True,
5452                                         front_left=True,
5453                                         back_right=True,
5454                                         back_left=True),
5455                             from_fields(front_right=True,
5456                                         front_center=True,
5457                                         front_left=True,
5458                                         back_right=True,
5459                                         back_left=True),
5460                             from_fields(front_right=True,
5461                                         front_center=True,
5462                                         low_frequency=True,
5463                                         front_left=True,
5464                                         back_right=True,
5465                                         back_left=True)]:
5466                    self.__test_pcm_conversion__(source_audio_class,
5467                                                 target_audio_class,
5468                                                 mask)
5469
5470        for source_audio_class in (self.wav_channel_masks +
5471                                   self.vorbis_channel_masks):
5472            for target_audio_class in (self.wav_channel_masks +
5473                                       self.vorbis_channel_masks):
5474                for mask in [from_fields(front_left=True,
5475                                         front_right=True,
5476                                         front_center=True,
5477                                         side_left=True,
5478                                         side_right=True,
5479                                         back_center=True,
5480                                         low_frequency=True),
5481                             from_fields(front_left=True,
5482                                         front_right=True,
5483                                         side_left=True,
5484                                         side_right=True,
5485                                         back_left=True,
5486                                         back_right=True,
5487                                         front_center=True,
5488                                         low_frequency=True)]:
5489                    self.__test_pcm_conversion__(source_audio_class,
5490                                                 target_audio_class,
5491                                                 mask)
5492
5493        for source_audio_class in self.wav_channel_masks:
5494            for target_audio_class in self.wav_channel_masks:
5495                for mask in [from_fields(front_left=True,
5496                                         front_right=True,
5497                                         side_left=True,
5498                                         side_right=True,
5499                                         back_left=True,
5500                                         back_right=True,
5501                                         front_center=True,
5502                                         back_center=True,
5503                                         low_frequency=True),
5504                             from_fields(front_left=True,
5505                                         front_right=True,
5506                                         side_left=True,
5507                                         side_right=True,
5508                                         back_left=True,
5509                                         back_right=True,
5510                                         front_center=True,
5511                                         back_center=True)]:
5512                    self.__test_pcm_conversion__(source_audio_class,
5513                                                 target_audio_class,
5514                                                 mask)
5515
5516    @LIB_CORE
5517    def test_unsupported_channel_mask_from_pcm(self):
5518        for channels in range(1, 6 + 1):
5519            self.__test_undefined_mask_blank__(audiotools.WaveAudio,
5520                                               channels,
5521                                               False)
5522        for channels in range(1, 3):
5523            self.__test_undefined_mask_blank__(audiotools.WavPackAudio,
5524                                               channels,
5525                                               False)
5526        for channels in range(3, 21):
5527            self.__test_undefined_mask_blank__(audiotools.WavPackAudio,
5528                                               channels,
5529                                               True)
5530
5531        for channels in range(1, 9):
5532            self.__test_undefined_mask_blank__(audiotools.ALACAudio,
5533                                               channels,
5534                                               False)
5535        for channels in range(9, 21):
5536            self.__test_undefined_mask_blank__(audiotools.ALACAudio,
5537                                               channels,
5538                                               True)
5539
5540        for audio_class in [audiotools.FlacAudio, audiotools.OggFlacAudio]:
5541            for channels in range(1, 9):
5542                self.__test_undefined_mask_blank__(audio_class,
5543                                                   channels,
5544                                                   False)
5545
5546            self.__test_error_channel_count__(audio_class,
5547                                              9, audiotools.ChannelMask(0))
5548            self.__test_error_channel_count__(audio_class,
5549                                              10, audiotools.ChannelMask(0))
5550
5551        for stereo_audio_class in [audiotools.MP3Audio,
5552                                   audiotools.MP2Audio]:
5553
5554            self.__test_undefined_mask_blank__(stereo_audio_class,
5555                                               2, False)
5556            for channels in range(3, 20):
5557                temp_file = tempfile.NamedTemporaryFile(
5558                    suffix="." + stereo_audio_class.SUFFIX)
5559                try:
5560                    temp_track = stereo_audio_class.from_pcm(
5561                        temp_file.name,
5562                        Join_Reader(
5563                            [BLANK_PCM_Reader(2, channels=1)
5564                             for i in range(channels)],
5565                            int(audiotools.ChannelMask(0))))
5566                    self.assertEqual(temp_track.channels(), 2)
5567                    self.assertEqual(
5568                        int(temp_track.channel_mask()),
5569                        int(audiotools.ChannelMask.from_fields(
5570                            front_left=True, front_right=True)))
5571                    pcm = temp_track.to_pcm()
5572                    self.assertEqual(int(pcm.channel_mask),
5573                                     int(temp_track.channel_mask()))
5574                    audiotools.transfer_framelist_data(pcm, lambda x: x)
5575                finally:
5576                    temp_file.close()
5577
5578        for channels in range(1, 9):
5579            self.__test_undefined_mask_blank__(audiotools.VorbisAudio,
5580                                               channels,
5581                                               False)
5582
5583        for channels in range(9, 20):
5584            self.__test_undefined_mask_blank__(audiotools.VorbisAudio,
5585                                               channels,
5586                                               True)
5587
5588        for channels in [1, 2]:
5589            self.__test_undefined_mask_blank__(audiotools.AiffAudio,
5590                                               channels,
5591                                               False)
5592
5593        for channels in [3, 4, 5, 6, 7, 8, 9, 10]:
5594            self.__test_undefined_mask_blank__(audiotools.AiffAudio,
5595                                               channels,
5596                                               True)
5597
5598        for channels in [1, 2]:
5599            self.__test_undefined_mask_blank__(audiotools.AuAudio,
5600                                               channels,
5601                                               False)
5602        for channels in range(3, 11):
5603            self.__test_undefined_mask_blank__(audiotools.AuAudio,
5604                                               channels,
5605                                               True)
5606
5607
5608class Test_FreeDB(unittest.TestCase):
5609    def __test_disc_id_tracks__(self, disc_id_obj,
5610                                track_lengths,
5611                                cuesheet,
5612                                disc_id):
5613        from audiotools.cdio import CDDAReader
5614        from shutil import rmtree
5615        from sys import version_info
5616
5617        self.assertTrue(isinstance(cuesheet, bytes))
5618
5619        dir = tempfile.mkdtemp()
5620        try:
5621            # dump cuesheet to temporary directory
5622            with open(os.path.join(dir, "CDImage.cue"), "wb") as f:
5623                f.write(cuesheet)
5624
5625            # build CD image from track lengths
5626            with open(os.path.join(dir, "CDImage.bin"), "wb") as f:
5627                f.write(b"\x00" * 2 * 2 * sum(track_lengths))
5628
5629            # open disc image with CDDAReader
5630            cddareader = CDDAReader(os.path.join(dir, "CDImage.cue"))
5631
5632            # ensure DiscID from CDDAReader matches
5633            self.assertEqual(str(disc_id_obj.from_cddareader(cddareader)),
5634                             disc_id)
5635
5636            # dump contents of CDDAReader to individual tracks
5637            tracks = []
5638            for i in sorted(cddareader.track_offsets.keys()):
5639                offset = cddareader.track_offsets[i]
5640                length = cddareader.track_lengths[i]
5641                self.assertEqual(length, track_lengths[i - 1])
5642                self.assertEqual(cddareader.seek(offset), offset)
5643                tracks.append(audiotools.WaveAudio.from_pcm(
5644                    os.path.join(dir, "track%d.wav"),
5645                    audiotools.PCMReaderHead(cddareader, length, False),
5646                    total_pcm_frames=length))
5647
5648            # ensure DiscID from tracks matches
5649            self.assertEqual(str(disc_id_obj.from_tracks(tracks)),
5650                             str(disc_id_obj.from_cddareader(cddareader)))
5651
5652            # open cuesheet as a Sheet object
5653            sheet = audiotools.read_sheet(os.path.join(dir, "CDImage.cue"))
5654
5655            # ensure DiscID from sheet matches
5656            self.assertEqual(str(disc_id_obj.from_sheet(sheet,
5657                                                        sum(track_lengths),
5658                                                        44100)),
5659                             str(disc_id_obj.from_cddareader(cddareader)))
5660        finally:
5661            rmtree(dir)
5662
5663    def __test_disc_id__(self, disc_id_obj,
5664                         total_length,
5665                         cuesheet,
5666                         disc_id):
5667        from audiotools.cdio import CDDAReader
5668        from shutil import rmtree
5669
5670        self.assertTrue(isinstance(cuesheet, bytes))
5671
5672        dir = tempfile.mkdtemp()
5673        try:
5674            # dump cuesheet to temporary directory
5675            f = open(os.path.join(dir, "CDImage.cue"), "wb")
5676            f.write(cuesheet)
5677            f.close()
5678
5679            # build CD image from total length
5680            f = open(os.path.join(dir, "CDImage.bin"), "wb")
5681            f.write(b"\x00" * 2 * 2 * total_length)
5682            f.close()
5683
5684            # open disc image with CDDAReader
5685            cddareader = CDDAReader(os.path.join(dir, "CDImage.cue"))
5686
5687            # ensure DiscID from CDDAReader matches
5688            self.assertEqual(str(disc_id_obj.from_cddareader(cddareader)),
5689                             disc_id)
5690
5691            # open cuesheet as a Sheet object
5692            sheet = audiotools.read_sheet(os.path.join(dir, "CDImage.cue"))
5693
5694            # ensure DiscID from sheet matches
5695            self.assertEqual(str(disc_id_obj.from_sheet(sheet,
5696                                                        total_length,
5697                                                        44100)),
5698                             str(disc_id_obj.from_cddareader(cddareader)))
5699        finally:
5700            rmtree(dir)
5701
5702    @LIB_FREEDB
5703    def test_testcd(self):
5704        from audiotools import read_sheet
5705        from audiotools.freedb import DiscID
5706        from audiotools.cdio import CDDAReader
5707        from shutil import rmtree
5708
5709        # use the official FreeDB test disc to verify ID calculation
5710
5711        testdir = tempfile.mkdtemp()
5712        try:
5713            # dump CD image to temporary directory
5714
5715            with open(os.path.join(testdir, "CDImage.cue"), "wb") as f:
5716                f.write(b"FILE \"CDImage.bin\" BINARY\r\n" +
5717                        b"TRACK 01 AUDIO\r\nINDEX 00 00:00:00\r\n" +
5718                        b"INDEX 01 00:01:71\r\n")
5719
5720            with open(os.path.join(testdir, "CDImage.bin"), "wb") as f:
5721                f.write(b"\x00" * 60328800)
5722
5723            # ensure DiscID works from CDDAReader
5724            discid = DiscID.from_cddareader(
5725               CDDAReader(os.path.join(testdir, "CDImage.cue")))
5726
5727            self.assertEqual(discid.__unicode__(), u"03015501")
5728            self.assertEqual(discid.track_count, 1)
5729            self.assertEqual(discid.offsets, [296])
5730            self.assertEqual(discid.playable_length, 344)
5731
5732            # ensure DiscID works from cuesheet + length
5733            discid = DiscID.from_sheet(
5734                sheet=read_sheet(os.path.join(testdir, "CDImage.cue")),
5735                total_pcm_frames=60328800 // 4,
5736                sample_rate=44100)
5737
5738            self.assertEqual(discid.__unicode__(), u"03015501")
5739            self.assertEqual(discid.track_count, 1)
5740            self.assertEqual(discid.offsets, [296])
5741            self.assertEqual(discid.playable_length, 344)
5742        finally:
5743            rmtree(testdir)
5744
5745    @LIB_FREEDB
5746    def test_discid(self):
5747        from audiotools.freedb import DiscID
5748
5749        with open("freedb_test_discid-1.cue", "rb") as cue:
5750            self.__test_disc_id_tracks__(
5751                disc_id_obj=DiscID,
5752                track_lengths=[7939176, 4799256, 6297480, 5383140,
5753                               5246136, 5052684, 5013876],
5754                cuesheet=cue.read(),
5755                disc_id="5A038407")
5756
5757        with open("freedb_test_discid-2.cue", "rb") as cue:
5758            self.__test_disc_id_tracks__(
5759                disc_id_obj=DiscID,
5760                track_lengths=[1339464, 4048380, 692076, 10600464, 10602816,
5761                               1178940, 7454664, 2664816, 989604, 7008960,
5762                               9632616, 1070160, 6094620, 1622880, 13361124,
5763                               403956, 5208504, 7373520, 483336, 12012840,
5764                               8534820, 439824, 7626360, 1262436, 4874520,
5765                               398664, 11229036, 483924, 9003456, 883764,
5766                               5018580],
5767                cuesheet=cue.read(),
5768                disc_id="BE0D9A1F")
5769
5770        with open("freedb_test_discid-3.cue", "rb") as cue:
5771            self.__test_disc_id__(
5772                disc_id_obj=DiscID,
5773                total_length=190928304,
5774                cuesheet=cue.read(),
5775                disc_id="A610E90A")
5776
5777        with open("freedb_test_discid-4.cue", "rb") as cue:
5778            self.__test_disc_id__(
5779                disc_id_obj=DiscID,
5780                total_length=127937040,
5781                cuesheet=cue.read(),
5782                disc_id="CE0AD30E")
5783
5784        with open("freedb_test_discid-5.cue", "rb") as cue:
5785            self.__test_disc_id__(
5786                disc_id_obj=DiscID,
5787                total_length=119882616,
5788                cuesheet=cue.read(),
5789                disc_id="FC0A9E14")
5790
5791
5792class Test_MusicBrainz(Test_FreeDB):
5793    @LIB_MUSICBRAINZ
5794    def test_testcd(self):
5795        # test disc constructed from MusicBrainz's documentation
5796
5797        from audiotools.musicbrainz import DiscID
5798        from audiotools.freedb import DiscID as FDiscID
5799        from audiotools.cdio import CDDAReader
5800        from audiotools import read_sheet
5801        from shutil import rmtree
5802
5803        testdir = tempfile.mkdtemp()
5804        try:
5805            # dump CD image to temporary directory
5806            with open(os.path.join(testdir, "CDImage.cue"), "wb") as f:
5807                f.write(b'FILE "CDImage.bin" BINARY\n  ' +
5808                        b'TRACK 01 AUDIO\nINDEX 01 00:00:00\n  ' +
5809                        b'TRACK 02 AUDIO\n    INDEX 01 03:22:63\n  ' +
5810                        b'TRACK 03 AUDIO\n    INDEX 01 07:08:64\n  ' +
5811                        b'TRACK 04 AUDIO\n    INDEX 01 10:19:17\n  ' +
5812                        b'TRACK 05 AUDIO\n    INDEX 01 14:03:39\n  ' +
5813                        b'TRACK 06 AUDIO\n    INDEX 01 17:51:14\n')
5814
5815            with open(os.path.join(testdir, "CDImage.bin"), "wb") as f:
5816                f.write(b"\x00" * 224173824)
5817
5818            # ensure DiscID works with CDDAReader
5819            cddareader = CDDAReader(os.path.join(testdir, "CDImage.cue"))
5820
5821            discid = DiscID.from_cddareader(cddareader)
5822
5823            self.assertEqual(discid.__unicode__(),
5824                             u"49HHV7Eb8UKF3aQiNmu1GR8vKTY-")
5825
5826            self.assertEqual(FDiscID.from_cddareader(cddareader).__unicode__(),
5827                             u"3404F606")
5828
5829            # ensure DiscID works with cuesheet + length
5830            discid = DiscID.from_sheet(
5831                sheet=read_sheet(os.path.join(testdir, "CDImage.cue")),
5832                total_pcm_frames=224173824 // 4,
5833                sample_rate=44100)
5834
5835            self.assertEqual(discid.__unicode__(),
5836                             u"49HHV7Eb8UKF3aQiNmu1GR8vKTY-")
5837
5838            self.assertEqual(FDiscID.from_cddareader(cddareader).__unicode__(),
5839                             u"3404F606")
5840        finally:
5841            rmtree(testdir)
5842
5843    @LIB_MUSICBRAINZ
5844    def test_discid(self):
5845        from audiotools.musicbrainz import DiscID
5846
5847        with open("freedb_test_discid-1.cue", "rb") as cue:
5848            self.__test_disc_id_tracks__(
5849                disc_id_obj=DiscID,
5850                track_lengths=[7939176, 4799256, 6297480, 5383140,
5851                               5246136, 5052684, 5013876],
5852                cuesheet=cue.read(),
5853                disc_id="SJco4q4a9rzKdBw7HcFvBQugKc8-")
5854
5855        with open("freedb_test_discid-2.cue", "rb") as cue:
5856            self.__test_disc_id_tracks__(
5857                disc_id_obj=DiscID,
5858                track_lengths=[1339464, 4048380, 692076, 10600464, 10602816,
5859                               1178940, 7454664, 2664816, 989604, 7008960,
5860                               9632616, 1070160, 6094620, 1622880, 13361124,
5861                               403956, 5208504, 7373520, 483336, 12012840,
5862                               8534820, 439824, 7626360, 1262436, 4874520,
5863                               398664, 11229036, 483924, 9003456, 883764,
5864                               5018580],
5865                cuesheet=cue.read(),
5866                disc_id="yrelpXuXXP2WKDpTUqrS62keIFE-")
5867
5868        with open("freedb_test_discid-3.cue", "rb") as cue:
5869            self.__test_disc_id__(
5870                disc_id_obj=DiscID,
5871                total_length=190928304,
5872                cuesheet=cue.read(),
5873                disc_id="naJ8mpfbMHx_qQnJbyRx4lE_h4E-")
5874
5875        with open("freedb_test_discid-4.cue", "rb") as cue:
5876            self.__test_disc_id__(
5877                disc_id_obj=DiscID,
5878                total_length=127937040,
5879                cuesheet=cue.read(),
5880                disc_id="1o5aDeltYCEwCecU1cMMi1cvees-")
5881
5882        with open("freedb_test_discid-5.cue", "rb") as cue:
5883            self.__test_disc_id__(
5884                disc_id_obj=DiscID,
5885                total_length=119882616,
5886                cuesheet=cue.read(),
5887                disc_id="aS0RfXDrxs718yypC2AlgpNEIE0-")
5888
5889
5890class Test_Accuraterip(Test_FreeDB):
5891    @LIB_ACCURATERIP
5892    def test_testcd(self):
5893        # not sure if there is an AccurateRip test disc
5894
5895        self.assertTrue(True)
5896
5897    @LIB_ACCURATERIP
5898    def test_discid(self):
5899        from audiotools.accuraterip import DiscID
5900
5901        with open("freedb_test_discid-1.cue", "rb") as cue:
5902            self.__test_disc_id_tracks__(
5903                disc_id_obj=DiscID,
5904                track_lengths=[7939176, 4799256, 6297480, 5383140,
5905                               5246136, 5052684, 5013876],
5906                cuesheet=cue.read(),
5907                disc_id="dBAR-007-00045db7-0019b8d8-5a038407.bin")
5908
5909        with open("freedb_test_discid-2.cue", "rb") as cue:
5910            self.__test_disc_id_tracks__(
5911                disc_id_obj=DiscID,
5912                track_lengths=[1339464, 4048380, 692076, 10600464, 10602816,
5913                               1178940, 7454664, 2664816, 989604, 7008960,
5914                               9632616, 1070160, 6094620, 1622880, 13361124,
5915                               403956, 5208504, 7373520, 483336, 12012840,
5916                               8534820, 439824, 7626360, 1262436, 4874520,
5917                               398664, 11229036, 483924, 9003456, 883764,
5918                               5018580],
5919                cuesheet=cue.read(),
5920                disc_id="dBAR-031-003fee31-058c64b9-be0d9a1f.bin")
5921
5922        with open("freedb_test_discid-3.cue", "rb") as cue:
5923            self.__test_disc_id__(
5924                disc_id_obj=DiscID,
5925                total_length=190928304,
5926                cuesheet=cue.read(),
5927                disc_id="dBAR-010-00193b54-00c9f723-a610e90a.bin")
5928
5929        with open("freedb_test_discid-4.cue", "rb") as cue:
5930            self.__test_disc_id__(
5931                disc_id_obj=DiscID,
5932                total_length=127937040,
5933                cuesheet=cue.read(),
5934                disc_id="dBAR-014-001977f9-01097144-ce0ad30e.bin")
5935
5936        with open("freedb_test_discid-5.cue", "rb") as cue:
5937            self.__test_disc_id__(
5938                disc_id_obj=DiscID,
5939                total_length=119882616,
5940                cuesheet=cue.read(),
5941                disc_id="dBAR-020-001d4d13-01bab5f6-fc0a9e14.bin")
5942
5943    @LIB_ACCURATERIP
5944    def test_checksum(self):
5945        from audiotools.accuraterip import Checksum
5946        from test_streams import Simple_Sine, Generate02
5947
5948        # sanity checking for initial options
5949        self.assertRaises(ValueError,
5950                          Checksum,
5951                          total_pcm_frames=10,
5952                          sample_rate=0,
5953                          is_first=False,
5954                          is_last=False,
5955                          accurateripv2_offset=0)
5956
5957        self.assertRaises(ValueError,
5958                          Checksum,
5959                          total_pcm_frames=10,
5960                          sample_rate=-1,
5961                          is_first=False,
5962                          is_last=False,
5963                          accurateripv2_offset=0)
5964
5965        self.assertRaises(ValueError,
5966                          Checksum,
5967                          total_pcm_frames=0,
5968                          sample_rate=44100,
5969                          is_first=False,
5970                          is_last=False,
5971                          accurateripv2_offset=0)
5972
5973        self.assertRaises(ValueError,
5974                          Checksum,
5975                          total_pcm_frames=-1,
5976                          sample_rate=44100,
5977                          is_first=False,
5978                          is_last=False,
5979                          accurateripv2_offset=0)
5980
5981        self.assertRaises(ValueError,
5982                          Checksum,
5983                          total_pcm_frames=-1,
5984                          sample_rate=-1,
5985                          is_first=False,
5986                          is_last=False,
5987                          accurateripv2_offset=0)
5988
5989        self.assertRaises(ValueError,
5990                          Checksum,
5991                          total_pcm_frames=10,
5992                          sample_rate=44100,
5993                          is_first=False,
5994                          is_last=False,
5995                          pcm_frame_range=0,
5996                          accurateripv2_offset=0)
5997
5998        self.assertRaises(ValueError,
5999                          Checksum,
6000                          total_pcm_frames=10,
6001                          sample_rate=44100,
6002                          is_first=False,
6003                          is_last=False,
6004                          pcm_frame_range=-1,
6005                          accurateripv2_offset=0)
6006
6007        self.assertRaises(ValueError,
6008                          Checksum,
6009                          total_pcm_frames=10,
6010                          sample_rate=44100,
6011                          is_first=False,
6012                          is_last=False,
6013                          accurateripv2_offset=-1)
6014
6015        checksum = Checksum(total_pcm_frames=200000,
6016                            sample_rate=44100,
6017                            is_first=False,
6018                            is_last=False,
6019                            pcm_frame_range=1)
6020
6021        for v in [None, 0, 1, "foo", "bar"]:
6022            self.assertRaises(TypeError,
6023                              checksum.update,
6024                              v)
6025
6026        # sanity checking for stream parameters
6027        for params in [[200000, 44100, 0x04, 8, (25, 10000)],      # 8bps 1ch
6028                       [200000, 44100, 0x03, 8, (25, 10000),
6029                                                (50, 20000)],      # 8bps 2ch
6030                       [200000, 44100, 0x07, 8, (25, 10000),
6031                                                (50, 20000),
6032                                                (120, 30000)],     # 8bps 3ch
6033                       [200000, 44100, 0x04, 16, (6400, 10000)],   # 16bps 1ch
6034                       [200000, 44100, 0x07, 16, (6400, 10000),
6035                                                 (12800, 20000),
6036                                                 (30720, 30000)],  # 16bps 3ch
6037                       [200000, 44100, 0x04, 24,
6038                        (1638400, 10000)],                         # 24bps 1ch
6039                       [200000, 44100, 0x03, 24,
6040                        (1638400, 10000),
6041                        (3276800, 20000)],                         # 24bps 2ch
6042                       [200000, 44100, 0x07, 24,
6043                        (1638400, 10000),
6044                        (3276800, 20000),
6045                        (7864320, 30000)]]:                        # 24bps 3ch
6046            sine = Simple_Sine(*params)
6047            checksum = Checksum(total_pcm_frames=200000,
6048                                sample_rate=44100,
6049                                is_first=False,
6050                                is_last=False,
6051                                pcm_frame_range=1)
6052            self.assertRaises(ValueError,
6053                              audiotools.transfer_data,
6054                              sine.read, checksum.update)
6055
6056        # ensure very short streams work correctly
6057        # whether middle, first, last or only track
6058        short_track = Checksum(total_pcm_frames=1,
6059                               sample_rate=44100,
6060                               is_first=False,
6061                               is_last=False,
6062                               pcm_frame_range=1)
6063        audiotools.transfer_data(
6064            Generate02(44100).read, short_track.update)
6065        self.assertEqual(short_track.checksums_v1(), [0x7FFF8000])
6066        self.assertEqual(short_track.checksum_v2(), 0x7FFF8000)
6067
6068        short_track = Checksum(total_pcm_frames=1,
6069                               sample_rate=44100,
6070                               is_first=True,
6071                               is_last=False,
6072                               pcm_frame_range=1)
6073        audiotools.transfer_data(
6074            Generate02(44100).read, short_track.update)
6075        self.assertEqual(short_track.checksums_v1(), [0])
6076        self.assertEqual(short_track.checksum_v2(), 0)
6077
6078        short_track = Checksum(total_pcm_frames=1,
6079                               sample_rate=44100,
6080                               is_first=False,
6081                               is_last=True,
6082                               pcm_frame_range=1)
6083        audiotools.transfer_data(
6084            Generate02(44100).read, short_track.update)
6085        self.assertEqual(short_track.checksums_v1(), [0])
6086        self.assertEqual(short_track.checksum_v2(), 0)
6087
6088        short_track = Checksum(total_pcm_frames=1,
6089                               sample_rate=44100,
6090                               is_first=True,
6091                               is_last=True,
6092                               pcm_frame_range=1)
6093        audiotools.transfer_data(
6094            Generate02(44100).read, short_track.update)
6095        self.assertEqual(short_track.checksums_v1(), [0])
6096        self.assertEqual(short_track.checksum_v2(), 0)
6097
6098        track = audiotools.open("tone.flac")
6099
6100        # ensure various checksum range options work correctly
6101        # values taken from reference implementation
6102
6103        middle_track = Checksum(total_pcm_frames=track.total_frames(),
6104                                sample_rate=track.sample_rate(),
6105                                is_first=False,
6106                                is_last=False)
6107        with track.to_pcm() as pcmreader:
6108            audiotools.transfer_data(pcmreader.read, middle_track.update)
6109        self.assertEqual(middle_track.checksums_v1(), [0xF6E4AD26])
6110        self.assertEqual(middle_track.checksum_v2(), 0x4781FC37)
6111
6112        middle_track = Checksum(total_pcm_frames=track.total_frames(),
6113                                sample_rate=track.sample_rate(),
6114                                is_first=False,
6115                                is_last=False,
6116                                pcm_frame_range=3,
6117                                accurateripv2_offset=1)
6118        with audiotools.PCMReaderWindow(track.to_pcm(),
6119                                        -1,
6120                                        track.total_frames() + 2) as pcmreader:
6121            audiotools.transfer_data(pcmreader.read, middle_track.update)
6122        self.assertEqual(middle_track.checksums_v1(), [0xCA705E69,
6123                                                       0xF6E4AD26,
6124                                                       0x951FB12F])
6125        self.assertEqual(middle_track.checksum_v2(), 0x4781FC37)
6126        # self.assertEqual(middle_track.checksums_v2(), [0x1B0BE28C,
6127        #                                                0x4781FC37,
6128        #                                                0xE5B9A28E])
6129
6130        first_track = Checksum(total_pcm_frames=track.total_frames(),
6131                               sample_rate=track.sample_rate(),
6132                               is_first=True,
6133                               is_last=False)
6134        with track.to_pcm() as pcmreader:
6135            audiotools.transfer_data(pcmreader.read, first_track.update)
6136        self.assertEqual(first_track.checksums_v1(), [0xEE4DBEB4])
6137        self.assertEqual(first_track.checksum_v2(), 0x3ECA2C04)
6138
6139        first_track = Checksum(total_pcm_frames=track.total_frames(),
6140                               sample_rate=track.sample_rate(),
6141                               is_first=True,
6142                               is_last=False,
6143                               pcm_frame_range=3,
6144                               accurateripv2_offset=1)
6145        with audiotools.PCMReaderWindow(track.to_pcm(),
6146                                        -1,
6147                                        track.total_frames() + 2) as pcmreader:
6148            audiotools.transfer_data(pcmreader.read, first_track.update)
6149        self.assertEqual(first_track.checksums_v1(), [0x7CC66A55,
6150                                                      0xEE4DBEB4,
6151                                                      0x9A58C7EC])
6152        self.assertEqual(first_track.checksum_v2(), 0x3ECA2C04)
6153        # self.assertEqual(first_track.checksums_v2(), [0xCD410A97,
6154        #                                               0x3ECA2C04,
6155        #                                               0xEAD1D8AA])
6156
6157        last_track = Checksum(total_pcm_frames=track.total_frames(),
6158                              sample_rate=track.sample_rate(),
6159                              is_first=False,
6160                              is_last=True)
6161        with track.to_pcm() as pcmreader:
6162            audiotools.transfer_data(pcmreader.read, last_track.update)
6163        self.assertEqual(last_track.checksums_v1(), [0xF819E862])
6164        self.assertEqual(last_track.checksum_v2(), 0x222E32FA)
6165
6166        last_track = Checksum(total_pcm_frames=track.total_frames(),
6167                              sample_rate=track.sample_rate(),
6168                              is_first=False,
6169                              is_last=True,
6170                              pcm_frame_range=3,
6171                              accurateripv2_offset=1)
6172        with audiotools.PCMReaderWindow(track.to_pcm(),
6173                                        -1,
6174                                        track.total_frames() + 2) as pcmreader:
6175            audiotools.transfer_data(pcmreader.read, last_track.update)
6176        self.assertEqual(last_track.checksums_v1(), [0x682F9316,
6177                                                     0xF819E862,
6178                                                     0x00DBAF4E])
6179        self.assertEqual(last_track.checksum_v2(), 0x222E32FA)
6180        # self.assertEqual(last_track.checksums_v2(), [0x92419BB8,
6181        #                                              0x222E32FA,
6182        #                                              0x2AF10061])
6183
6184        only_track = Checksum(total_pcm_frames=track.total_frames(),
6185                              sample_rate=track.sample_rate(),
6186                              is_first=True,
6187                              is_last=True)
6188        with track.to_pcm() as pcmreader:
6189            audiotools.transfer_data(pcmreader.read, only_track.update)
6190        self.assertEqual(only_track.checksums_v1(), [0xEF82F9F0])
6191        self.assertEqual(only_track.checksum_v2(), 0x197662C7)
6192
6193        only_track = Checksum(total_pcm_frames=track.total_frames(),
6194                              sample_rate=track.sample_rate(),
6195                              is_first=True,
6196                              is_last=True,
6197                              pcm_frame_range=3,
6198                              accurateripv2_offset=1)
6199        with audiotools.PCMReaderWindow(track.to_pcm(),
6200                                        -1,
6201                                        track.total_frames() + 2) as pcmreader:
6202            audiotools.transfer_data(pcmreader.read, only_track.update)
6203        self.assertEqual(only_track.checksums_v1(), [0x1A859F02,
6204                                                     0xEF82F9F0,
6205                                                     0x0614C60B])
6206        self.assertEqual(only_track.checksum_v2(), 0x197662C7)
6207        # self.assertEqual(only_track.checksums_v2(), [0x4476C3C3,
6208        #                                              0x197662C7,
6209        #                                              0x3009367D])
6210
6211        # ensure feeding checksum with not enough samples
6212        # raises ValueError at checksums()-time
6213        insufficient_samples = Checksum(
6214            total_pcm_frames=track.total_frames() + 1,
6215            sample_rate=track.sample_rate(),
6216            is_first=False,
6217            is_last=False,
6218            pcm_frame_range=1)
6219        with track.to_pcm() as pcmreader:
6220            audiotools.transfer_data(pcmreader.read,
6221                                     insufficient_samples.update)
6222        self.assertRaises(ValueError, insufficient_samples.checksums_v1)
6223        self.assertRaises(ValueError, insufficient_samples.checksum_v2)
6224
6225        # ensure insufficient samples also works with a range
6226        insufficient_samples = Checksum(
6227            total_pcm_frames=track.total_frames(),
6228            sample_rate=track.sample_rate(),
6229            is_first=False,
6230            is_last=False,
6231            pcm_frame_range=2,
6232            accurateripv2_offset=1)
6233        with track.to_pcm() as pcmreader:
6234            audiotools.transfer_data(pcmreader.read,
6235                                     insufficient_samples.update)
6236        self.assertRaises(ValueError, insufficient_samples.checksums_v1)
6237        self.assertRaises(ValueError, insufficient_samples.checksum_v2)
6238
6239        # ensure feeding checksum with too many samples
6240        # raises ValueError at update()-time
6241        too_many_samples = Checksum(
6242            total_pcm_frames=track.total_frames() - 1,
6243            sample_rate=track.sample_rate(),
6244            is_first=False,
6245            is_last=False,
6246            pcm_frame_range=1)
6247        with track.to_pcm() as pcmreader:
6248            self.assertRaises(ValueError,
6249                              audiotools.transfer_data,
6250                              pcmreader.read,
6251                              too_many_samples.update)
6252
6253        # ensure too many samples also works with a range
6254        too_many_samples = Checksum(
6255            total_pcm_frames=track.total_frames() - 2,
6256            sample_rate=track.sample_rate(),
6257            is_first=False,
6258            is_last=False,
6259            pcm_frame_range=2)
6260        with track.to_pcm() as pcmreader:
6261            self.assertRaises(ValueError,
6262                              audiotools.transfer_data,
6263                              pcmreader.read,
6264                              too_many_samples.update)
6265
6266    @LIB_ACCURATERIP
6267    def test_perform_lookup(self):
6268        from audiotools.freedb import DiscID as FDiscID
6269        from audiotools.accuraterip import DiscID, perform_lookup
6270        import time
6271
6272        offsets = [0, 2278, 9163, 10340, 28368, 46400, 48405,
6273                   61083, 65615, 67298, 79218, 95600, 97420,
6274                   107785, 110545, 133268, 133955, 142813,
6275                   155353, 156175, 176605, 191120, 191868,
6276                   204838, 206985, 215275, 215953, 235050,
6277                   235873, 251185, 252688]
6278
6279        freedb_disc_id = FDiscID(
6280            offsets=offsets,
6281            total_length=3482,
6282            track_count=31,
6283            playable_length=3482)
6284
6285        disc_id = DiscID(
6286            track_numbers=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
6287                           13, 14, 15, 16, 17, 18, 19, 20, 21, 22,
6288                           23, 24, 25, 26, 27, 28, 29, 30, 31],
6289            track_offsets=offsets,
6290            lead_out_offset=261223,
6291            freedb_disc_id=freedb_disc_id)
6292
6293        matches = perform_lookup(disc_id)
6294        time.sleep(1)
6295
6296        self.assertEqual(set(matches.keys()), set(range(1, 32)))
6297
6298    @LIB_ACCURATERIP
6299    def test_accuraterip_lookup(self):
6300        import time
6301
6302        lengths = [1339464, 4048380, 692076, 10600464, 10602816,
6303                   1178940, 7454664, 2664816, 989604, 7008960,
6304                   9632616, 1070160, 6094620, 1622880, 13361124,
6305                   403956, 5208504, 7373520, 483336, 12012840,
6306                   8534820, 439824, 7626360, 1262436, 4874520,
6307                   398664, 11229036, 483924, 9003456, 883764, 5018580]
6308        tempfiles = [tempfile.NamedTemporaryFile(suffix=".flac")
6309                     for l in lengths]
6310
6311        tracks = [audiotools.FlacAudio.from_pcm(
6312                  t.name,
6313                  EXACT_SILENCE_PCM_Reader(pcm_frames=l,
6314                                           sample_rate=44100,
6315                                           channels=2,
6316                                           channel_mask=0x3,
6317                                           bits_per_sample=16),
6318                  total_pcm_frames=l)
6319                  for (t, l) in zip(tempfiles, lengths)]
6320
6321        matches = audiotools.accuraterip_lookup(tracks)
6322        time.sleep(1)
6323
6324        for t in tempfiles:
6325            t.close()
6326
6327        self.assertEqual(set(matches.keys()), set(range(1, 32)))
6328
6329    @LIB_ACCURATERIP
6330    def test_accuraterip_sheet_lookup(self):
6331        import time
6332
6333        cuesheet = tempfile.NamedTemporaryFile(suffix=".cue")
6334        with open("freedb_test_discid-2.cue", "rb") as f:
6335            cuesheet.write(f.read())
6336            cuesheet.flush()
6337        sheet = audiotools.read_sheet(cuesheet.name)
6338        cuesheet.close()
6339
6340        matches = audiotools.accuraterip_sheet_lookup(sheet, 153599124, 44100)
6341        time.sleep(1)
6342
6343        self.assertEqual(set(matches.keys()), set(range(1, 32)))
6344
6345    @LIB_ACCURATERIP
6346    def test_match_offset(self):
6347        from audiotools.accuraterip import match_offset
6348
6349        # no checksums, raise Exception
6350        self.assertRaises(ValueError,
6351                          match_offset,
6352                          [(1, 1, 0)],
6353                          [],
6354                          0)
6355
6356        # no AR matches, one checksum, no checksum found
6357        (checksum,
6358         confidence,
6359         offset) = match_offset([], [1], 0)
6360        self.assertEqual(checksum, 1)
6361        self.assertEqual(confidence, None)
6362        self.assertEqual(offset, 0)
6363
6364        # one AR match, one checksum, no checksum found in matches
6365        (checksum,
6366         confidence,
6367         offset) = match_offset([(1, 1, 0)], [10], 0)
6368        self.assertEqual(checksum, 10)
6369        self.assertEqual(confidence, None)
6370        self.assertEqual(offset, 0)
6371
6372        # one AR match, one checksum, one checksum found in matches
6373        (checksum,
6374         confidence,
6375         offset) = match_offset([(1, 1, 0)], [1], 0)
6376        self.assertEqual(checksum, 1)
6377        self.assertEqual(confidence, 1)
6378        self.assertEqual(offset, 0)
6379
6380        # one AR match, multiple checksums, no checksum found in matches
6381        (checksum,
6382         confidence,
6383         offset) = match_offset([(1, 1, 0)], [2, 3, 4], -1)
6384        self.assertEqual(checksum, 3)
6385        self.assertEqual(confidence, None)
6386        self.assertEqual(offset, 0)
6387
6388        # one AR match, multiple checksums, one checksum found in matches
6389        (checksum,
6390         confidence,
6391         offset) = match_offset([(1, 1, 0)], [99, 0, 1], -1)
6392        self.assertEqual(checksum, 1)
6393        self.assertEqual(confidence, 1)
6394        self.assertEqual(offset, 1)
6395
6396        # multiple AR matches, one checksum, no checksum found in matches
6397        (checksum,
6398         confidence,
6399         offset) = match_offset([(1, 1, 0), (2, 2, 0)], [3], 0)
6400        self.assertEqual(checksum, 3)
6401        self.assertEqual(confidence, None)
6402        self.assertEqual(offset, 0)
6403
6404        # multiple AR matches, one checksum, one checksum found in matches
6405        (checksum,
6406         confidence,
6407         offset) = match_offset([(1, 1, 0), (2, 2, 0)], [2], 0)
6408        self.assertEqual(checksum, 2)
6409        self.assertEqual(confidence, 2)
6410        self.assertEqual(offset, 0)
6411
6412        # multiple AR matches, multiple checksums, no checksums found
6413        (checksum,
6414         confidence,
6415         offset) = match_offset([(1, 1, 0), (2, 2, 0)], [3, 4, 5], -1)
6416        self.assertEqual(checksum, 4)
6417        self.assertEqual(confidence, None)
6418        self.assertEqual(offset, 0)
6419
6420        # multiple AR matches, multiple checksums, one match found
6421        (checksum,
6422         confidence,
6423         offset) = match_offset([(1, 1, 0), (2, 2, 0)], [98, 99, 2], -1)
6424        self.assertEqual(checksum, 2)
6425        self.assertEqual(confidence, 2)
6426        self.assertEqual(offset, 1)
6427
6428        # multiple AR matches, multiple checksums, multiple matches found
6429        (checksum,
6430         confidence,
6431         offset) = match_offset([(1, 1, 0), (2, 2, 0)], [0, 1, 2], -1)
6432        self.assertEqual(checksum, 2)
6433        self.assertEqual(confidence, 2)
6434        self.assertEqual(offset, 1)
6435
6436
6437class Test_Lookup(unittest.TestCase):
6438    @LIB_FREEDB
6439    @LIB_MUSICBRAINZ
6440    def test_metadata_lookup(self):
6441        from audiotools.freedb import DiscID as FDiscID
6442        from audiotools.musicbrainz import DiscID as MDiscID
6443        import time
6444
6445        freedb_disc_id = FDiscID(
6446            offsets=[150, 2428, 9313, 10490, 28518, 46550, 48555, 61233,
6447                     65765, 67448, 79368, 95750, 97570, 107935, 110695,
6448                     133418, 134105, 142963, 155503, 156325, 176755,
6449                     191270, 192018, 204988, 207135, 215425, 216103,
6450                     235200, 236023, 251335, 252838],
6451            total_length=3482,
6452            track_count=31,
6453            playable_length=3482)
6454        musicbrainz_disc_id = MDiscID(
6455            first_track_number=1,
6456            last_track_number=31,
6457            lead_out_offset=261373,
6458            offsets=[150, 2428, 9313, 10490, 28518, 46550, 48555, 61233,
6459                     65765, 67448, 79368, 95750, 97570, 107935, 110695,
6460                     133418, 134105, 142963, 155503, 156325, 176755,
6461                     191270, 192018, 204988, 207135, 215425, 216103,
6462                     235200, 236023, 251335, 252838])
6463        total_tracks = 31
6464
6465        self.assertEqual(str(freedb_disc_id),
6466                         "BE0D9A1F")
6467        self.assertEqual(str(musicbrainz_disc_id),
6468                         "yrelpXuXXP2WKDpTUqrS62keIFE-")
6469
6470        # since the contents of lookup services
6471        # can change over time, all we can really verify
6472        # is the track count
6473        for choice in audiotools.metadata_lookup(
6474            musicbrainz_disc_id=musicbrainz_disc_id,
6475            freedb_disc_id=freedb_disc_id):
6476            self.assertEqual(len(choice), total_tracks)
6477        time.sleep(1)
6478
6479    @LIB_FREEDB
6480    @LIB_MUSICBRAINZ
6481    def test_cddareader_lookup(self):
6482        from shutil import rmtree
6483        from audiotools.cdio import CDDAReader
6484        from audiotools.freedb import DiscID as FDiscID
6485        from audiotools.musicbrainz import DiscID as MDiscID
6486        import time
6487
6488        dir = tempfile.mkdtemp()
6489        try:
6490            # dump cuesheet to file
6491            with open(os.path.join(dir, "CDImage.cue"), "wb") as w:
6492                with open("freedb_test_discid-2.cue", "rb") as r:
6493                    w.write(r.read())
6494
6495            # dump CD image to file
6496            with open(os.path.join(dir, "CDImage.bin"), "wb") as w:
6497                w.write(b"\x00" * 2 * 2 * 153599124)
6498
6499            r = CDDAReader(os.path.join(dir, "CDImage.cue"))
6500
6501            freedb_disc_id = FDiscID.from_cddareader(r)
6502            musicbrainz_disc_id = MDiscID.from_cddareader(r)
6503            total_tracks = 31
6504
6505            self.assertEqual(str(freedb_disc_id),
6506                             "BE0D9A1F")
6507            self.assertEqual(str(musicbrainz_disc_id),
6508                             "yrelpXuXXP2WKDpTUqrS62keIFE-")
6509
6510            # since the contents of lookup services
6511            # can change over time, all we can really verify
6512            # is the track count
6513            for choice in audiotools.metadata_lookup(
6514                musicbrainz_disc_id=musicbrainz_disc_id,
6515                freedb_disc_id=freedb_disc_id):
6516                self.assertEqual(len(choice), total_tracks)
6517            time.sleep(1)
6518        finally:
6519            rmtree(dir)
6520
6521    @LIB_FREEDB
6522    @LIB_MUSICBRAINZ
6523    def test_track_lookup(self):
6524        from audiotools.freedb import DiscID as FDiscID
6525        from audiotools.musicbrainz import DiscID as MDiscID
6526        import time
6527
6528        lengths = [1339464, 4048380, 692076, 10600464, 10602816,
6529                   1178940, 7454664, 2664816, 989604, 7008960,
6530                   9632616, 1070160, 6094620, 1622880, 13361124,
6531                   403956, 5208504, 7373520, 483336, 12012840,
6532                   8534820, 439824, 7626360, 1262436, 4874520,
6533                   398664, 11229036, 483924, 9003456, 883764, 5018580]
6534        tempfiles = [tempfile.NamedTemporaryFile(suffix=".flac")
6535                     for l in lengths]
6536
6537        tracks = [audiotools.FlacAudio.from_pcm(
6538                  t.name,
6539                  EXACT_SILENCE_PCM_Reader(pcm_frames=l,
6540                                           sample_rate=44100,
6541                                           channels=2,
6542                                           channel_mask=0x3,
6543                                           bits_per_sample=16),
6544                  total_pcm_frames=l)
6545                  for (t, l) in zip(tempfiles, lengths)]
6546
6547        freedb_disc_id = FDiscID.from_tracks(tracks)
6548        musicbrainz_disc_id = MDiscID.from_tracks(tracks)
6549        total_tracks = 31
6550
6551        self.assertEqual(str(freedb_disc_id),
6552                         "BE0D9A1F")
6553        self.assertEqual(str(musicbrainz_disc_id),
6554                         "yrelpXuXXP2WKDpTUqrS62keIFE-")
6555
6556        # since the contents of lookup services
6557        # can change over time, all we can really verify
6558        # is the track count
6559        for choice in audiotools.metadata_lookup(
6560            musicbrainz_disc_id=musicbrainz_disc_id,
6561            freedb_disc_id=freedb_disc_id):
6562            self.assertEqual(len(choice), total_tracks)
6563        time.sleep(1)
6564
6565        for t in tempfiles:
6566            t.close()
6567
6568    @LIB_FREEDB
6569    @LIB_MUSICBRAINZ
6570    def test_sheet_lookup(self):
6571        from audiotools.freedb import DiscID as FDiscID
6572        from audiotools.musicbrainz import DiscID as MDiscID
6573        import time
6574
6575        with tempfile.NamedTemporaryFile(suffix=".cue") as cuesheet:
6576            with open("freedb_test_discid-2.cue", "rb") as r:
6577                cuesheet.write(r.read())
6578                cuesheet.flush()
6579            sheet = audiotools.read_sheet(cuesheet.name)
6580
6581        freedb_disc_id = FDiscID.from_sheet(sheet, 153599124, 44100)
6582        musicbrainz_disc_id = MDiscID.from_sheet(sheet, 153599124, 44100)
6583        total_tracks = 31
6584
6585        self.assertEqual(str(freedb_disc_id),
6586                         "BE0D9A1F")
6587        self.assertEqual(str(musicbrainz_disc_id),
6588                         "yrelpXuXXP2WKDpTUqrS62keIFE-")
6589
6590        # since the contents of lookup services
6591        # can change over time, all we can really verify
6592        # is the track count
6593        for choice in audiotools.metadata_lookup(
6594            musicbrainz_disc_id=musicbrainz_disc_id,
6595            freedb_disc_id=freedb_disc_id):
6596            self.assertEqual(len(choice), total_tracks)
6597        time.sleep(1)
6598
6599
6600class Test_Ogg(unittest.TestCase):
6601    @LIB_OGG
6602    def test_roundtrip(self):
6603        import audiotools.ogg
6604
6605        for packet_len in range(0, 1000):
6606            packet = os.urandom(packet_len)
6607            ogg_stream = BytesIO()
6608            self.assertEqual(packet_len, len(packet))
6609
6610            ogg_writer = audiotools.ogg.PageWriter(ogg_stream)
6611            for page in audiotools.ogg.packet_to_pages(packet, 1234):
6612                ogg_writer.write(page)
6613            ogg_writer.flush()
6614
6615            ogg_stream.seek(0)
6616
6617            ogg_reader = audiotools.ogg.PacketReader(
6618                audiotools.ogg.PageReader(ogg_stream))
6619
6620            self.assertEqual(packet, ogg_reader.read_packet())
6621
6622            ogg_writer.close()
6623            ogg_reader.close()
6624
6625
6626class Test_Image(unittest.TestCase):
6627    @LIB_IMAGE
6628    def test_metrics(self):
6629        from audiotools.image import image_metrics
6630
6631        with open("image_test_metrics-1.jpg", "rb") as f:
6632            jpeg = image_metrics(f.read())
6633            self.assertEqual(jpeg.width, 3)
6634            self.assertEqual(jpeg.height, 2)
6635            self.assertEqual(jpeg.bits_per_pixel, 24)
6636            self.assertEqual(jpeg.color_count, 0)
6637            self.assertEqual(jpeg.mime_type, "image/jpeg")
6638
6639        with open("image_test_metrics-2.png", "rb") as f:
6640            png1 = image_metrics(f.read())
6641            self.assertEqual(png1.width, 3)
6642            self.assertEqual(png1.height, 2)
6643            self.assertEqual(png1.bits_per_pixel, 24)
6644            self.assertEqual(png1.color_count, 0)
6645            self.assertEqual(png1.mime_type, "image/png")
6646
6647        with open("image_test_metrics-3.png", "rb") as f:
6648            png2 = image_metrics(f.read())
6649            self.assertEqual(png2.width, 3)
6650            self.assertEqual(png2.height, 2)
6651            self.assertEqual(png2.bits_per_pixel, 8)
6652            self.assertEqual(png2.color_count, 1)
6653            self.assertEqual(png2.mime_type, "image/png")
6654
6655        with open("image_test_metrics-4.gif", "rb") as f:
6656            gif = image_metrics(f.read())
6657            self.assertEqual(gif.width, 3)
6658            self.assertEqual(gif.height, 2)
6659            self.assertEqual(gif.bits_per_pixel, 8)
6660            self.assertEqual(gif.color_count, 2)
6661            self.assertEqual(gif.mime_type, "image/gif")
6662
6663        with open("image_test_metrics-5.bmp", "rb") as f:
6664            bmp = image_metrics(f.read())
6665            self.assertEqual(bmp.width, 3)
6666            self.assertEqual(bmp.height, 2)
6667            self.assertEqual(bmp.bits_per_pixel, 24)
6668            self.assertEqual(bmp.color_count, 0)
6669            self.assertEqual(bmp.mime_type, "image/x-ms-bmp")
6670
6671        with open("image_test_metrics-6.tiff", "rb") as f:
6672            tiff = image_metrics(f.read())
6673            self.assertEqual(tiff.width, 3)
6674            self.assertEqual(tiff.height, 2)
6675            self.assertEqual(tiff.bits_per_pixel, 24)
6676            self.assertEqual(tiff.color_count, 0)
6677            self.assertEqual(tiff.mime_type, "image/tiff")
6678
6679
6680class Test_ExecProgressQueue(unittest.TestCase):
6681    @LIB_CORE
6682    def test_queue(self):
6683        def range_sum(start, end, progress):
6684            import time
6685
6686            sum_ = 0
6687            for i in range(start, end):
6688                progress(start, end)
6689                sum_ += i
6690                time.sleep(0.1)
6691            return sum_
6692
6693        def range_sum_output(total):
6694            return u"%d" % (total)
6695
6696        for max_processes in range(1, 21):
6697            queue = audiotools.ExecProgressQueue(audiotools.SilentMessenger())
6698
6699            for i in range(100):
6700                queue.execute(
6701                    function=range_sum,
6702                    progress_text=u"Sum %d" % (i + 1),
6703                    completion_output=((u"Sum %d Finished" % (i + 1))
6704                                       if (i % 2) else range_sum_output),
6705                    start=i,
6706                    end=i + 10)
6707
6708            results = queue.run(100)
6709
6710            for i in range(max_processes):
6711                self.assertEqual(results[i], sum(range(i, i + 10)))
6712
6713
6714class Test_Output_Text(unittest.TestCase):
6715    @LIB_CORE
6716    def test_output_text(self):
6717        from audiotools import output_text
6718
6719        # ensure invalid colors and styles raise an exception
6720        self.assertRaises(ValueError,
6721                          output_text,
6722                          unicode_string=u"Foo",
6723                          fg_color="unknown")
6724
6725        self.assertRaises(ValueError,
6726                          output_text,
6727                          unicode_string=u"Foo",
6728                          bg_color="unknown")
6729
6730        self.assertRaises(ValueError,
6731                          output_text,
6732                          unicode_string=u"Foo",
6733                          style="unknown")
6734
6735        # ensure setting format returns new output_text with that format
6736        t1 = output_text(unicode_string=u"Foo")
6737        self.assertEqual(u"%s" % (t1,), u"Foo")
6738        self.assertEqual(t1.format(False), u"Foo")
6739        self.assertIn(u"Foo", t1.format(True))
6740        self.assertEqual(t1.fg_color(), None)
6741        self.assertEqual(t1.fg_color(), None)
6742        self.assertEqual(t1.style(), None)
6743
6744        t2 = t1.set_format(fg_color="black",
6745                           bg_color="blue",
6746                           style="underline")
6747        self.assertEqual(u"%s" % (t2,), u"Foo")
6748        self.assertEqual(t2.format(False), u"Foo")
6749        self.assertIn(u"Foo", t2.format(True))
6750        self.assertEqual(t2.fg_color(), "black")
6751        self.assertEqual(t2.bg_color(), "blue")
6752        self.assertEqual(t2.style(), "underline")
6753
6754        t3 = t2.set_format(fg_color=None,
6755                           bg_color=None,
6756                           style=None)
6757        self.assertEqual(u"%s" % (t3,), u"Foo")
6758        self.assertEqual(t3.format(False), u"Foo")
6759        self.assertIn(u"Foo", t3.format(True))
6760        self.assertEqual(t3.fg_color(), None)
6761        self.assertEqual(t3.fg_color(), None)
6762        self.assertEqual(t3.style(), None)
6763
6764        # ensure negative head, tail and split values raise ValueError
6765        self.assertRaises(ValueError,
6766                          t1.head,
6767                          -1)
6768
6769        self.assertRaises(ValueError,
6770                          t1.tail,
6771                          -1)
6772
6773        self.assertRaises(ValueError,
6774                          t1.split,
6775                          -1)
6776
6777        for string in [u"a",
6778                       u"Foo",
6779                       u"a" * 100,
6780                       u'\u1107' * 50,
6781                       u"a" + (u'\u1107' * 50),
6782                       u"a" + (u'\u1107' * 50) + u"b"]:
6783            for (fg_color,
6784                 bg_color,
6785                 style) in Possibilities([None, "black"],
6786                                         [None, "blue"],
6787                                         [None, "underline"]):
6788
6789                t = output_text(unicode_string=string,
6790                                fg_color=fg_color,
6791                                bg_color=bg_color,
6792                                style=style)
6793
6794                # ensure calling head returns a new output_text
6795                # with no more than "display_characters" and
6796                # with the same formatting as the original
6797                for i in range(len(t) + 5):
6798                    t2 = t.head(i)
6799                    self.assertLessEqual(len(t2), i)
6800                    self.assertEqual(t2.fg_color(), fg_color)
6801                    self.assertEqual(t2.bg_color(), bg_color)
6802                    self.assertEqual(t2.style(), style)
6803
6804                # ensure calling tail returns a new output_text
6805                # with no more than "display_characters" and
6806                # with the same formatting as the original
6807                for i in range(len(t) + 5):
6808                    t2 = t.tail(i)
6809                    self.assertLessEqual(len(t2), i)
6810                    self.assertEqual(t2.fg_color(), fg_color)
6811                    self.assertEqual(t2.bg_color(), bg_color)
6812                    self.assertEqual(t2.style(), style)
6813
6814                # ensure calling split returns a tuple of new output_text
6815                # with no more than "display_characters" in the head
6816                # and with the same formatting as the original
6817                for i in range(len(t) + 5):
6818                    (t2, t3) = t.split(i)
6819                    self.assertLessEqual(len(t2), i)
6820                    self.assertEqual(len(t2) + len(t3), len(t))
6821                    self.assertEqual(t2.fg_color(), fg_color)
6822                    self.assertEqual(t2.bg_color(), bg_color)
6823                    self.assertEqual(t2.style(), style)
6824                    self.assertEqual(t3.fg_color(), fg_color)
6825                    self.assertEqual(t3.bg_color(), bg_color)
6826                    self.assertEqual(t3.style(), style)
6827
6828    @LIB_CORE
6829    def test_output_list(self):
6830        from audiotools import output_text, output_list
6831
6832        # ensure invalid colors and styles raise an exception
6833        self.assertRaises(ValueError,
6834                          output_list,
6835                          output_texts=[u"Foo", u"Bar"],
6836                          fg_color="unknown")
6837
6838        self.assertRaises(ValueError,
6839                          output_list,
6840                          output_texts=[u"Foo", u"Bar"],
6841                          bg_color="unknown")
6842
6843        self.assertRaises(ValueError,
6844                          output_list,
6845                          output_texts=[u"Foo", u"Bar"],
6846                          style="unknown")
6847
6848        # ensure setting format returns new output_list with that format
6849        t1 = output_list(output_texts=[u"Foo", output_text(u"Bar")])
6850        self.assertEqual(u"%s" % (t1,), u"FooBar")
6851        self.assertEqual(t1.format(False), u"FooBar")
6852        self.assertIn(u"FooBar", t1.format(True))
6853        self.assertEqual(t1.fg_color(), None)
6854        self.assertEqual(t1.fg_color(), None)
6855        self.assertEqual(t1.style(), None)
6856
6857        t2 = t1.set_format(fg_color="black",
6858                           bg_color="blue",
6859                           style="underline")
6860        self.assertEqual(u"%s" % (t2,), u"FooBar")
6861        self.assertEqual(t2.format(False), u"FooBar")
6862        self.assertIn(u"FooBar", t2.format(True))
6863        self.assertEqual(t2.fg_color(), "black")
6864        self.assertEqual(t2.bg_color(), "blue")
6865        self.assertEqual(t2.style(), "underline")
6866
6867        t3 = t2.set_format(fg_color=None,
6868                           bg_color=None,
6869                           style=None)
6870        self.assertEqual(u"%s" % (t3,), u"FooBar")
6871        self.assertEqual(t3.format(False), u"FooBar")
6872        self.assertIn(u"FooBar", t3.format(True))
6873        self.assertEqual(t3.fg_color(), None)
6874        self.assertEqual(t3.fg_color(), None)
6875        self.assertEqual(t3.style(), None)
6876
6877        # ensure negative head, tail and split values raise ValueError
6878        self.assertRaises(ValueError,
6879                          t1.head,
6880                          -1)
6881
6882        self.assertRaises(ValueError,
6883                          t1.tail,
6884                          -1)
6885
6886        self.assertRaises(ValueError,
6887                          t1.split,
6888                          -1)
6889
6890        for strings in [[u"a"],
6891                        [output_text(u"a",
6892                                     fg_color="white",
6893                                     bg_color="blue",
6894                                     style="underline")],
6895                        [u"Foo"],
6896                        [u"a" * 100],
6897                        [output_text(u"Foo")],
6898                        [u"Foo", output_text(u"Bar")],
6899                        [output_text(u'\u1107' * 10)],
6900                        [u"a", output_text(u'\u1107' * 20,
6901                                           fg_color="white",
6902                                           bg_color="blue",
6903                                           style="underline"), u"b"]]:
6904            for (fg_color,
6905                 bg_color,
6906                 style) in Possibilities([None, "black"],
6907                                         [None, "blue"],
6908                                         [None, "underline"]):
6909
6910                t = output_list(output_texts=strings,
6911                                fg_color=fg_color,
6912                                bg_color=bg_color,
6913                                style=style)
6914
6915                # ensure calling head returns a new output_list
6916                # with no more than "display_characters" and
6917                # with the same formatting as the original
6918                for i in range(len(t) + 5):
6919                    t2 = t.head(i)
6920                    self.assertLessEqual(len(t2), i)
6921                    self.assertEqual(t2.fg_color(), fg_color)
6922                    self.assertEqual(t2.bg_color(), bg_color)
6923                    self.assertEqual(t2.style(), style)
6924
6925                # ensure calling tail returns a new output_list
6926                # with no more than "display_characters" and
6927                # with the same formatting as the original
6928                for i in range(len(t) + 5):
6929                    t2 = t.tail(i)
6930                    self.assertLessEqual(len(t2), i)
6931                    self.assertEqual(t2.fg_color(), fg_color)
6932                    self.assertEqual(t2.bg_color(), bg_color)
6933                    self.assertEqual(t2.style(), style)
6934
6935                # ensure calling split returns a tuple of new output_list
6936                # with no more than "display_characters" in the head
6937                # and with the same formatting as the original
6938                for i in range(len(t) + 5):
6939                    (t2, t3) = t.split(i)
6940                    self.assertLessEqual(len(t2), i)
6941                    self.assertEqual(len(t2) + len(t3), len(t))
6942                    self.assertEqual(t2.fg_color(), fg_color)
6943                    self.assertEqual(t2.bg_color(), bg_color)
6944                    self.assertEqual(t2.style(), style)
6945                    self.assertEqual(t3.fg_color(), fg_color)
6946                    self.assertEqual(t3.bg_color(), bg_color)
6947                    self.assertEqual(t3.style(), style)
6948
6949    @LIB_CORE
6950    def test_output_table(self):
6951        from audiotools import output_table, output_text, output_list
6952
6953        # check a table with mismatched columns
6954        err = output_table()
6955        row1 = err.row()
6956        row1.add_column(u"Foo")
6957        self.assertEqual(len(list(err.format(False))), 1)
6958        self.assertEqual(len(list(err.format(True))), 1)
6959        row2 = err.row()
6960        row2.add_column(u"Foo")
6961        row2.add_column(u"Bar")
6962        self.assertRaises(ValueError,
6963                          list,
6964                          err.format(False))
6965        self.assertRaises(ValueError,
6966                          list,
6967                          err.format(True))
6968
6969        # check a table with nothing but dividers
6970        dividers = output_table()
6971        dividers.divider_row([u"-", u"-"])
6972        dividers.divider_row([u"*", u"*"])
6973        dividers.divider_row([u"x", u"x"])
6974        self.assertEqual(len(list(dividers.format(False))), 3)
6975        self.assertEqual(len(list(dividers.format(True))), 3)
6976        dividers.divider_row([u"_"])
6977        self.assertRaises(ValueError,
6978                          list,
6979                          dividers.format(False))
6980        self.assertRaises(ValueError,
6981                          list,
6982                          dividers.format(True))
6983
6984        # check a table with nothing but blank rows
6985        blanks = output_table()
6986        blanks.blank_row()
6987        blanks.blank_row()
6988        blanks.blank_row()
6989        blanks.blank_row()
6990        self.assertEqual(len(list(blanks.format(False))), 4)
6991        self.assertEqual(len(list(blanks.format(True))), 4)
6992
6993        # check a typical table with black rows and dividers
6994        table = output_table()
6995
6996        row1 = table.blank_row()
6997        row2 = table.divider_row([u"-", u"-", u"-"])
6998        row3 = table.row()
6999        row3.add_column(u"a", "left")
7000        row3.add_column(u"b", "center")
7001        row3.add_column(u"c", "right")
7002        self.assertRaises(ValueError,
7003                          row3.add_column,
7004                          u"d", "unknown")
7005
7006        row4 = table.row()
7007        row4.add_column(output_text(u"Foo",
7008                                    fg_color="black",
7009                                    bg_color="white"))
7010        row4.add_column(output_list([u"Bar", u'\u1107' * 5],
7011                                    fg_color="blue",
7012                                    bg_color="red"), "center")
7013        row4.add_column(output_text(u"Blah"), "right")
7014
7015        row5 = table.divider_row([u"-", u"-", u"-"])
7016        row6 = table.blank_row()
7017
7018        self.assertEqual(len(list(table.format(False))), 6)
7019        self.assertEqual(len(list(table.format(True))), 6)
7020
7021
7022class Test_Most_Numerous(unittest.TestCase):
7023    @LIB_CORE
7024    def test_most_numerous(self):
7025        from audiotools import most_numerous
7026
7027        self.assertEqual(most_numerous([1, 1, 2, 3]), 1)
7028        self.assertEqual(most_numerous([1, 2, 2, 3]), 2)
7029        self.assertEqual(most_numerous([1, 2, 3, 3]), 3)
7030        self.assertEqual(most_numerous([], empty_list=-1), -1)
7031        self.assertEqual(most_numerous([1, 2, 3],
7032                                       empty_list=-1,
7033                                       all_differ=-2), -2)
7034
7035
7036class Test_Test_Iter(unittest.TestCase):
7037    @LIB_CORE
7038    def test_iter(self):
7039        from audiotools import iter_first, iter_last
7040
7041        self.assertEqual(list(iter_first([])),
7042                         [])
7043        self.assertEqual(list(iter_first([1])),
7044                         [(True, 1)])
7045        self.assertEqual(list(iter_first([1, 2])),
7046                         [(True, 1), (False, 2)])
7047        self.assertEqual(list(iter_first([1, 2, 3])),
7048                         [(True, 1), (False, 2), (False, 3)])
7049
7050        self.assertEqual(list(iter_last([])),
7051                         [])
7052        self.assertEqual(list(iter_last([1])),
7053                         [(True, 1)])
7054        self.assertEqual(list(iter_last([1, 2])),
7055                         [(False, 1), (True, 2)])
7056        self.assertEqual(list(iter_last([1, 2, 3])),
7057                         [(False, 1), (False, 2), (True, 3)])
7058