1#!/usr/bin/python
2
3# Audio Tools, a module and set of tools for manipulating audio data
4# Copyright (C) 2007-2014  Brian Langenberger
5
6# This program is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15
16# You should have received a copy of the GNU General Public License
17# along with this program; if not, write to the Free Software
18# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
19
20
21from audiotools import WaveContainer, InvalidFile
22from audiotools.ape import ApeTaggedAudio, ApeGainedAudio
23
24
class InvalidWavPack(InvalidFile):
    """raised when a file cannot be handled as a WavPack file"""
27
28
def __riff_chunk_ids__(data_size, data):
    """yields the ID of each chunk found in a RIFF WAVE header

    data_size is the total size of the header, in bytes
    data is a reader with parse() and skip_bytes() methods
    positioned at the start of the header

    yields nothing if the header doesn't start with
    a RIFF descriptor whose type is WAVE

    the contents of every chunk except "data" are skipped over,
    along with the pad byte that follows an odd-sized chunk
    """

    (container_id, _, form_type) = data.parse("4b 32u 4b")
    if (container_id != b"RIFF") or (form_type != b"WAVE"):
        return

    # 12 bytes of RIFF descriptor have been consumed
    remaining = data_size - 12

    while remaining > 0:
        (chunk_id, chunk_size) = data.parse("4b 32u")
        remaining -= 8
        yield chunk_id

        if chunk_id == b"data":
            # only the "data" chunk's 8 byte header is accounted for
            continue

        # round odd sizes up to the next even byte count
        padded_size = chunk_size + (chunk_size % 2)
        data.skip_bytes(padded_size)
        remaining -= padded_size
47
48
49class WavPackAudio(ApeTaggedAudio, ApeGainedAudio, WaveContainer):
50    """a WavPack audio file"""
51
    from audiotools.text import (COMP_WAVPACK_VERYFAST,
                                 COMP_WAVPACK_VERYHIGH)

    SUFFIX = "wv"
    NAME = SUFFIX
    DESCRIPTION = u"WavPack"
    DEFAULT_COMPRESSION = "standard"
    # the compression level names accepted by from_wave() and from_pcm();
    # any other value makes them fall back to __default_quality__(NAME)
    COMPRESSION_MODES = ("veryfast", "fast", "standard", "high", "veryhigh")
    # only the two extreme modes carry a description string
    COMPRESSION_DESCRIPTIONS = {"veryfast": COMP_WAVPACK_VERYFAST,
                                "veryhigh": COMP_WAVPACK_VERYHIGH}

    BITS_PER_SAMPLE = (8, 16, 24, 32)
    # indexed by the 4 bit sample rate field that __read_info__()
    # parses from the block header;
    # field value 0xF is handled separately by __read_info__(),
    # so the trailing 0 is never looked up there
    SAMPLING_RATE = (6000,  8000,  9600,   11025,
                     12000, 16000, 22050,  24000,
                     32000, 44100, 48000,  64000,
                     88200, 96000, 192000, 0)

    # keyword arguments handed to the encoding function
    # by from_wave() and from_pcm(), keyed by compression mode;
    # the modes differ only in their "correlation_passes" value
    __options__ = {"veryfast": {"block_size": 44100,
                                "joint_stereo": True,
                                "false_stereo": True,
                                "wasted_bits": True,
                                "correlation_passes": 1},
                   "fast": {"block_size": 44100,
                            "joint_stereo": True,
                            "false_stereo": True,
                            "wasted_bits": True,
                            "correlation_passes": 2},
                   "standard": {"block_size": 44100,
                                "joint_stereo": True,
                                "false_stereo": True,
                                "wasted_bits": True,
                                "correlation_passes": 5},
                   "high": {"block_size": 44100,
                            "joint_stereo": True,
                            "false_stereo": True,
                            "wasted_bits": True,
                            "correlation_passes": 10},
                   "veryhigh": {"block_size": 44100,
                                "joint_stereo": True,
                                "false_stereo": True,
                                "wasted_bits": True,
                                "correlation_passes": 16}}
94
95    def __init__(self, filename):
96        """filename is a plain string"""
97
98        WaveContainer.__init__(self, filename)
99        self.__samplerate__ = 0
100        self.__channels__ = 0
101        self.__bitspersample__ = 0
102        self.__total_frames__ = 0
103
104        try:
105            self.__read_info__()
106        except IOError as msg:
107            raise InvalidWavPack(str(msg))
108
    def lossless(self):
        """returns True

        the result is unconditional;
        no part of the file is examined"""

        return True
113
    def channel_mask(self):
        """returns a ChannelMask object of this track's channel layout

        the mask is the one __read_info__() determined
        when this object was created"""

        return self.__channel_mask__
118
    @classmethod
    def supports_metadata(cls):
        """returns True if this audio type supports MetaData

        always True for this class"""

        return True
124
125    def get_metadata(self):
126        """returns a MetaData object, or None
127
128        raises IOError if unable to read the file"""
129
130        metadata = ApeTaggedAudio.get_metadata(self)
131        if metadata is not None:
132            metadata.frame_count = self.total_frames()
133        return metadata
134
135    def has_foreign_wave_chunks(self):
136        """returns True if the audio file contains non-audio RIFF chunks
137
138        during transcoding, if the source audio file has foreign RIFF chunks
139        and the target audio format supports foreign RIFF chunks,
140        conversion should be routed through .wav conversion
141        to avoid losing those chunks"""
142
143        for (sub_header, nondecoder, data_size, data) in self.sub_blocks():
144            if (sub_header == 1) and nondecoder:
145                if (set(__riff_chunk_ids__(data_size,
146                                           data)) != {b"fmt ", b"data"}):
147                    return True
148            elif (sub_header == 2) and nondecoder:
149                return True
150        else:
151            return False
152
153    def wave_header_footer(self):
154        """returns (header, footer) tuple of strings
155        containing all data before and after the PCM stream
156
157        may raise ValueError if there's a problem with
158        the header or footer data
159        may raise IOError if there's a problem reading
160        header or footer data from the file
161        """
162
163        head = None
164        tail = None
165
166        for (sub_block_id, nondecoder, data_size, data) in self.sub_blocks():
167            if (sub_block_id == 1) and nondecoder:
168                head = data.read_bytes(data_size)
169            elif (sub_block_id == 2) and nondecoder:
170                tail = data.read_bytes(data_size)
171
172        if head is not None:
173            return (head, tail if tail is not None else b"")
174        else:
175            raise ValueError("no wave header found")
176
177    @classmethod
178    def from_wave(cls, filename, header, pcmreader, footer, compression=None,
179                  encoding_function=None):
180        """encodes a new file from wave data
181
182        takes a filename string, header string,
183        PCMReader object, footer string
184        and optional compression level string
185        encodes a new audio file from pcmreader's data
186        at the given filename with the specified compression level
187        and returns a new WaveAudio object
188
189        header + pcm data + footer should always result
190        in the original wave file being restored
191        without need for any padding bytes
192
193        may raise EncodingError if some problem occurs when
194        encoding the input file"""
195
196        from audiotools.encoders import encode_wavpack
197        from audiotools import BufferedPCMReader
198        from audiotools import CounterPCMReader
199        from audiotools.wav import (validate_header, validate_footer)
200        from audiotools import EncodingError
201        from audiotools import __default_quality__
202
203        if (((compression is None) or
204             (compression not in cls.COMPRESSION_MODES))):
205            compression = __default_quality__(cls.NAME)
206
207        # ensure header is valid
208        try:
209            (total_size, data_size) = validate_header(header)
210        except ValueError as err:
211            raise EncodingError(str(err))
212
213        counter = CounterPCMReader(pcmreader)
214
215        try:
216            (encode_wavpack if encoding_function is None
217             else encoding_function)(filename,
218                                     BufferedPCMReader(counter),
219                                     wave_header=header,
220                                     wave_footer=footer,
221                                     **cls.__options__[compression])
222
223            counter.close()
224
225            data_bytes_written = counter.bytes_written()
226
227            # ensure output data size matches the "data" chunk's size
228            if data_size != data_bytes_written:
229                from audiotools.text import ERR_WAV_TRUNCATED_DATA_CHUNK
230                raise EncodingError(ERR_WAV_TRUNCATED_DATA_CHUNK)
231
232            # ensure footer validates correctly
233            try:
234                validate_footer(footer, data_bytes_written)
235            except ValueError as err:
236                raise EncodingError(str(err))
237
238            # ensure total size is correct
239            if (len(header) + data_size + len(footer)) != total_size:
240                from audiotools.text import ERR_WAV_INVALID_SIZE
241                raise EncodingError(ERR_WAV_INVALID_SIZE)
242
243            return cls(filename)
244        except (ValueError, IOError) as msg:
245            counter.close()
246            cls.__unlink__(filename)
247            raise EncodingError(str(msg))
248        except Exception as err:
249            counter.close()
250            cls.__unlink__(filename)
251            raise err
252
253    def blocks(self, reader=None):
254        """yields (length, reader) tuples of WavPack frames
255
256        length is the total length of all the substreams
257        reader is a BitstreamReader which can be parsed
258        """
259
260        def blocks_iter(reader):
261            try:
262                while True:
263                    (wvpk, block_size) = reader.parse("4b 32u 192p")
264                    if wvpk == b"wvpk":
265                        yield (block_size - 24,
266                               reader.substream(block_size - 24))
267                    else:
268                        return
269            except IOError:
270                return
271
272        if reader is None:
273            from audiotools.bitstream import BitstreamReader
274
275            with BitstreamReader(open(self.filename, "rb"), True) as reader:
276                for block in blocks_iter(reader):
277                    yield block
278        else:
279            for block in blocks_iter(reader):
280                yield block
281
282    def sub_blocks(self, reader=None):
283        """yields (function, nondecoder, data_size, data) tuples
284
285        function is an integer
286        nondecoder is a boolean indicating non-decoder data
287        data is a BitstreamReader which can be parsed
288        """
289
290        for (block_size, block_data) in self.blocks(reader):
291            while block_size > 0:
292                (metadata_function,
293                 nondecoder_data,
294                 actual_size_1_less,
295                 large_block) = block_data.parse("5u 1u 1u 1u")
296
297                if large_block:
298                    sub_block_size = block_data.read(24)
299                    block_size -= 4
300                else:
301                    sub_block_size = block_data.read(8)
302                    block_size -= 2
303
304                if actual_size_1_less:
305                    yield (metadata_function,
306                           nondecoder_data,
307                           sub_block_size * 2 - 1,
308                           block_data.substream(sub_block_size * 2 - 1))
309                    block_data.skip(8)
310                else:
311                    yield (metadata_function,
312                           nondecoder_data,
313                           sub_block_size * 2,
314                           block_data.substream(sub_block_size * 2))
315
316                block_size -= sub_block_size * 2
317
318    def __read_info__(self):
319        from audiotools.bitstream import BitstreamReader
320        from audiotools import ChannelMask
321
322        with BitstreamReader(open(self.filename, "rb"), True) as reader:
323            pos = reader.getpos()
324
325            (block_id,
326             total_samples,
327             bits_per_sample,
328             mono_output,
329             initial_block,
330             final_block,
331             sample_rate) = reader.parse(
332                "4b 64p 32u 64p 2u 1u 8p 1u 1u 5p 5p 4u 37p")
333
334            if block_id != b"wvpk":
335                from audiotools.text import ERR_WAVPACK_INVALID_HEADER
336                raise InvalidWavPack(ERR_WAVPACK_INVALID_HEADER)
337
338            if sample_rate != 0xF:
339                self.__samplerate__ = WavPackAudio.SAMPLING_RATE[sample_rate]
340            else:
341                # if unknown, pull from SAMPLE_RATE sub-block
342                for (block_id,
343                     nondecoder,
344                     data_size,
345                     data) in self.sub_blocks(reader):
346                    if (block_id == 0x7) and nondecoder:
347                        self.__samplerate__ = data.read(data_size * 8)
348                        break
349                else:
350                    # no SAMPLE RATE sub-block found
351                    # so pull info from FMT chunk
352                    reader.setpos(pos)
353                    (self.__samplerate__,) = self.fmt_chunk(reader).parse(
354                        "32p 32u")
355
356            self.__bitspersample__ = [8, 16, 24, 32][bits_per_sample]
357            self.__total_frames__ = total_samples
358
359            if initial_block and final_block:
360                if mono_output:
361                    self.__channels__ = 1
362                    self.__channel_mask__ = ChannelMask(0x4)
363                else:
364                    self.__channels__ = 2
365                    self.__channel_mask__ = ChannelMask(0x3)
366            else:
367                # if not mono or stereo, pull from CHANNEL INFO sub-block
368                reader.setpos(pos)
369                for (block_id,
370                     nondecoder,
371                     data_size,
372                     data) in self.sub_blocks(reader):
373                    if (block_id == 0xD) and not nondecoder:
374                        self.__channels__ = data.read(8)
375                        self.__channel_mask__ = ChannelMask(
376                            data.read((data_size - 1) * 8))
377                        break
378                else:
379                    # no CHANNEL INFO sub-block found
380                    # so pull info from FMT chunk
381                    reader.setpos(pos)
382                    fmt = self.fmt_chunk(reader)
383                    compression_code = fmt.read(16)
384                    self.__channels__ = fmt.read(16)
385                    if compression_code == 1:
386                        # this is theoretically possible
387                        # with very old .wav files,
388                        # but shouldn't happen in practice
389                        self.__channel_mask__ = \
390                            {1: ChannelMask.from_fields(front_center=True),
391                             2: ChannelMask.from_fields(front_left=True,
392                                                        front_right=True),
393                             3: ChannelMask.from_fields(front_left=True,
394                                                        front_right=True,
395                                                        front_center=True),
396                             4: ChannelMask.from_fields(front_left=True,
397                                                        front_right=True,
398                                                        back_left=True,
399                                                        back_right=True),
400                             5: ChannelMask.from_fields(front_left=True,
401                                                        front_right=True,
402                                                        back_left=True,
403                                                        back_right=True,
404                                                        front_center=True),
405                             6: ChannelMask.from_fields(front_left=True,
406                                                        front_right=True,
407                                                        back_left=True,
408                                                        back_right=True,
409                                                        front_center=True,
410                                                        low_frequency=True)
411                             }.get(self.__channels__, ChannelMask(0))
412                    elif compression_code == 0xFFFE:
413                        fmt.skip(128)
414                        mask = fmt.read(32)
415                        self.__channel_mask__ = ChannelMask(mask)
416                    else:
417                        from audiotools.text import ERR_WAVPACK_UNSUPPORTED_FMT
418                        raise InvalidWavPack(ERR_WAVPACK_UNSUPPORTED_FMT)
419
    def bits_per_sample(self):
        """returns an integer number of bits-per-sample this track contains

        the value is the one __read_info__() stored
        (8, 16, 24 or 32) when this object was created"""

        return self.__bitspersample__
424
    def channels(self):
        """returns an integer number of channels this track contains

        the value is the one __read_info__() stored
        when this object was created"""

        return self.__channels__
429
    def total_frames(self):
        """returns the total PCM frames of the track as an integer

        the value is the total samples field __read_info__()
        parsed from the file's first block header"""

        return self.__total_frames__
434
    def sample_rate(self):
        """returns the rate of the track's audio as an integer number of Hz

        the value is the one __read_info__() stored
        when this object was created"""

        return self.__samplerate__
439
    def seekable(self):
        """returns True if the file is seekable

        always True for this class"""

        return True
444
445    @classmethod
446    def from_pcm(cls, filename, pcmreader,
447                 compression=None,
448                 total_pcm_frames=None,
449                 encoding_function=None):
450        """encodes a new file from PCM data
451
452        takes a filename string, PCMReader object,
453        optional compression level string and
454        optional total_pcm_frames integer
455        encodes a new audio file from pcmreader's data
456        at the given filename with the specified compression level
457        and returns a new WavPackAudio object"""
458
459        from audiotools.encoders import encode_wavpack
460        from audiotools import BufferedPCMReader
461        from audiotools import CounterPCMReader
462        from audiotools import EncodingError
463        from audiotools import __default_quality__
464
465        if (((compression is None) or
466             (compression not in cls.COMPRESSION_MODES))):
467            compression = __default_quality__(cls.NAME)
468
469        counter = CounterPCMReader(pcmreader)
470
471        try:
472            (encode_wavpack if encoding_function is None
473             else encoding_function)(
474                filename,
475                BufferedPCMReader(counter),
476                total_pcm_frames=(total_pcm_frames if
477                                  total_pcm_frames is not None else 0),
478                **cls.__options__[compression])
479            counter.close()
480        except (ValueError, IOError) as msg:
481            counter.close()
482            cls.__unlink__(filename)
483            raise EncodingError(str(msg))
484        except Exception:
485            counter.close()
486            cls.__unlink__(filename)
487            raise
488
489        # ensure actual total PCM frames matches argument, if any
490        if (((total_pcm_frames is not None) and
491             (counter.frames_written != total_pcm_frames))):
492            cls.__unlink__(filename)
493            from audiotools.text import ERR_TOTAL_PCM_FRAMES_MISMATCH
494            raise EncodingError(ERR_TOTAL_PCM_FRAMES_MISMATCH)
495
496        return cls(filename)
497
498    def to_pcm(self):
499        """returns a PCMReader object containing the track's PCM data"""
500
501        from audiotools import decoders
502        from audiotools import PCMReaderError
503
504        try:
505            f = open(self.filename, "rb")
506        except IOError as msg:
507            return PCMReaderError(error_message=str(msg),
508                                  sample_rate=self.__samplerate__,
509                                  channels=self.__channels__,
510                                  channel_mask=int(self.channel_mask()),
511                                  bits_per_sample=self.__bitspersample__)
512
513        try:
514            return decoders.WavPackDecoder(f)
515        except (IOError, ValueError) as msg:
516            f.close()
517            return PCMReaderError(error_message=str(msg),
518                                  sample_rate=self.__samplerate__,
519                                  channels=self.__channels__,
520                                  channel_mask=int(self.channel_mask()),
521                                  bits_per_sample=self.__bitspersample__)
522
523    def fmt_chunk(self, reader=None):
524        """returns the 'fmt' chunk as a BitstreamReader"""
525
526        for (block_id,
527             nondecoder,
528             data_size,
529             data) in self.sub_blocks(reader):
530            if (block_id == 1) and nondecoder:
531                (riff, wave) = data.parse("4b 32p 4b")
532                if (riff != b"RIFF") or (wave != b"WAVE"):
533                    from audiotools.text import ERR_WAVPACK_INVALID_FMT
534                    raise InvalidWavPack(ERR_WAVPACK_INVALID_FMT)
535                else:
536                    while True:
537                        (chunk_id, chunk_size) = data.parse("4b 32u")
538                        if chunk_id == b"fmt ":
539                            return data.substream(chunk_size)
540                        elif chunk_id == b"data":
541                            from audiotools.text import ERR_WAVPACK_INVALID_FMT
542                            raise InvalidWavPack(ERR_WAVPACK_INVALID_FMT)
543                        else:
544                            data.skip_bytes(chunk_size)
545        else:
546            from audiotools.text import ERR_WAVPACK_NO_FMT
547            raise InvalidWavPack(ERR_WAVPACK_NO_FMT)
548
    @classmethod
    def supports_cuesheet(cls):
        """returns True if this audio type supports embedded cuesheets

        always True for this class"""

        return True
552
553    def get_cuesheet(self):
554        """returns the embedded Cuesheet-compatible object, or None
555
556        raises IOError if a problem occurs when reading the file"""
557
558        from audiotools import cue as cue
559        from audiotools import SheetException
560
561        metadata = self.get_metadata()
562
563        if (metadata is not None) and (b'Cuesheet' in metadata.keys()):
564            try:
565                return cue.read_cuesheet_string(
566                    metadata[b'Cuesheet'].__unicode__())
567            except SheetException:
568                # unlike FLAC, just because a cuesheet is embedded
569                # does not mean it is compliant
570                return None
571        else:
572            return None
573
574    def set_cuesheet(self, cuesheet):
575        """imports cuesheet data from a Sheet object
576
577        Raises IOError if an error occurs setting the cuesheet"""
578
579        import os.path
580        from io import BytesIO
581        from audiotools import (MetaData, Filename, FS_ENCODING)
582        from audiotools import cue as cue
583        from audiotools.cue import write_cuesheet
584        from audiotools.ape import ApeTag
585
586        if cuesheet is None:
587            return self.delete_cuesheet()
588
589        metadata = self.get_metadata()
590        if metadata is None:
591            metadata = ApeTag([])
592
593        cuesheet_data = BytesIO()
594        write_cuesheet(cuesheet,
595                       u"%s" % (Filename(self.filename).basename(),),
596                       cuesheet_data)
597
598        metadata[b'Cuesheet'] = ApeTag.ITEM.string(
599            b'Cuesheet',
600            cuesheet_data.getvalue().decode(FS_ENCODING, 'replace'))
601
602        self.update_metadata(metadata)
603
604    def delete_cuesheet(self):
605        """deletes embedded Sheet object, if any
606
607        Raises IOError if a problem occurs when updating the file"""
608
609        metadata = self.get_metadata()
610        if (metadata is not None) and (b'Cuesheet' in metadata):
611            del(metadata[b'Cuesheet'])
612            self.update_metadata(metadata)
613