1#!/usr/bin/python
2
3# Audio Tools, a module and set of tools for manipulating audio data
4# Copyright (C) 2007-2014  Brian Langenberger
5
6# This program is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15
16# You should have received a copy of the GNU General Public License
17# along with this program; if not, write to the Free Software
18# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
19
20import audiotools
21from io import BytesIO
22import math
23import os
24from hashlib import md5
25from audiotools.decoders import (Sine_Mono,
26                                 Sine_Stereo,
27                                 Sine_Simple,
28                                 SameSample)
29
30# these are test stream generators using stream formulas
31# taken from the FLAC reference encoder
32# but converted to PCMReaders for more general use
33
34
class FrameListReader:
    """Reader that serves PCM frames out of a fixed, in-memory sample list.

    The samples are converted to a framelist once; read() hands that
    framelist out piecewise.  close() swaps read() for a version that
    raises ValueError, and reset() rebuilds the framelist from a private
    copy of the samples and re-opens the stream.
    """

    def __init__(self, samples, sample_rate, channels, bits_per_sample,
                 channel_mask=None):
        """Set up the reader.

        samples         - list of integer sample values
        sample_rate     - stored as-is on the reader
        channels        - channel count handed to audiotools.pcm.from_list
        bits_per_sample - sample width handed to audiotools.pcm.from_list
        channel_mask    - stored as-is; when None, it is derived from
                          the channel count via audiotools.ChannelMask
        """
        import audiotools.pcm

        # the trailing True is presumably the "signed" flag of from_list
        self.framelist = audiotools.pcm.from_list(
            samples, channels, bits_per_sample, True)
        # private copy so reset() is unaffected by later caller mutation
        self.samples = samples[:]

        self.sample_rate = sample_rate
        self.channels = channels
        if channel_mask is not None:
            self.channel_mask = channel_mask
        else:
            self.channel_mask = int(
                audiotools.ChannelMask.from_channels(channels))
        self.bits_per_sample = bits_per_sample

        # "read" is an instance attribute so close() can swap it out
        self.read = self.read_opened

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def read_opened(self, pcm_frames):
        """Split pcm_frames off the front of the remaining framelist
        and return that leading part."""
        head, tail = self.framelist.split(pcm_frames)
        self.framelist = tail
        return head

    def read_closed(self, pcm_frames):
        """Stand-in for read() once the stream has been closed."""
        raise ValueError()

    def reset(self):
        """Rebuild the framelist from the stored samples and re-open."""
        self.framelist = audiotools.pcm.from_list(
            self.samples, self.channels, self.bits_per_sample, True)
        self.read = self.read_opened

    def close(self):
        """Make every subsequent read() raise ValueError."""
        self.read = self.read_closed
77
78
class MD5Reader(audiotools.PCMReader):
    """PCMReader wrapper that keeps a running MD5 of everything read.

    Stream parameters are copied from the wrapped reader; each framelist
    returned by read() is also fed into the hash.
    """

    def __init__(self, pcmreader):
        """Wrap pcmreader, copying its sample_rate, channels,
        channel_mask and bits_per_sample."""
        audiotools.PCMReader.__init__(
            self,
            sample_rate=pcmreader.sample_rate,
            channels=pcmreader.channels,
            channel_mask=pcmreader.channel_mask,
            bits_per_sample=pcmreader.bits_per_sample)
        self.pcmreader = pcmreader
        self.md5 = md5()

    def __repr__(self):
        shown = (self.sample_rate, self.channels, self.bits_per_sample)
        return "MD5Reader(%s,%s,%s)" % shown

    def read(self, pcm_frames):
        """Read from the wrapped reader, hash the result, and return it."""
        frames = self.pcmreader.read(pcm_frames)
        # (False, True) are presumably the big-endian and signed flags
        # of to_bytes - confirm against audiotools.pcm
        self.md5.update(frames.to_bytes(False, True))
        return frames

    def reset(self):
        """Start a fresh hash; also rewind the wrapped reader if it
        offers a reset() method."""
        source = self.pcmreader
        if hasattr(source, "reset"):
            source.reset()
        self.md5 = md5()

    def close(self):
        """Close the wrapped reader."""
        self.pcmreader.close()

    def digest(self):
        """Return the binary MD5 digest of the data read so far."""
        return self.md5.digest()

    def hexdigest(self):
        """Return the hexadecimal MD5 digest of the data read so far."""
        return self.md5.hexdigest()
113
114
class ShortStream(MD5Reader):
    """MD5Reader over a FrameListReader built from an explicit sample list."""

    def __init__(self, samples, sample_rate, channels, bits_per_sample):
        """All arguments are forwarded to FrameListReader; the channel
        mask is left for FrameListReader to derive."""
        source = FrameListReader(samples=samples,
                                 sample_rate=sample_rate,
                                 channels=channels,
                                 bits_per_sample=bits_per_sample)
        MD5Reader.__init__(self, source)
123
124
class Generate01(ShortStream):
    """16-bit, 1-channel stream holding the single sample -32768."""

    def __init__(self, sample_rate):
        ShortStream.__init__(self,
                             samples=[-32768],
                             sample_rate=sample_rate,
                             channels=1,
                             bits_per_sample=16)
129
130
class Generate02(ShortStream):
    """16-bit, 2-channel stream holding the samples -32768 and 32767."""

    def __init__(self, sample_rate):
        ShortStream.__init__(self,
                             samples=[-32768, 32767],
                             sample_rate=sample_rate,
                             channels=2,
                             bits_per_sample=16)
135
136
class Generate03(ShortStream):
    """16-bit, 1-channel stream of five small sample values."""

    def __init__(self, sample_rate):
        ShortStream.__init__(self,
                             samples=[-25, 0, 25, 50, 100],
                             sample_rate=sample_rate,
                             channels=1,
                             bits_per_sample=16)
141
142
class Generate04(ShortStream):
    """16-bit, 2-channel stream of ten small sample values."""

    def __init__(self, sample_rate):
        ShortStream.__init__(self,
                             samples=[-25, 500,
                                      0, 400,
                                      25, 300,
                                      50, 200,
                                      100, 100],
                             sample_rate=sample_rate,
                             channels=2,
                             bits_per_sample=16)
148
149
class Silence8_Mono(SameSample):
    """SameSample stream of zero-valued samples: 1 channel, 8 bits.

    Every framelist returned by read() is also fed into a running MD5,
    exposed through digest() and hexdigest().
    """

    def __init__(self, pcm_frames, sample_rate):
        """pcm_frames is forwarded as SameSample's total_pcm_frames;
        sample_rate is forwarded unchanged."""
        SameSample.__init__(
            self,
            sample=0,
            total_pcm_frames=pcm_frames,
            sample_rate=sample_rate,
            channels=1,
            channel_mask=0x4,
            bits_per_sample=8)
        self.md5 = md5()
        # remembered so __repr__ can echo the constructor argument
        self.pcm_frames = pcm_frames

    def __repr__(self):
        shown = (repr(self.pcm_frames), repr(self.sample_rate))
        return "Silence8_Mono(%s, %s)" % shown

    def read(self, pcm_frames):
        """Read from SameSample, hash the result, and return it."""
        chunk = SameSample.read(self, pcm_frames)
        self.md5.update(chunk.to_bytes(False, True))
        return chunk

    def reset(self):
        """Reset the underlying SameSample stream and start a new hash."""
        SameSample.reset(self)
        self.md5 = md5()

    def digest(self):
        """Return the binary MD5 digest of the data read so far."""
        return self.md5.digest()

    def hexdigest(self):
        """Return the hexadecimal MD5 digest of the data read so far."""
        return self.md5.hexdigest()
181
182
class Silence16_Mono(Silence8_Mono):
    """Zero-valued stream: 1 channel, 16 bits per sample.

    Calls SameSample.__init__ directly rather than going through
    Silence8_Mono.__init__, which would fix the width at 8 bits.
    """

    def __init__(self, pcm_frames, sample_rate):
        SameSample.__init__(
            self,
            sample=0,
            total_pcm_frames=pcm_frames,
            sample_rate=sample_rate,
            channels=1,
            channel_mask=0x4,
            bits_per_sample=16)
        self.md5 = md5()
        self.pcm_frames = pcm_frames

    def __repr__(self):
        shown = (repr(self.pcm_frames), repr(self.sample_rate))
        return "Silence16_Mono(%s, %s)" % shown
199
200
class Silence24_Mono(Silence8_Mono):
    """Zero-valued stream: 1 channel, 24 bits per sample.

    Calls SameSample.__init__ directly rather than going through
    Silence8_Mono.__init__, which would fix the width at 8 bits.
    """

    def __init__(self, pcm_frames, sample_rate):
        SameSample.__init__(
            self,
            sample=0,
            total_pcm_frames=pcm_frames,
            sample_rate=sample_rate,
            channels=1,
            channel_mask=0x4,
            bits_per_sample=24)
        self.md5 = md5()
        self.pcm_frames = pcm_frames

    def __repr__(self):
        shown = (repr(self.pcm_frames), repr(self.sample_rate))
        return "Silence24_Mono(%s, %s)" % shown
217
218
class Silence8_Stereo(Silence8_Mono):
    """Zero-valued stream: 2 channels (mask 0x3), 8 bits per sample.

    Calls SameSample.__init__ directly rather than going through
    Silence8_Mono.__init__, which would fix the layout at 1 channel.
    """

    def __init__(self, pcm_frames, sample_rate):
        SameSample.__init__(
            self,
            sample=0,
            total_pcm_frames=pcm_frames,
            sample_rate=sample_rate,
            channels=2,
            channel_mask=0x3,
            bits_per_sample=8)
        self.md5 = md5()
        self.pcm_frames = pcm_frames

    def __repr__(self):
        shown = (repr(self.pcm_frames), repr(self.sample_rate))
        return "Silence8_Stereo(%s, %s)" % shown
235
236
class Silence16_Stereo(Silence8_Mono):
    """Zero-valued stream: 2 channels (mask 0x3), 16 bits per sample.

    Calls SameSample.__init__ directly rather than going through
    Silence8_Mono.__init__, which would fix 1 channel and 8 bits.
    """

    def __init__(self, pcm_frames, sample_rate):
        SameSample.__init__(
            self,
            sample=0,
            total_pcm_frames=pcm_frames,
            sample_rate=sample_rate,
            channels=2,
            channel_mask=0x3,
            bits_per_sample=16)
        self.md5 = md5()
        self.pcm_frames = pcm_frames

    def __repr__(self):
        shown = (repr(self.pcm_frames), repr(self.sample_rate))
        return "Silence16_Stereo(%s, %s)" % shown
253
254
class Silence24_Stereo(Silence8_Mono):
    """Zero-valued stream: 2 channels (mask 0x3), 24 bits per sample.

    Calls SameSample.__init__ directly rather than going through
    Silence8_Mono.__init__, which would fix 1 channel and 8 bits.
    """

    def __init__(self, pcm_frames, sample_rate):
        SameSample.__init__(
            self,
            sample=0,
            total_pcm_frames=pcm_frames,
            sample_rate=sample_rate,
            channels=2,
            channel_mask=0x3,
            bits_per_sample=24)
        self.md5 = md5()
        self.pcm_frames = pcm_frames

    def __repr__(self):
        shown = (repr(self.pcm_frames), repr(self.sample_rate))
        return "Silence24_Stereo(%s, %s)" % shown
271
272
class Sine8_Mono(Sine_Mono):
    """8-bit Sine_Mono stream that also tracks an MD5 of everything read.

    f1/a1 and f2/a2 are passed straight to Sine_Mono; presumably they
    are the frequency/amplitude pairs of two sine components - confirm
    against audiotools.decoders.
    """

    def __init__(self,
                 pcm_frames,
                 sample_rate,
                 f1, a1, f2, a2):
        Sine_Mono.__init__(self, 8, pcm_frames, sample_rate,
                           f1, a1, f2, a2)
        # constructor arguments are remembered for __repr__
        self.pcm_frames = pcm_frames
        (self.f1, self.a1) = (f1, a1)
        (self.f2, self.a2) = (f2, a2)
        self.md5 = md5()

    def __repr__(self):
        fields = (self.pcm_frames, self.sample_rate,
                  self.f1, self.a1, self.f2, self.a2)
        return "Sine8_Mono(%s, %s, %s, %s, %s, %s)" % \
            tuple(map(repr, fields))

    def read(self, pcm_frames):
        """Read from Sine_Mono, hash the result, and return it."""
        chunk = Sine_Mono.read(self, pcm_frames)
        self.md5.update(chunk.to_bytes(False, True))
        return chunk

    def reset(self):
        """Reset the underlying Sine_Mono stream and start a new hash."""
        Sine_Mono.reset(self)
        self.md5 = md5()

    def digest(self):
        """Return the binary MD5 digest of the data read so far."""
        return self.md5.digest()

    def hexdigest(self):
        """Return the hexadecimal MD5 digest of the data read so far."""
        return self.md5.hexdigest()
310
311
class Sine8_Stereo(Sine_Stereo):
    """8-bit Sine_Stereo stream that also tracks an MD5 of everything read.

    f1, a1, f2, a2 and fmult are passed straight to Sine_Stereo; their
    exact meaning is defined by audiotools.decoders.
    """

    def __init__(self, pcm_frames, sample_rate,
                 f1, a1, f2, a2, fmult):
        Sine_Stereo.__init__(self, 8, pcm_frames,
                             sample_rate, f1, a1, f2, a2, fmult)
        # constructor arguments are remembered for __repr__
        self.pcm_frames = pcm_frames
        (self.f1, self.a1) = (f1, a1)
        (self.f2, self.a2) = (f2, a2)
        self.fmult = fmult
        self.md5 = md5()

    def __repr__(self):
        fields = (self.pcm_frames, self.sample_rate,
                  self.f1, self.a1, self.f2, self.a2, self.fmult)
        return "Sine8_Stereo(%s, %s, %s, %s, %s, %s, %s)" % \
            tuple(map(repr, fields))

    def read(self, pcm_frames):
        """Read from Sine_Stereo, hash the result, and return it."""
        chunk = Sine_Stereo.read(self, pcm_frames)
        self.md5.update(chunk.to_bytes(False, True))
        return chunk

    def reset(self):
        """Reset the underlying Sine_Stereo stream and start a new hash."""
        Sine_Stereo.reset(self)
        self.md5 = md5()

    def digest(self):
        """Return the binary MD5 digest of the data read so far."""
        return self.md5.digest()

    def hexdigest(self):
        """Return the hexadecimal MD5 digest of the data read so far."""
        return self.md5.hexdigest()
349
350
class Sine16_Mono(Sine8_Mono):
    """16-bit variant of Sine8_Mono.

    Calls Sine_Mono.__init__ directly with a width of 16 instead of
    going through Sine8_Mono.__init__, which would fix it at 8.
    """

    def __init__(self, pcm_frames, sample_rate,
                 f1, a1, f2, a2):
        Sine_Mono.__init__(self, 16, pcm_frames, sample_rate,
                           f1, a1, f2, a2)
        self.pcm_frames = pcm_frames
        (self.f1, self.a1) = (f1, a1)
        (self.f2, self.a2) = (f2, a2)
        self.md5 = md5()

    def __repr__(self):
        fields = (self.pcm_frames, self.sample_rate,
                  self.f1, self.a1, self.f2, self.a2)
        return "Sine16_Mono(%s, %s, %s, %s, %s, %s)" % \
            tuple(map(repr, fields))
371
372
class Sine16_Stereo(Sine8_Stereo):
    """16-bit variant of Sine8_Stereo.

    Calls Sine_Stereo.__init__ directly with a width of 16 instead of
    going through Sine8_Stereo.__init__, which would fix it at 8.
    """

    def __init__(self, pcm_frames, sample_rate,
                 f1, a1, f2, a2, fmult):
        Sine_Stereo.__init__(self, 16, pcm_frames, sample_rate,
                             f1, a1, f2, a2, fmult)
        self.pcm_frames = pcm_frames
        (self.f1, self.a1) = (f1, a1)
        (self.f2, self.a2) = (f2, a2)
        self.fmult = fmult
        self.md5 = md5()

    def __repr__(self):
        fields = (self.pcm_frames, self.sample_rate,
                  self.f1, self.a1, self.f2, self.a2, self.fmult)
        return "Sine16_Stereo(%s, %s, %s, %s, %s, %s, %s)" % \
            tuple(map(repr, fields))
395
396
class Sine24_Mono(Sine8_Mono):
    """24-bit variant of Sine8_Mono.

    Calls Sine_Mono.__init__ directly with a width of 24 instead of
    going through Sine8_Mono.__init__, which would fix it at 8.
    """

    def __init__(self, pcm_frames, sample_rate,
                 f1, a1, f2, a2):
        Sine_Mono.__init__(self, 24, pcm_frames, sample_rate,
                           f1, a1, f2, a2)
        self.pcm_frames = pcm_frames
        (self.f1, self.a1) = (f1, a1)
        (self.f2, self.a2) = (f2, a2)
        self.md5 = md5()

    def __repr__(self):
        fields = (self.pcm_frames, self.sample_rate,
                  self.f1, self.a1, self.f2, self.a2)
        return "Sine24_Mono(%s, %s, %s, %s, %s, %s)" % \
            tuple(map(repr, fields))
417
418
class Sine24_Stereo(Sine8_Stereo):
    """24-bit variant of Sine8_Stereo.

    Calls Sine_Stereo.__init__ directly with a width of 24 instead of
    going through Sine8_Stereo.__init__, which would fix it at 8.
    """

    def __init__(self, pcm_frames, sample_rate,
                 f1, a1, f2, a2, fmult):
        Sine_Stereo.__init__(self, 24, pcm_frames, sample_rate,
                             f1, a1, f2, a2, fmult)
        self.pcm_frames = pcm_frames
        (self.f1, self.a1) = (f1, a1)
        (self.f2, self.a2) = (f2, a2)
        self.fmult = fmult
        self.md5 = md5()

    def __repr__(self):
        fields = (self.pcm_frames, self.sample_rate,
                  self.f1, self.a1, self.f2, self.a2, self.fmult)
        return "Sine24_Stereo(%s, %s, %s, %s, %s, %s, %s)" % \
            tuple(map(repr, fields))
441
442
class Simple_Sine(audiotools.PCMReader):
    """Multi-channel stream assembled from one Sine_Simple per channel.

    Each positional entry in *values is a (max_value, count) pair that
    configures one channel, so the channel count equals len(values).
    Every framelist returned by read() is fed into a running MD5.
    """

    def __init__(self, pcm_frames, sample_rate, channel_mask,
                 bits_per_sample, *values):
        audiotools.PCMReader.__init__(
            self,
            sample_rate=sample_rate,
            channels=len(values),
            channel_mask=channel_mask,
            bits_per_sample=bits_per_sample)

        self.pcm_frames = pcm_frames
        self.total_frames = pcm_frames
        self.i = 0
        self.channel_max_values = [pair[0] for pair in values]
        self.channel_counts = [pair[1] for pair in values]

        # one single-channel generator per (max_value, count) pair
        self.streams = []
        for (max_value, count) in zip(self.channel_max_values,
                                      self.channel_counts):
            self.streams.append(Sine_Simple(pcm_frames,
                                            bits_per_sample,
                                            sample_rate,
                                            max_value,
                                            count))
        self.md5 = md5()

    def __repr__(self):
        pairs = list(zip(self.channel_max_values, self.channel_counts))
        return "Simple_Sine(%s, %s, %s, %s, *%s)" % \
            (self.pcm_frames,
             self.sample_rate,
             self.channel_mask,
             self.bits_per_sample,
             repr(pairs))

    def read(self, pcm_frames):
        """Read pcm_frames from every channel stream, combine them with
        audiotools.pcm.from_channels, hash the result and return it."""
        per_channel = [stream.read(pcm_frames) for stream in self.streams]
        framelist = audiotools.pcm.from_channels(per_channel)
        self.md5.update(framelist.to_bytes(False, True))
        return framelist

    def reset(self):
        """Reset every channel stream and start a new hash."""
        for stream in self.streams:
            stream.reset()
        self.md5 = md5()

    def close(self):
        """Close every channel stream."""
        for stream in self.streams:
            stream.close()

    def digest(self):
        """Return the binary MD5 digest of the data read so far."""
        return self.md5.digest()

    def hexdigest(self):
        """Return the hexadecimal MD5 digest of the data read so far."""
        return self.md5.hexdigest()
497
498
class WastedBPS16:
    """16-bit, 2-channel, 44100 Hz stream whose samples have zero low bits.

    The first channel carries (i % 2000) << 2 and the second carries
    (i % 1000) << 3, where i is the running frame index, so the lowest
    2 and 3 bits of each channel's samples are always zero.
    Every framelist returned by read() is fed into a running MD5.
    """

    def __init__(self, pcm_frames):
        """pcm_frames is the total number of PCM frames to generate."""
        # total_frames stays fixed; pcm_frames counts down as data is read
        self.total_frames = pcm_frames
        self.pcm_frames = pcm_frames

        self.i = 0
        self.sample_rate = 44100
        self.channels = 2
        self.channel_mask = 0x3
        self.bits_per_sample = 16
        self.signed = True
        self.sample_frame = audiotools.pcm.empty_framelist(2, 16)
        self.md5 = md5()
        # "read" is an instance attribute so close() can swap it out
        self.read = self.read_opened

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def read_opened(self, pcm_frames):
        """Generate up to pcm_frames frames, never more than remain,
        hash them and return them as a framelist."""
        wave = []
        for _ in range(min(pcm_frames, self.pcm_frames)):
            wave.append((self.i % 2000) << 2)
            wave.append((self.i % 1000) << 3)
            self.i += 1

        framelist = audiotools.pcm.from_list(wave,
                                             self.channels,
                                             self.bits_per_sample,
                                             self.signed)
        self.pcm_frames -= framelist.frames
        self.md5.update(framelist.to_bytes(False, True))
        return framelist

    def read_closed(self, pcm_frames):
        """Stand-in for read() once the stream has been closed."""
        raise ValueError()

    def reset(self):
        """Re-open the stream, rewind to frame 0 and start a new hash."""
        self.read = self.read_opened
        self.i = 0
        self.pcm_frames = self.total_frames
        self.md5 = md5()

    def digest(self):
        """Return the binary MD5 digest of the data read so far."""
        return self.md5.digest()

    def hexdigest(self):
        """Return the hexadecimal MD5 digest of the data read so far."""
        return self.md5.hexdigest()

    def close(self):
        """Make every subsequent read() raise ValueError."""
        self.read = self.read_closed

    def __repr__(self):
        # use the real class name and the original constructor argument;
        # self.pcm_frames shrinks as the stream is consumed
        return "WastedBPS16(%s)" % (repr(self.total_frames))
555
556
class Raw(audiotools.PCMReader):
    """44100 Hz stream backed by a BytesIO of two summed sines plus noise.

    The same generated channel is repeated for every requested channel.
    Only __init__ is defined here; reading is left to the base class.

    NOTE(review): this relies on audiotools.FrameList.from_channels()
    and a .string() method - confirm both exist in the audiotools
    version in use.
    """

    def __init__(self, pcm_frames, channels, bits_per_sample):
        audiotools.PCMReader.__init__(
            self,
            sample_rate=44100,
            channels=channels,
            channel_mask=0,
            bits_per_sample=bits_per_sample)

        self.file = BytesIO()

        # largest positive value representable at this sample width
        peak = (1 << (bits_per_sample - 1)) - 1
        (f1, a1) = (441.0, 0.61)
        (f2, a2) = (661.5, 0.37)
        step1 = 2.0 * math.pi / (self.sample_rate / f1)
        step2 = 2.0 * math.pi / (self.sample_rate / f2)
        phase1 = phase2 = 0.0

        samples = []
        for _ in range(pcm_frames):
            tone = a1 * math.sin(phase1) + a2 * math.sin(phase2)
            # top 4 bits of one random byte, shifted into -8..7
            noise = (ord(os.urandom(1)) >> 4) - 8
            samples.append(int((tone * peak) + 0.5) + noise)
            phase1 += step1
            phase2 += step2

        encoded = audiotools.FrameList.from_channels(
            [samples] * channels).string(bits_per_sample)
        self.file.write(encoded)

        # rewind so the data can be consumed from the start
        self.file.seek(0, 0)
589
# sign patterns; presumably passed as the "pattern" argument of
# fsd8/fsd16/fsd24, which map 1 and -1 to the extreme sample values
PATTERN01 = [+1, -1]
PATTERN02 = [+1, +1, -1]
PATTERN03 = [+1, -1, -1]
PATTERN04 = [+1, -1, +1, -1]
PATTERN05 = [+1, -1, -1, +1]
PATTERN06 = [+1, -1, +1, +1, -1]
PATTERN07 = [+1, -1, -1, +1, -1]
597
598
def fsd8(pattern, reps):
    """Return an 8-bit, 1-channel, 44100 Hz FrameListReader whose samples
    are pattern repeated reps times, with 1 mapped to 127 and -1 to -128.
    """
    # FIXME - not quite accurate
    extremes = {1: 127, -1: -128}
    one_cycle = [extremes[sign] for sign in pattern]
    return FrameListReader(one_cycle * reps, 44100, 1, 8)
604
605
def fsd16(pattern, reps):
    """Return a 16-bit, 1-channel, 44100 Hz FrameListReader whose samples
    are pattern repeated reps times, with 1 mapped to 32767 and -1 to
    -32768.
    """
    extremes = {1: 32767, -1: -32768}
    one_cycle = [extremes[sign] for sign in pattern]
    return FrameListReader(one_cycle * reps, 44100, 1, 16)
610
611
def fsd24(pattern, reps):
    """Return a 24-bit, 1-channel, 44100 Hz FrameListReader whose samples
    are pattern repeated reps times, with 1 mapped to 8388607 and -1 to
    -8388608.
    """
    extremes = {1: 8388607, -1: -8388608}
    one_cycle = [extremes[sign] for sign in pattern]
    return FrameListReader(one_cycle * reps, 44100, 1, 24)
616