1#!/usr/bin/python
2
3# Audio Tools, a module and set of tools for manipulating audio data
4# Copyright (C) 2007-2014  Brian Langenberger
5
6# This program is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15
16# You should have received a copy of the GNU General Public License
17# along with this program; if not, write to the Free Software
18# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
19
20from audiotools import (AudioFile, InvalidFile, ChannelMask)
21
22
class InvalidVorbis(InvalidFile):
    """raised when a file cannot be parsed as an Ogg Vorbis stream"""
25
26
27class VorbisAudio(AudioFile):
28    """an Ogg Vorbis file"""
29
30    from audiotools.text import (COMP_VORBIS_0,
31                                 COMP_VORBIS_10)
32
33    SUFFIX = "ogg"
34    NAME = SUFFIX
35    DESCRIPTION = u"Ogg Vorbis"
36    DEFAULT_COMPRESSION = "3"
37    COMPRESSION_MODES = tuple([str(i) for i in range(0, 11)])
38    COMPRESSION_DESCRIPTIONS = {"0": COMP_VORBIS_0,
39                                "10": COMP_VORBIS_10}
40
41    def __init__(self, filename):
42        """filename is a plain string"""
43
44        AudioFile.__init__(self, filename)
45        self.__sample_rate__ = 0
46        self.__channels__ = 0
47        try:
48            self.__read_identification__()
49        except IOError as msg:
50            raise InvalidVorbis(str(msg))
51
52    def __read_identification__(self):
53        from audiotools.bitstream import BitstreamReader
54
55        with BitstreamReader(open(self.filename, "rb"), True) as ogg_reader:
56            (magic_number,
57             version,
58             header_type,
59             granule_position,
60             self.__serial_number__,
61             page_sequence_number,
62             checksum,
63             segment_count) = ogg_reader.parse("4b 8u 8u 64S 32u 32u 32u 8u")
64
65            if magic_number != b'OggS':
66                from audiotools.text import ERR_OGG_INVALID_MAGIC_NUMBER
67                raise InvalidVorbis(ERR_OGG_INVALID_MAGIC_NUMBER)
68            if version != 0:
69                from audiotools.text import ERR_OGG_INVALID_VERSION
70                raise InvalidVorbis(ERR_OGG_INVALID_VERSION)
71
72            segment_length = ogg_reader.read(8)
73
74            (vorbis_type,
75             header,
76             version,
77             self.__channels__,
78             self.__sample_rate__,
79             maximum_bitrate,
80             nominal_bitrate,
81             minimum_bitrate,
82             blocksize0,
83             blocksize1,
84             framing) = ogg_reader.parse(
85                "8u 6b 32u 8u 32u 32u 32u 32u 4u 4u 1u")
86
87            if vorbis_type != 1:
88                from audiotools.text import ERR_VORBIS_INVALID_TYPE
89                raise InvalidVorbis(ERR_VORBIS_INVALID_TYPE)
90            if header != b'vorbis':
91                from audiotools.text import ERR_VORBIS_INVALID_HEADER
92                raise InvalidVorbis(ERR_VORBIS_INVALID_HEADER)
93            if version != 0:
94                from audiotools.text import ERR_VORBIS_INVALID_VERSION
95                raise InvalidVorbis(ERR_VORBIS_INVALID_VERSION)
96            if framing != 1:
97                from audiotools.text import ERR_VORBIS_INVALID_FRAMING_BIT
98                raise InvalidVorbis(ERR_VORBIS_INVALID_FRAMING_BIT)
99
100    def lossless(self):
101        """returns False"""
102
103        return False
104
105    def bits_per_sample(self):
106        """returns an integer number of bits-per-sample this track contains"""
107
108        return 16
109
110    def channels(self):
111        """returns an integer number of channels this track contains"""
112
113        return self.__channels__
114
115    def channel_mask(self):
116        """returns a ChannelMask object of this track's channel layout"""
117
118        if self.channels() == 1:
119            return ChannelMask.from_fields(
120                front_center=True)
121        elif self.channels() == 2:
122            return ChannelMask.from_fields(
123                front_left=True, front_right=True)
124        elif self.channels() == 3:
125            return ChannelMask.from_fields(
126                front_left=True, front_right=True,
127                front_center=True)
128        elif self.channels() == 4:
129            return ChannelMask.from_fields(
130                front_left=True, front_right=True,
131                back_left=True, back_right=True)
132        elif self.channels() == 5:
133            return ChannelMask.from_fields(
134                front_left=True, front_right=True,
135                front_center=True,
136                back_left=True, back_right=True)
137        elif self.channels() == 6:
138            return ChannelMask.from_fields(
139                front_left=True, front_right=True,
140                front_center=True,
141                back_left=True, back_right=True,
142                low_frequency=True)
143        elif self.channels() == 7:
144            return ChannelMask.from_fields(
145                front_left=True, front_right=True,
146                front_center=True,
147                side_left=True, side_right=True,
148                back_center=True, low_frequency=True)
149        elif self.channels() == 8:
150            return ChannelMask.from_fields(
151                front_left=True, front_right=True,
152                side_left=True, side_right=True,
153                back_left=True, back_right=True,
154                front_center=True, low_frequency=True)
155        else:
156            return ChannelMask(0)
157
158    def total_frames(self):
159        """returns the total PCM frames of the track as an integer"""
160
161        from audiotools._ogg import PageReader
162
163        try:
164            with PageReader(open(self.filename, "rb")) as reader:
165                page = reader.read()
166                pcm_samples = page.granule_position
167
168                while not page.stream_end:
169                    page = reader.read()
170                    pcm_samples = max(pcm_samples, page.granule_position)
171
172                return pcm_samples
173        except (IOError, ValueError):
174            return 0
175
176    def sample_rate(self):
177        """returns the rate of the track's audio as an integer number of Hz"""
178
179        return self.__sample_rate__
180
181    def to_pcm(self):
182        """returns a PCMReader object containing the track's PCM data"""
183
184        from audiotools.decoders import VorbisDecoder
185
186        try:
187            return VorbisDecoder(self.filename)
188        except ValueError as err:
189            from audiotools import PCMReaderError
190            return PCMReaderError(str(err),
191                                  self.sample_rate(),
192                                  self.channels(),
193                                  int(self.channel_mask()),
194                                  self.bits_per_sample())
195
196    @classmethod
197    def from_pcm(cls, filename, pcmreader,
198                 compression=None, total_pcm_frames=None):
199        """encodes a new file from PCM data
200
201        takes a filename string, PCMReader object,
202        optional compression level string and
203        optional total_pcm_frames integer
204        encodes a new audio file from pcmreader's data
205        at the given filename with the specified compression level
206        and returns a new VorbisAudio object"""
207
208        from audiotools import (BufferedPCMReader,
209                                __default_quality__,
210                                EncodingError)
211        from audiotools.encoders import encode_vorbis
212
213        if (((compression is None) or
214             (compression not in cls.COMPRESSION_MODES))):
215            compression = __default_quality__(cls.NAME)
216
217        if (pcmreader.channels > 2) and (pcmreader.channels <= 8):
218            channel_mask = int(pcmreader.channel_mask)
219            if ((channel_mask != 0) and
220                (channel_mask not in
221                 (0x7,       # FR, FC, FL
222                  0x33,      # FR, FL, BR, BL
223                  0x37,      # FR, FC, FL, BL, BR
224                  0x3f,      # FR, FC, FL, BL, BR, LFE
225                  0x70f,     # FL, FC, FR, SL, SR, BC, LFE
226                  0x63f))):  # FL, FC, FR, SL, SR, BL, BR, LFE
227                from audiotools import UnsupportedChannelMask
228                raise UnsupportedChannelMask(filename, channel_mask)
229
230        if total_pcm_frames is not None:
231            from audiotools import CounterPCMReader
232            pcmreader = CounterPCMReader(pcmreader)
233        try:
234
235            encode_vorbis(filename,
236                          BufferedPCMReader(pcmreader),
237                          float(compression) / 10)
238
239            if ((total_pcm_frames is not None) and
240                (total_pcm_frames != pcmreader.frames_written)):
241                from audiotools.text import ERR_TOTAL_PCM_FRAMES_MISMATCH
242                cls.__unlink__(filename)
243                raise EncodingError(ERR_TOTAL_PCM_FRAMES_MISMATCH)
244
245            return VorbisAudio(filename)
246        except (ValueError, IOError) as err:
247            cls.__unlink__(filename)
248            raise EncodingError(str(err))
249        finally:
250            pcmreader.close()
251
252    def update_metadata(self, metadata):
253        """takes this track's current MetaData object
254        as returned by get_metadata() and sets this track's metadata
255        with any fields updated in that object
256
257        raises IOError if unable to write the file
258        """
259
260        import os
261        from audiotools import TemporaryFile
262        from audiotools.ogg import (PageReader,
263                                    PacketReader,
264                                    PageWriter,
265                                    packet_to_pages,
266                                    packets_to_pages)
267        from audiotools.vorbiscomment import VorbisComment
268        from audiotools.bitstream import BitstreamRecorder
269
270        if metadata is None:
271            return
272        elif not isinstance(metadata, VorbisComment):
273            from audiotools.text import ERR_FOREIGN_METADATA
274            raise ValueError(ERR_FOREIGN_METADATA)
275        elif not os.access(self.filename, os.W_OK):
276            raise IOError(self.filename)
277
278        original_ogg = PacketReader(PageReader(open(self.filename, "rb")))
279        new_ogg = PageWriter(TemporaryFile(self.filename))
280
281        sequence_number = 0
282
283        # transfer current file's identification packet in its own page
284        identification_packet = original_ogg.read_packet()
285        for (i, page) in enumerate(packet_to_pages(
286                identification_packet,
287                self.__serial_number__,
288                starting_sequence_number=sequence_number)):
289            page.stream_beginning = (i == 0)
290            new_ogg.write(page)
291            sequence_number += 1
292
293        # discard the current file's comment packet
294        comment_packet = original_ogg.read_packet()
295
296        # generate new comment packet
297        comment_writer = BitstreamRecorder(True)
298        comment_writer.build("8u 6b", (3, b"vorbis"))
299        vendor_string = metadata.vendor_string.encode('utf-8')
300        comment_writer.build("32u %db" % (len(vendor_string)),
301                             (len(vendor_string), vendor_string))
302        comment_writer.write(32, len(metadata.comment_strings))
303        for comment_string in metadata.comment_strings:
304            comment_string = comment_string.encode('utf-8')
305            comment_writer.build("32u %db" % (len(comment_string)),
306                                 (len(comment_string), comment_string))
307
308        comment_writer.build("1u a", (1,))  # framing bit
309
310        # transfer codebooks packet from original file to new file
311        codebooks_packet = original_ogg.read_packet()
312
313        for page in packets_to_pages(
314                [comment_writer.data(), codebooks_packet],
315                self.__serial_number__,
316                starting_sequence_number=sequence_number):
317            new_ogg.write(page)
318            sequence_number += 1
319
320        # transfer remaining pages after re-sequencing
321        page = original_ogg.read_page()
322        page.sequence_number = sequence_number
323        sequence_number += 1
324        new_ogg.write(page)
325        while not page.stream_end:
326            page = original_ogg.read_page()
327            page.sequence_number = sequence_number
328            page.bitstream_serial_number = self.__serial_number__
329            sequence_number += 1
330            new_ogg.write(page)
331
332        original_ogg.close()
333        new_ogg.close()
334
335    def set_metadata(self, metadata):
336        """takes a MetaData object and sets this track's metadata
337
338        this metadata includes track name, album name, and so on
339        raises IOError if unable to write the file"""
340
341        from audiotools.vorbiscomment import VorbisComment
342
343        if metadata is None:
344            return self.delete_metadata()
345
346        metadata = VorbisComment.converted(metadata)
347
348        old_metadata = self.get_metadata()
349
350        metadata.vendor_string = old_metadata.vendor_string
351
352        # remove REPLAYGAIN_* tags from new metadata (if any)
353        for key in [u"REPLAYGAIN_TRACK_GAIN",
354                    u"REPLAYGAIN_TRACK_PEAK",
355                    u"REPLAYGAIN_ALBUM_GAIN",
356                    u"REPLAYGAIN_ALBUM_PEAK",
357                    u"REPLAYGAIN_REFERENCE_LOUDNESS"]:
358            try:
359                metadata[key] = old_metadata[key]
360            except KeyError:
361                metadata[key] = []
362
363        self.update_metadata(metadata)
364
365    @classmethod
366    def supports_metadata(cls):
367        """returns True if this audio type supports MetaData"""
368
369        return True
370
371    def get_metadata(self):
372        """returns a MetaData object, or None
373
374        raises IOError if unable to read the file"""
375
376        from io import BytesIO
377        from audiotools.bitstream import BitstreamReader
378        from audiotools.ogg import PacketReader, PageReader
379        from audiotools.vorbiscomment import VorbisComment
380
381        with PacketReader(PageReader(open(self.filename, "rb"))) as reader:
382            identification = reader.read_packet()
383            comment = BitstreamReader(BytesIO(reader.read_packet()), True)
384
385            (packet_type, packet_header) = comment.parse("8u 6b")
386            if (packet_type == 3) and (packet_header == b'vorbis'):
387                vendor_string = \
388                    comment.read_bytes(comment.read(32)).decode('utf-8')
389                comment_strings = [
390                    comment.read_bytes(comment.read(32)).decode('utf-8')
391                    for i in range(comment.read(32))]
392                if comment.read(1) == 1:   # framing bit
393                    return VorbisComment(comment_strings, vendor_string)
394                else:
395                    return None
396            else:
397                return None
398
399    def delete_metadata(self):
400        """deletes the track's MetaData
401
402        this removes or unsets tags as necessary in order to remove all data
403        raises IOError if unable to write the file"""
404
405        from audiotools import MetaData
406
407        # the vorbis comment packet is required,
408        # so simply zero out its contents
409        self.set_metadata(MetaData())
410
411    @classmethod
412    def supports_replay_gain(cls):
413        """returns True if this class supports ReplayGain"""
414
415        return True
416
417    def get_replay_gain(self):
418        """returns a ReplayGain object of our ReplayGain values
419
420        returns None if we have no values"""
421
422        from audiotools import ReplayGain
423
424        vorbis_metadata = self.get_metadata()
425
426        if ((vorbis_metadata is not None) and
427            ({u'REPLAYGAIN_TRACK_PEAK',
428              u'REPLAYGAIN_TRACK_GAIN',
429              u'REPLAYGAIN_ALBUM_PEAK',
430              u'REPLAYGAIN_ALBUM_GAIN'}.issubset(vorbis_metadata.keys()))):
431            # we have ReplayGain data
432            try:
433                return ReplayGain(
434                    vorbis_metadata[u'REPLAYGAIN_TRACK_GAIN'][0][0:-len(u" dB")],
435                    vorbis_metadata[u'REPLAYGAIN_TRACK_PEAK'][0],
436                    vorbis_metadata[u'REPLAYGAIN_ALBUM_GAIN'][0][0:-len(u" dB")],
437                    vorbis_metadata[u'REPLAYGAIN_ALBUM_PEAK'][0])
438            except (IndexError, ValueError):
439                return None
440        else:
441            return None
442
443    def set_replay_gain(self, replaygain):
444        """given a ReplayGain object, sets the track's gain to those values
445
446        may raise IOError if unable to modify the file"""
447
448        if replaygain is None:
449            return self.delete_replay_gain()
450
451        vorbis_comment = self.get_metadata()
452        if vorbis_comment is None:
453            from audiotools.vorbiscomment import VorbisComment
454            from audiotools import VERSION
455
456            vorbis_comment = VorbisComment(
457                [], u"Python Audio Tools %s" % (VERSION))
458
459        vorbis_comment[u"REPLAYGAIN_TRACK_GAIN"] = [
460            u"%1.2f dB" % (replaygain.track_gain)]
461        vorbis_comment[u"REPLAYGAIN_TRACK_PEAK"] = [
462            u"%1.8f" % (replaygain.track_peak)]
463        vorbis_comment[u"REPLAYGAIN_ALBUM_GAIN"] = [
464            u"%1.2f dB" % (replaygain.album_gain)]
465        vorbis_comment[u"REPLAYGAIN_ALBUM_PEAK"] = [
466            u"%1.8f" % (replaygain.album_peak)]
467        vorbis_comment[u"REPLAYGAIN_REFERENCE_LOUDNESS"] = [u"89.0 dB"]
468
469        self.update_metadata(vorbis_comment)
470
471    def delete_replay_gain(self):
472        """removes ReplayGain values from file, if any
473
474        may raise IOError if unable to modify the file"""
475
476        vorbis_comment = self.get_metadata()
477        if vorbis_comment is not None:
478            for field in [u"REPLAYGAIN_TRACK_GAIN",
479                          u"REPLAYGAIN_TRACK_PEAK",
480                          u"REPLAYGAIN_ALBUM_GAIN",
481                          u"REPLAYGAIN_ALBUM_PEAK",
482                          u"REPLAYGAIN_REFERENCE_LOUDNESS"]:
483                try:
484                    del(vorbis_comment[field])
485                except KeyError:
486                    pass
487
488            self.update_metadata(vorbis_comment)
489
490    @classmethod
491    def available(cls, system_binaries):
492        """returns True if all necessary compenents are available
493        to support format"""
494
495        try:
496            from audiotools.decoders import VorbisDecoder
497            from audiotools.encoders import encode_vorbis
498
499            return True
500        except ImportError:
501            return False
502
503    @classmethod
504    def missing_components(cls, messenger):
505        """given a Messenger object, displays missing binaries or libraries
506        needed to support this format and where to get them"""
507
508        from audiotools.text import (ERR_LIBRARY_NEEDED,
509                                     ERR_LIBRARY_DOWNLOAD_URL,
510                                     ERR_PROGRAM_PACKAGE_MANAGER)
511
512        format_ = cls.NAME.decode('ascii')
513
514        # display where to get vorbisfile
515        messenger.info(
516            ERR_LIBRARY_NEEDED %
517            {"library": u"\"libvorbisfile\"",
518             "format": format_})
519        messenger.info(
520            ERR_LIBRARY_DOWNLOAD_URL %
521            {"library": u"libvorbisfile",
522             "url": "http://www.xiph.org/"})
523
524        messenger.info(ERR_PROGRAM_PACKAGE_MANAGER)
525
526
class VorbisChannelMask(ChannelMask):
    """the Vorbis-specific channel mapping"""

    def __repr__(self):
        enabled = []
        for field in self.SPEAKER_TO_MASK.keys():
            value = getattr(self, field)
            if value:
                enabled.append("%s=%s" % (field, value))

        return "VorbisChannelMask(%s)" % (",".join(enabled))

    def channels(self):
        """returns a list of speaker strings this mask contains

        returned in the order in which they should appear
        in the PCM stream
        the order depends only on how many speakers the mask has;
        an empty list is returned for counts outside 1 through 8
        """

        order_by_count = {
            1: ("front_center",),
            2: ("front_left", "front_right"),
            3: ("front_left", "front_center", "front_right"),
            4: ("front_left", "front_right",
                "back_left", "back_right"),
            5: ("front_left", "front_center", "front_right",
                "back_left", "back_right"),
            6: ("front_left", "front_center", "front_right",
                "back_left", "back_right", "low_frequency"),
            7: ("front_left", "front_center", "front_right",
                "side_left", "side_right", "back_center",
                "low_frequency"),
            8: ("front_left", "front_center", "front_right",
                "side_left", "side_right",
                "back_left", "back_right", "low_frequency")}

        # a new list is built on every call
        return list(order_by_count.get(len(self), ()))
569