1#!/usr/bin/python
2
3# Audio Tools, a module and set of tools for manipulating audio data
4# Copyright (C) 2007-2014  Brian Langenberger
5
6# This program is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15
16# You should have received a copy of the GNU General Public License
17# along with this program; if not, write to the Free Software
18# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
19
20from audiotools import (AudioFile, InvalidFile)
21from audiotools.vorbis import (VorbisAudio, VorbisChannelMask)
22from audiotools.vorbiscomment import VorbisComment
23
24
25class InvalidOpus(InvalidFile):
26    pass
27
28
29#######################
30# Vorbis File
31#######################
32
33class OpusAudio(VorbisAudio):
34    """an Opus file"""
35
36    SUFFIX = "opus"
37    NAME = "opus"
38    DESCRIPTION = u"Opus Audio Codec"
39    DEFAULT_COMPRESSION = "10"
40    COMPRESSION_MODES = tuple(map(str, range(0, 11)))
41    COMPRESSION_DESCRIPTIONS = {"0": u"lowest quality, fastest encode",
42                                "10": u"best quality, slowest encode"}
43
44    def __init__(self, filename):
45        """filename is a plain string"""
46
47        AudioFile.__init__(self, filename)
48        self.__channels__ = 0
49        self.__channel_mask__ = 0
50
51        # get channel count and channel mask from first packet
52        from audiotools.bitstream import BitstreamReader
53        try:
54            with BitstreamReader(open(filename, "rb"), True) as ogg_reader:
55                (magic_number,
56                 version,
57                 header_type,
58                 granule_position,
59                 self.__serial_number__,
60                 page_sequence_number,
61                 checksum,
62                 segment_count) = ogg_reader.parse(
63                    "4b 8u 8u 64S 32u 32u 32u 8u")
64
65                if magic_number != b'OggS':
66                    from audiotools.text import ERR_OGG_INVALID_MAGIC_NUMBER
67                    raise InvalidOpus(ERR_OGG_INVALID_MAGIC_NUMBER)
68                if version != 0:
69                    from audiotools.text import ERR_OGG_INVALID_VERSION
70                    raise InvalidOpus(ERR_OGG_INVALID_VERSION)
71
72                segment_length = ogg_reader.read(8)
73
74                (opushead,
75                 version,
76                 self.__channels__,
77                 pre_skip,
78                 input_sample_rate,
79                 output_gain,
80                 mapping_family) = ogg_reader.parse(
81                    "8b 8u 8u 16u 32u 16s 8u")
82
83                if opushead != b"OpusHead":
84                    from audiotools.text import ERR_OPUS_INVALID_TYPE
85                    raise InvalidOpus(ERR_OPUS_INVALID_TYPE)
86                if version != 1:
87                    from audiotools.text import ERR_OPUS_INVALID_VERSION
88                    raise InvalidOpus(ERR_OPUS_INVALID_VERSION)
89                if self.__channels__ == 0:
90                    from audiotools.text import ERR_OPUS_INVALID_CHANNELS
91                    raise InvalidOpus(ERR_OPUS_INVALID_CHANNELS)
92
93                # FIXME - assign channel mask from mapping family
94                if mapping_family == 0:
95                    if self.__channels__ == 1:
96                        self.__channel_mask__ = VorbisChannelMask(0x4)
97                    elif self.__channels__ == 2:
98                        self.__channel_mask__ = VorbisChannelMask(0x3)
99                    else:
100                        self.__channel_mask__ = VorbisChannelMask(0)
101                else:
102                    (stream_count,
103                     coupled_stream_count) = ogg_reader.parse("8u 8u")
104                    if (self.__channels__ !=
105                        ((coupled_stream_count * 2) +
106                         (stream_count - coupled_stream_count))):
107                        from audiotools.text import ERR_OPUS_INVALID_CHANNELS
108                        raise InvalidOpus(ERR_OPUS_INVALID_CHANNELS)
109                    channel_mapping = [ogg_reader.read(8)
110                                       for i in range(self.__channels__)]
111        except IOError as msg:
112            raise InvalidOpus(str(msg))
113
114    @classmethod
115    def supports_replay_gain(cls):
116        """returns True if this class supports ReplayGain"""
117
118        return False
119
120    def get_replay_gain(self):
121        """returns a ReplayGain object of our ReplayGain values
122
123        returns None if we have no values
124
125        may raise IOError if unable to read the file"""
126
127        return None
128
129    def set_replay_gain(self, replaygain):
130        """given a ReplayGain object, sets the track's gain to those values
131
132        may raise IOError if unable to modify the file"""
133
134        pass
135
136    def delete_replay_gain(self):
137        """removes ReplayGain values from file, if any
138
139        may raise IOError if unable to modify the file"""
140
141        pass
142
143    def total_frames(self):
144        """returns the total PCM frames of the track as an integer"""
145
146        from audiotools._ogg import PageReader
147
148        try:
149            with PageReader(open(self.filename, "rb")) as reader:
150                page = reader.read()
151                pcm_samples = page.granule_position
152
153                while not page.stream_end:
154                    page = reader.read()
155                    pcm_samples = max(pcm_samples, page.granule_position)
156
157                return pcm_samples
158        except (IOError, ValueError):
159            return 0
160
161    def sample_rate(self):
162        """returns the rate of the track's audio as an integer number of Hz"""
163
164        return 48000
165
166    def to_pcm(self):
167        """returns a PCMReader object containing the track's PCM data
168
169        if an error occurs initializing a decoder, this should
170        return a PCMReaderError with an appropriate error message"""
171
172        from audiotools.decoders import OpusDecoder
173
174        try:
175            return OpusDecoder(self.filename)
176        except ValueError as err:
177            from audiotools import PCMReaderError
178            return PCMReaderError(error_message=str(err),
179                                  sample_rate=self.sample_rate(),
180                                  channels=self.channels(),
181                                  channel_mask=int(self.channel_mask()),
182                                  bits_per_sample=self.bits_per_sample())
183
184    @classmethod
185    def from_pcm(cls, filename, pcmreader,
186                 compression=None, total_pcm_frames=None):
187        """encodes a new file from PCM data
188
189        takes a filename string, PCMReader object,
190        optional compression level string and
191        optional total_pcm_frames integer
192        encodes a new audio file from pcmreader's data
193        at the given filename with the specified compression level
194        and returns a new AudioFile-compatible object
195
196        may raise EncodingError if some problem occurs when
197        encoding the input file.  This includes an error
198        in the input stream, a problem writing the output file,
199        or even an EncodingError subclass such as
200        "UnsupportedBitsPerSample" if the input stream
201        is formatted in a way this class is unable to support
202        """
203
204        from audiotools import (BufferedPCMReader,
205                                PCMConverter,
206                                __default_quality__,
207                                EncodingError)
208        from audiotools.encoders import encode_opus
209
210        if (((compression is None) or
211             (compression not in cls.COMPRESSION_MODES))):
212            compression = __default_quality__(cls.NAME)
213
214        if (pcmreader.channels > 2) and (pcmreader.channels <= 8):
215            if ((pcmreader.channel_mask != 0) and
216                (pcmreader.channel_mask not in
217                 {0x7,      # FR, FC, FL
218                  0x33,     # FR, FL, BR, BL
219                  0x37,     # FR, FC, FL, BL, BR
220                  0x3f,     # FR, FC, FL, BL, BR, LFE
221                  0x70f,    # FL, FC, FR, SL, SR, BC, LFE
222                  0x63f})):  # FL, FC, FR, SL, SR, BL, BR, LFE
223                from audiotools import UnsupportedChannelMask
224                pcmreader.close()
225                raise UnsupportedChannelMask(filename, channel_mask)
226
227        try:
228            if total_pcm_frames is not None:
229                from audiotools import CounterPCMReader
230                pcmreader = CounterPCMReader(pcmreader)
231
232            encode_opus(filename,
233                        BufferedPCMReader(
234                            PCMConverter(pcmreader,
235                                         sample_rate=48000,
236                                         channels=pcmreader.channels,
237                                         channel_mask=pcmreader.channel_mask,
238                                         bits_per_sample=16)),
239                        quality=int(compression),
240                        original_sample_rate=pcmreader.sample_rate)
241
242            pcmreader.close()
243
244            if ((total_pcm_frames is not None) and
245                (total_pcm_frames != pcmreader.frames_written)):
246                from audiotools.text import ERR_TOTAL_PCM_FRAMES_MISMATCH
247                cls.__unlink__(filename)
248                raise EncodingError(ERR_TOTAL_PCM_FRAMES_MISMATCH)
249
250            return cls(filename)
251        except (ValueError, IOError) as err:
252            pcmreader.close()
253            cls.__unlink__(filename)
254            raise EncodingError(err)
255
256    def update_metadata(self, metadata):
257        """takes this track's current MetaData object
258        as returned by get_metadata() and sets this track's metadata
259        with any fields updated in that object
260
261        raises IOError if unable to write the file
262        """
263
264        import os
265        from audiotools import TemporaryFile
266        from audiotools.ogg import (PageReader,
267                                    PacketReader,
268                                    PageWriter,
269                                    packet_to_pages,
270                                    packets_to_pages)
271        from audiotools.bitstream import BitstreamRecorder
272
273        if metadata is None:
274            return
275        elif not isinstance(metadata, VorbisComment):
276            from audiotools.text import ERR_FOREIGN_METADATA
277            raise ValueError(ERR_FOREIGN_METADATA)
278        elif not os.access(self.filename, os.W_OK):
279            raise IOError(self.filename)
280
281        original_ogg = PacketReader(PageReader(open(self.filename, "rb")))
282        new_ogg = PageWriter(TemporaryFile(self.filename))
283
284        sequence_number = 0
285
286        # transfer current file's identification packet in its own page
287        identification_packet = original_ogg.read_packet()
288        for (i, page) in enumerate(packet_to_pages(
289                identification_packet,
290                self.__serial_number__,
291                starting_sequence_number=sequence_number)):
292            page.stream_beginning = (i == 0)
293            new_ogg.write(page)
294            sequence_number += 1
295
296        # discard the current file's comment packet
297        comment_packet = original_ogg.read_packet()
298
299        # generate new comment packet
300        comment_writer = BitstreamRecorder(True)
301        comment_writer.write_bytes(b"OpusTags")
302        vendor_string = metadata.vendor_string.encode('utf-8')
303        comment_writer.build("32u %db" % (len(vendor_string)),
304                             (len(vendor_string), vendor_string))
305        comment_writer.write(32, len(metadata.comment_strings))
306        for comment_string in metadata.comment_strings:
307            comment_string = comment_string.encode('utf-8')
308            comment_writer.build("32u %db" % (len(comment_string)),
309                                 (len(comment_string), comment_string))
310
311        for page in packet_to_pages(
312                comment_writer.data(),
313                self.__serial_number__,
314                starting_sequence_number=sequence_number):
315            new_ogg.write(page)
316            sequence_number += 1
317
318        # transfer remaining pages after re-sequencing
319        page = original_ogg.read_page()
320        page.sequence_number = sequence_number
321        sequence_number += 1
322        new_ogg.write(page)
323        while not page.stream_end:
324            page = original_ogg.read_page()
325            page.sequence_number = sequence_number
326            page.bitstream_serial_number = self.__serial_number__
327            sequence_number += 1
328            new_ogg.write(page)
329
330        original_ogg.close()
331        new_ogg.close()
332
333    def set_metadata(self, metadata):
334        """takes a MetaData object and sets this track's metadata
335
336        this metadata includes track name, album name, and so on
337        raises IOError if unable to write the file"""
338
339        if metadata is None:
340            return self.delete_metadata()
341
342        metadata = VorbisComment.converted(metadata)
343
344        old_metadata = self.get_metadata()
345
346        metadata.vendor_string = old_metadata.vendor_string
347
348        # port REPLAYGAIN and ENCODER from old metadata to new metadata
349        for key in [u"REPLAYGAIN_TRACK_GAIN",
350                    u"REPLAYGAIN_TRACK_PEAK",
351                    u"REPLAYGAIN_ALBUM_GAIN",
352                    u"REPLAYGAIN_ALBUM_PEAK",
353                    u"REPLAYGAIN_REFERENCE_LOUDNESS",
354                    u"ENCODER"]:
355            try:
356                metadata[key] = old_metadata[key]
357            except KeyError:
358                metadata[key] = []
359
360        self.update_metadata(metadata)
361
362    @classmethod
363    def supports_metadata(cls):
364        """returns True if this audio type supports MetaData"""
365
366        return True
367
368    def get_metadata(self):
369        """returns a MetaData object, or None
370
371        raises IOError if unable to read the file"""
372
373        from io import BytesIO
374        from audiotools.bitstream import BitstreamReader
375        from audiotools.ogg import PacketReader, PageReader
376
377        with PacketReader(PageReader(open(self.filename, "rb"))) as reader:
378            identification = reader.read_packet()
379            comment = BitstreamReader(BytesIO(reader.read_packet()), True)
380
381            if comment.read_bytes(8) == b"OpusTags":
382                vendor_string = \
383                    comment.read_bytes(comment.read(32)).decode('utf-8')
384                comment_strings = [
385                    comment.read_bytes(comment.read(32)).decode('utf-8')
386                    for i in range(comment.read(32))]
387
388                return VorbisComment(comment_strings, vendor_string)
389            else:
390                return None
391
392    def delete_metadata(self):
393        """deletes the track's MetaData
394
395        this removes or unsets tags as necessary in order to remove all data
396        raises IOError if unable to write the file"""
397
398        from audiotools import MetaData
399
400        # the vorbis comment packet is required,
401        # so simply zero out its contents
402        self.set_metadata(MetaData())
403
404    def verify(self, progress=None):
405        """verifies the current file for correctness
406
407        returns True if the file is okay
408        raises an InvalidFile with an error message if there is
409        some problem with the file"""
410
411        # Checking for a truncated Ogg stream typically involves
412        # verifying that the "end of stream" flag is set on the last
413        # Ogg page in the stream in the event that one or more whole
414        # pages is lost.  But since the OpusFile decoder doesn't perform
415        # this check and doesn't provide any access to its internal
416        # Ogg decoder (unlike Vorbis), we'll perform that check externally.
417        #
418        # And since it's a fast check, we won't bother to update progress.
419
420        from audiotools.ogg import PageReader
421        import os.path
422
423        try:
424            f = open(self.filename, "rb")
425        except IOError as err:
426            raise InvalidOpus(str(err))
427        try:
428            reader = PageReader(f)
429        except IOError as err:
430            f.close()
431            raise InvalidOpus(str(err))
432
433        try:
434            page = reader.read()
435            while not page.stream_end:
436                page = reader.read()
437        except (IOError, ValueError) as err:
438            raise InvalidOpus(str(err))
439        finally:
440            reader.close()
441
442        return AudioFile.verify(self, progress)
443
444    @classmethod
445    def available(cls, system_binaries):
446        """returns True if all necessary compenents are available
447        to support format"""
448
449        try:
450            from audiotools.decoders import OpusDecoder
451            from audiotools.encoders import encode_opus
452
453            return True
454        except ImportError:
455            return False
456
457    @classmethod
458    def missing_components(cls, messenger):
459        """given a Messenger object, displays missing binaries or libraries
460        needed to support this format and where to get them"""
461
462        from audiotools.text import (ERR_LIBRARY_NEEDED,
463                                     ERR_LIBRARY_DOWNLOAD_URL,
464                                     ERR_PROGRAM_PACKAGE_MANAGER)
465
466        format_ = cls.NAME.decode('ascii')
467
468        # display where to get vorbisfile
469        messenger.info(
470            ERR_LIBRARY_NEEDED %
471            {"library": u"\"libopus\" and \"opusfile\"",
472             "format": format_})
473        messenger.info(
474            ERR_LIBRARY_DOWNLOAD_URL %
475            {"library": u"libopus and opusfile",
476             "url": "http://www.opus-codec.org/"})
477
478        messenger.info(ERR_PROGRAM_PACKAGE_MANAGER)
479