1#!/usr/bin/python 2 3# Audio Tools, a module and set of tools for manipulating audio data 4# Copyright (C) 2007-2014 Brian Langenberger 5 6# This program is free software; you can redistribute it and/or modify 7# it under the terms of the GNU General Public License as published by 8# the Free Software Foundation; either version 2 of the License, or 9# (at your option) any later version. 10 11# This program is distributed in the hope that it will be useful, 12# but WITHOUT ANY WARRANTY; without even the implied warranty of 13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14# GNU General Public License for more details. 15 16# You should have received a copy of the GNU General Public License 17# along with this program; if not, write to the Free Software 18# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 19 20from audiotools import (AudioFile, InvalidFile, ChannelMask) 21 22 23class InvalidVorbis(InvalidFile): 24 pass 25 26 27class VorbisAudio(AudioFile): 28 """an Ogg Vorbis file""" 29 30 from audiotools.text import (COMP_VORBIS_0, 31 COMP_VORBIS_10) 32 33 SUFFIX = "ogg" 34 NAME = SUFFIX 35 DESCRIPTION = u"Ogg Vorbis" 36 DEFAULT_COMPRESSION = "3" 37 COMPRESSION_MODES = tuple([str(i) for i in range(0, 11)]) 38 COMPRESSION_DESCRIPTIONS = {"0": COMP_VORBIS_0, 39 "10": COMP_VORBIS_10} 40 41 def __init__(self, filename): 42 """filename is a plain string""" 43 44 AudioFile.__init__(self, filename) 45 self.__sample_rate__ = 0 46 self.__channels__ = 0 47 try: 48 self.__read_identification__() 49 except IOError as msg: 50 raise InvalidVorbis(str(msg)) 51 52 def __read_identification__(self): 53 from audiotools.bitstream import BitstreamReader 54 55 with BitstreamReader(open(self.filename, "rb"), True) as ogg_reader: 56 (magic_number, 57 version, 58 header_type, 59 granule_position, 60 self.__serial_number__, 61 page_sequence_number, 62 checksum, 63 segment_count) = ogg_reader.parse("4b 8u 8u 64S 32u 32u 32u 8u") 64 65 if magic_number != b'OggS': 66 from audiotools.text import ERR_OGG_INVALID_MAGIC_NUMBER 67 raise InvalidVorbis(ERR_OGG_INVALID_MAGIC_NUMBER) 68 if version != 0: 69 from audiotools.text import ERR_OGG_INVALID_VERSION 70 raise InvalidVorbis(ERR_OGG_INVALID_VERSION) 71 72 segment_length = ogg_reader.read(8) 73 74 (vorbis_type, 75 header, 76 version, 77 self.__channels__, 78 self.__sample_rate__, 79 maximum_bitrate, 80 nominal_bitrate, 81 minimum_bitrate, 82 blocksize0, 83 blocksize1, 84 framing) = ogg_reader.parse( 85 "8u 6b 32u 8u 32u 32u 32u 32u 4u 4u 1u") 86 87 if vorbis_type != 1: 88 from audiotools.text import ERR_VORBIS_INVALID_TYPE 89 raise InvalidVorbis(ERR_VORBIS_INVALID_TYPE) 90 if header != b'vorbis': 91 from audiotools.text import ERR_VORBIS_INVALID_HEADER 92 raise InvalidVorbis(ERR_VORBIS_INVALID_HEADER) 93 if version != 0: 94 from audiotools.text import ERR_VORBIS_INVALID_VERSION 95 raise InvalidVorbis(ERR_VORBIS_INVALID_VERSION) 96 if framing != 1: 97 from audiotools.text import ERR_VORBIS_INVALID_FRAMING_BIT 98 raise InvalidVorbis(ERR_VORBIS_INVALID_FRAMING_BIT) 99 100 def lossless(self): 101 """returns False""" 102 103 return False 104 105 def bits_per_sample(self): 106 """returns an integer number of bits-per-sample this track contains""" 107 108 return 16 109 110 def channels(self): 111 """returns an integer number of channels this track contains""" 112 113 return self.__channels__ 114 115 def channel_mask(self): 116 """returns a ChannelMask object of this track's channel layout""" 117 118 if self.channels() == 1: 119 return ChannelMask.from_fields( 120 front_center=True) 121 elif self.channels() == 2: 122 return ChannelMask.from_fields( 123 front_left=True, front_right=True) 124 elif self.channels() == 3: 125 return ChannelMask.from_fields( 126 front_left=True, front_right=True, 127 front_center=True) 128 elif self.channels() == 4: 129 return ChannelMask.from_fields( 130 front_left=True, front_right=True, 131 back_left=True, back_right=True) 132 elif self.channels() == 5: 133 return ChannelMask.from_fields( 134 front_left=True, front_right=True, 135 front_center=True, 136 back_left=True, back_right=True) 137 elif self.channels() == 6: 138 return ChannelMask.from_fields( 139 front_left=True, front_right=True, 140 front_center=True, 141 back_left=True, back_right=True, 142 low_frequency=True) 143 elif self.channels() == 7: 144 return ChannelMask.from_fields( 145 front_left=True, front_right=True, 146 front_center=True, 147 side_left=True, side_right=True, 148 back_center=True, low_frequency=True) 149 elif self.channels() == 8: 150 return ChannelMask.from_fields( 151 front_left=True, front_right=True, 152 side_left=True, side_right=True, 153 back_left=True, back_right=True, 154 front_center=True, low_frequency=True) 155 else: 156 return ChannelMask(0) 157 158 def total_frames(self): 159 """returns the total PCM frames of the track as an integer""" 160 161 from audiotools._ogg import PageReader 162 163 try: 164 with PageReader(open(self.filename, "rb")) as reader: 165 page = reader.read() 166 pcm_samples = page.granule_position 167 168 while not page.stream_end: 169 page = reader.read() 170 pcm_samples = max(pcm_samples, page.granule_position) 171 172 return pcm_samples 173 except (IOError, ValueError): 174 return 0 175 176 def sample_rate(self): 177 """returns the rate of the track's audio as an integer number of Hz""" 178 179 return self.__sample_rate__ 180 181 def to_pcm(self): 182 """returns a PCMReader object containing the track's PCM data""" 183 184 from audiotools.decoders import VorbisDecoder 185 186 try: 187 return VorbisDecoder(self.filename) 188 except ValueError as err: 189 from audiotools import PCMReaderError 190 return PCMReaderError(str(err), 191 self.sample_rate(), 192 self.channels(), 193 int(self.channel_mask()), 194 self.bits_per_sample()) 195 196 @classmethod 197 def from_pcm(cls, filename, pcmreader, 198 compression=None, total_pcm_frames=None): 199 """encodes a new file from PCM data 200 201 takes a filename string, PCMReader object, 202 optional compression level string and 203 optional total_pcm_frames integer 204 encodes a new audio file from pcmreader's data 205 at the given filename with the specified compression level 206 and returns a new VorbisAudio object""" 207 208 from audiotools import (BufferedPCMReader, 209 __default_quality__, 210 EncodingError) 211 from audiotools.encoders import encode_vorbis 212 213 if (((compression is None) or 214 (compression not in cls.COMPRESSION_MODES))): 215 compression = __default_quality__(cls.NAME) 216 217 if (pcmreader.channels > 2) and (pcmreader.channels <= 8): 218 channel_mask = int(pcmreader.channel_mask) 219 if ((channel_mask != 0) and 220 (channel_mask not in 221 (0x7, # FR, FC, FL 222 0x33, # FR, FL, BR, BL 223 0x37, # FR, FC, FL, BL, BR 224 0x3f, # FR, FC, FL, BL, BR, LFE 225 0x70f, # FL, FC, FR, SL, SR, BC, LFE 226 0x63f))): # FL, FC, FR, SL, SR, BL, BR, LFE 227 from audiotools import UnsupportedChannelMask 228 raise UnsupportedChannelMask(filename, channel_mask) 229 230 if total_pcm_frames is not None: 231 from audiotools import CounterPCMReader 232 pcmreader = CounterPCMReader(pcmreader) 233 try: 234 235 encode_vorbis(filename, 236 BufferedPCMReader(pcmreader), 237 float(compression) / 10) 238 239 if ((total_pcm_frames is not None) and 240 (total_pcm_frames != pcmreader.frames_written)): 241 from audiotools.text import ERR_TOTAL_PCM_FRAMES_MISMATCH 242 cls.__unlink__(filename) 243 raise EncodingError(ERR_TOTAL_PCM_FRAMES_MISMATCH) 244 245 return VorbisAudio(filename) 246 except (ValueError, IOError) as err: 247 cls.__unlink__(filename) 248 raise EncodingError(str(err)) 249 finally: 250 pcmreader.close() 251 252 def update_metadata(self, metadata): 253 """takes this track's current MetaData object 254 as returned by get_metadata() and sets this track's metadata 255 with any fields updated in that object 256 257 raises IOError if unable to write the file 258 """ 259 260 import os 261 from audiotools import TemporaryFile 262 from audiotools.ogg import (PageReader, 263 PacketReader, 264 PageWriter, 265 packet_to_pages, 266 packets_to_pages) 267 from audiotools.vorbiscomment import VorbisComment 268 from audiotools.bitstream import BitstreamRecorder 269 270 if metadata is None: 271 return 272 elif not isinstance(metadata, VorbisComment): 273 from audiotools.text import ERR_FOREIGN_METADATA 274 raise ValueError(ERR_FOREIGN_METADATA) 275 elif not os.access(self.filename, os.W_OK): 276 raise IOError(self.filename) 277 278 original_ogg = PacketReader(PageReader(open(self.filename, "rb"))) 279 new_ogg = PageWriter(TemporaryFile(self.filename)) 280 281 sequence_number = 0 282 283 # transfer current file's identification packet in its own page 284 identification_packet = original_ogg.read_packet() 285 for (i, page) in enumerate(packet_to_pages( 286 identification_packet, 287 self.__serial_number__, 288 starting_sequence_number=sequence_number)): 289 page.stream_beginning = (i == 0) 290 new_ogg.write(page) 291 sequence_number += 1 292 293 # discard the current file's comment packet 294 comment_packet = original_ogg.read_packet() 295 296 # generate new comment packet 297 comment_writer = BitstreamRecorder(True) 298 comment_writer.build("8u 6b", (3, b"vorbis")) 299 vendor_string = metadata.vendor_string.encode('utf-8') 300 comment_writer.build("32u %db" % (len(vendor_string)), 301 (len(vendor_string), vendor_string)) 302 comment_writer.write(32, len(metadata.comment_strings)) 303 for comment_string in metadata.comment_strings: 304 comment_string = comment_string.encode('utf-8') 305 comment_writer.build("32u %db" % (len(comment_string)), 306 (len(comment_string), comment_string)) 307 308 comment_writer.build("1u a", (1,)) # framing bit 309 310 # transfer codebooks packet from original file to new file 311 codebooks_packet = original_ogg.read_packet() 312 313 for page in packets_to_pages( 314 [comment_writer.data(), codebooks_packet], 315 self.__serial_number__, 316 starting_sequence_number=sequence_number): 317 new_ogg.write(page) 318 sequence_number += 1 319 320 # transfer remaining pages after re-sequencing 321 page = original_ogg.read_page() 322 page.sequence_number = sequence_number 323 sequence_number += 1 324 new_ogg.write(page) 325 while not page.stream_end: 326 page = original_ogg.read_page() 327 page.sequence_number = sequence_number 328 page.bitstream_serial_number = self.__serial_number__ 329 sequence_number += 1 330 new_ogg.write(page) 331 332 original_ogg.close() 333 new_ogg.close() 334 335 def set_metadata(self, metadata): 336 """takes a MetaData object and sets this track's metadata 337 338 this metadata includes track name, album name, and so on 339 raises IOError if unable to write the file""" 340 341 from audiotools.vorbiscomment import VorbisComment 342 343 if metadata is None: 344 return self.delete_metadata() 345 346 metadata = VorbisComment.converted(metadata) 347 348 old_metadata = self.get_metadata() 349 350 metadata.vendor_string = old_metadata.vendor_string 351 352 # remove REPLAYGAIN_* tags from new metadata (if any) 353 for key in [u"REPLAYGAIN_TRACK_GAIN", 354 u"REPLAYGAIN_TRACK_PEAK", 355 u"REPLAYGAIN_ALBUM_GAIN", 356 u"REPLAYGAIN_ALBUM_PEAK", 357 u"REPLAYGAIN_REFERENCE_LOUDNESS"]: 358 try: 359 metadata[key] = old_metadata[key] 360 except KeyError: 361 metadata[key] = [] 362 363 self.update_metadata(metadata) 364 365 @classmethod 366 def supports_metadata(cls): 367 """returns True if this audio type supports MetaData""" 368 369 return True 370 371 def get_metadata(self): 372 """returns a MetaData object, or None 373 374 raises IOError if unable to read the file""" 375 376 from io import BytesIO 377 from audiotools.bitstream import BitstreamReader 378 from audiotools.ogg import PacketReader, PageReader 379 from audiotools.vorbiscomment import VorbisComment 380 381 with PacketReader(PageReader(open(self.filename, "rb"))) as reader: 382 identification = reader.read_packet() 383 comment = BitstreamReader(BytesIO(reader.read_packet()), True) 384 385 (packet_type, packet_header) = comment.parse("8u 6b") 386 if (packet_type == 3) and (packet_header == b'vorbis'): 387 vendor_string = \ 388 comment.read_bytes(comment.read(32)).decode('utf-8') 389 comment_strings = [ 390 comment.read_bytes(comment.read(32)).decode('utf-8') 391 for i in range(comment.read(32))] 392 if comment.read(1) == 1: # framing bit 393 return VorbisComment(comment_strings, vendor_string) 394 else: 395 return None 396 else: 397 return None 398 399 def delete_metadata(self): 400 """deletes the track's MetaData 401 402 this removes or unsets tags as necessary in order to remove all data 403 raises IOError if unable to write the file""" 404 405 from audiotools import MetaData 406 407 # the vorbis comment packet is required, 408 # so simply zero out its contents 409 self.set_metadata(MetaData()) 410 411 @classmethod 412 def supports_replay_gain(cls): 413 """returns True if this class supports ReplayGain""" 414 415 return True 416 417 def get_replay_gain(self): 418 """returns a ReplayGain object of our ReplayGain values 419 420 returns None if we have no values""" 421 422 from audiotools import ReplayGain 423 424 vorbis_metadata = self.get_metadata() 425 426 if ((vorbis_metadata is not None) and 427 ({u'REPLAYGAIN_TRACK_PEAK', 428 u'REPLAYGAIN_TRACK_GAIN', 429 u'REPLAYGAIN_ALBUM_PEAK', 430 u'REPLAYGAIN_ALBUM_GAIN'}.issubset(vorbis_metadata.keys()))): 431 # we have ReplayGain data 432 try: 433 return ReplayGain( 434 vorbis_metadata[u'REPLAYGAIN_TRACK_GAIN'][0][0:-len(u" dB")], 435 vorbis_metadata[u'REPLAYGAIN_TRACK_PEAK'][0], 436 vorbis_metadata[u'REPLAYGAIN_ALBUM_GAIN'][0][0:-len(u" dB")], 437 vorbis_metadata[u'REPLAYGAIN_ALBUM_PEAK'][0]) 438 except (IndexError, ValueError): 439 return None 440 else: 441 return None 442 443 def set_replay_gain(self, replaygain): 444 """given a ReplayGain object, sets the track's gain to those values 445 446 may raise IOError if unable to modify the file""" 447 448 if replaygain is None: 449 return self.delete_replay_gain() 450 451 vorbis_comment = self.get_metadata() 452 if vorbis_comment is None: 453 from audiotools.vorbiscomment import VorbisComment 454 from audiotools import VERSION 455 456 vorbis_comment = VorbisComment( 457 [], u"Python Audio Tools %s" % (VERSION)) 458 459 vorbis_comment[u"REPLAYGAIN_TRACK_GAIN"] = [ 460 u"%1.2f dB" % (replaygain.track_gain)] 461 vorbis_comment[u"REPLAYGAIN_TRACK_PEAK"] = [ 462 u"%1.8f" % (replaygain.track_peak)] 463 vorbis_comment[u"REPLAYGAIN_ALBUM_GAIN"] = [ 464 u"%1.2f dB" % (replaygain.album_gain)] 465 vorbis_comment[u"REPLAYGAIN_ALBUM_PEAK"] = [ 466 u"%1.8f" % (replaygain.album_peak)] 467 vorbis_comment[u"REPLAYGAIN_REFERENCE_LOUDNESS"] = [u"89.0 dB"] 468 469 self.update_metadata(vorbis_comment) 470 471 def delete_replay_gain(self): 472 """removes ReplayGain values from file, if any 473 474 may raise IOError if unable to modify the file""" 475 476 vorbis_comment = self.get_metadata() 477 if vorbis_comment is not None: 478 for field in [u"REPLAYGAIN_TRACK_GAIN", 479 u"REPLAYGAIN_TRACK_PEAK", 480 u"REPLAYGAIN_ALBUM_GAIN", 481 u"REPLAYGAIN_ALBUM_PEAK", 482 u"REPLAYGAIN_REFERENCE_LOUDNESS"]: 483 try: 484 del(vorbis_comment[field]) 485 except KeyError: 486 pass 487 488 self.update_metadata(vorbis_comment) 489 490 @classmethod 491 def available(cls, system_binaries): 492 """returns True if all necessary compenents are available 493 to support format""" 494 495 try: 496 from audiotools.decoders import VorbisDecoder 497 from audiotools.encoders import encode_vorbis 498 499 return True 500 except ImportError: 501 return False 502 503 @classmethod 504 def missing_components(cls, messenger): 505 """given a Messenger object, displays missing binaries or libraries 506 needed to support this format and where to get them""" 507 508 from audiotools.text import (ERR_LIBRARY_NEEDED, 509 ERR_LIBRARY_DOWNLOAD_URL, 510 ERR_PROGRAM_PACKAGE_MANAGER) 511 512 format_ = cls.NAME.decode('ascii') 513 514 # display where to get vorbisfile 515 messenger.info( 516 ERR_LIBRARY_NEEDED % 517 {"library": u"\"libvorbisfile\"", 518 "format": format_}) 519 messenger.info( 520 ERR_LIBRARY_DOWNLOAD_URL % 521 {"library": u"libvorbisfile", 522 "url": "http://www.xiph.org/"}) 523 524 messenger.info(ERR_PROGRAM_PACKAGE_MANAGER) 525 526 527class VorbisChannelMask(ChannelMask): 528 """the Vorbis-specific channel mapping""" 529 530 def __repr__(self): 531 return "VorbisChannelMask(%s)" % \ 532 ",".join(["%s=%s" % (field, getattr(self, field)) 533 for field in self.SPEAKER_TO_MASK.keys() 534 if (getattr(self, field))]) 535 536 def channels(self): 537 """returns a list of speaker strings this mask contains 538 539 returned in the order in which they should appear 540 in the PCM stream 541 """ 542 543 count = len(self) 544 if count == 1: 545 return ["front_center"] 546 elif count == 2: 547 return ["front_left", "front_right"] 548 elif count == 3: 549 return ["front_left", "front_center", "front_right"] 550 elif count == 4: 551 return ["front_left", "front_right", 552 "back_left", "back_right"] 553 elif count == 5: 554 return ["front_left", "front_center", "front_right", 555 "back_left", "back_right"] 556 elif count == 6: 557 return ["front_left", "front_center", "front_right", 558 "back_left", "back_right", "low_frequency"] 559 elif count == 7: 560 return ["front_left", "front_center", "front_right", 561 "side_left", "side_right", "back_center", 562 "low_frequency"] 563 elif count == 8: 564 return ["front_left", "front_center", "front_right", 565 "side_left", "side_right", 566 "back_left", "back_right", "low_frequency"] 567 else: 568 return [] 569