#!/usr/bin/python

# Audio Tools, a module and set of tools for manipulating audio data
# Copyright (C) 2007-2014 Brian Langenberger

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

from audiotools import (AudioFile, InvalidFile)
from audiotools.vorbis import (VorbisAudio, VorbisChannelMask)
from audiotools.vorbiscomment import VorbisComment


class InvalidOpus(InvalidFile):
    """raised when a file cannot be parsed as an Ogg Opus stream"""

    pass


#######################
# Opus File
#######################

class OpusAudio(VorbisAudio):
    """an Opus file"""

    SUFFIX = "opus"
    NAME = "opus"
    DESCRIPTION = u"Opus Audio Codec"
    DEFAULT_COMPRESSION = "10"
    COMPRESSION_MODES = tuple(map(str, range(0, 11)))
    COMPRESSION_DESCRIPTIONS = {"0": u"lowest quality, fastest encode",
                                "10": u"best quality, slowest encode"}

    def __init__(self, filename):
        """filename is a plain string

        parses the first Ogg page header and the "OpusHead" packet
        to populate the channel count, channel mask and serial number

        raises InvalidOpus if the file cannot be read
        or its headers are not those of an Opus stream"""

        AudioFile.__init__(self, filename)
        self.__channels__ = 0
        self.__channel_mask__ = 0

        # get channel count and channel mask from first packet
        from audiotools.bitstream import BitstreamReader
        try:
            with BitstreamReader(open(filename, "rb"), True) as ogg_reader:
                # Ogg page header; only the magic number, version
                # and serial number are used below
                (magic_number,
                 version,
                 header_type,
                 granule_position,
                 self.__serial_number__,
                 page_sequence_number,
                 checksum,
                 segment_count) = ogg_reader.parse(
                    "4b 8u 8u 64S 32u 32u 32u 8u")

                if magic_number != b'OggS':
                    from audiotools.text import ERR_OGG_INVALID_MAGIC_NUMBER
                    raise InvalidOpus(ERR_OGG_INVALID_MAGIC_NUMBER)
                if version != 0:
                    from audiotools.text import ERR_OGG_INVALID_VERSION
                    raise InvalidOpus(ERR_OGG_INVALID_VERSION)

                # a single segment length is consumed so that the reader
                # is positioned at the start of the packet data;
                # the value itself isn't needed
                segment_length = ogg_reader.read(8)

                # identification packet
                (opushead,
                 version,
                 self.__channels__,
                 pre_skip,
                 input_sample_rate,
                 output_gain,
                 mapping_family) = ogg_reader.parse(
                    "8b 8u 8u 16u 32u 16s 8u")

                if opushead != b"OpusHead":
                    from audiotools.text import ERR_OPUS_INVALID_TYPE
                    raise InvalidOpus(ERR_OPUS_INVALID_TYPE)
                if version != 1:
                    from audiotools.text import ERR_OPUS_INVALID_VERSION
                    raise InvalidOpus(ERR_OPUS_INVALID_VERSION)
                if self.__channels__ == 0:
                    from audiotools.text import ERR_OPUS_INVALID_CHANNELS
                    raise InvalidOpus(ERR_OPUS_INVALID_CHANNELS)

                # FIXME - assign channel mask from mapping family
                if mapping_family == 0:
                    if self.__channels__ == 1:
                        self.__channel_mask__ = VorbisChannelMask(0x4)
                    elif self.__channels__ == 2:
                        self.__channel_mask__ = VorbisChannelMask(0x3)
                    else:
                        self.__channel_mask__ = VorbisChannelMask(0)
                else:
                    (stream_count,
                     coupled_stream_count) = ogg_reader.parse("8u 8u")
                    # each coupled stream carries 2 channels,
                    # each remaining stream carries 1
                    if (self.__channels__ !=
                        ((coupled_stream_count * 2) +
                         (stream_count - coupled_stream_count))):
                        from audiotools.text import ERR_OPUS_INVALID_CHANNELS
                        raise InvalidOpus(ERR_OPUS_INVALID_CHANNELS)
                    # the mapping table is read (one byte per channel)
                    # but not yet used; the channel mask stays at
                    # its initial value of 0 for these files
                    channel_mapping = [ogg_reader.read(8)
                                       for i in range(self.__channels__)]
        except IOError as msg:
            raise InvalidOpus(str(msg))

    @classmethod
    def supports_replay_gain(cls):
        """returns True if this class supports ReplayGain"""

        return False

    def get_replay_gain(self):
        """returns a ReplayGain object of our ReplayGain values

        returns None if we have no values

        may raise IOError if unable to read the file"""

        # ReplayGain is unsupported for this format,
        # so there are never any values to return
        return None

    def set_replay_gain(self, replaygain):
        """given a ReplayGain object, sets the track's gain to those values

        may raise IOError if unable to modify the file"""

        # ReplayGain is unsupported for this format, so this is a no-op
        pass

    def delete_replay_gain(self):
        """removes ReplayGain values from file, if any

        may raise IOError if unable to modify the file"""

        # ReplayGain is unsupported for this format, so this is a no-op
        pass

    def total_frames(self):
        """returns the total PCM frames of the track as an integer

        this is the largest granule position found on any Ogg page
        up to and including the page flagged as the end of stream

        returns 0 if the file cannot be read or parsed"""

        from audiotools._ogg import PageReader

        try:
            with PageReader(open(self.filename, "rb")) as reader:
                page = reader.read()
                pcm_samples = page.granule_position

                while not page.stream_end:
                    page = reader.read()
                    pcm_samples = max(pcm_samples, page.granule_position)

                return pcm_samples
        except (IOError, ValueError):
            return 0

    def sample_rate(self):
        """returns the rate of the track's audio as an integer number of Hz"""

        return 48000

    def to_pcm(self):
        """returns a PCMReader object containing the track's PCM data

        if an error occurs initializing a decoder, this should
        return a PCMReaderError with an appropriate error message"""

        from audiotools.decoders import OpusDecoder

        try:
            return OpusDecoder(self.filename)
        except ValueError as err:
            from audiotools import PCMReaderError
            return PCMReaderError(error_message=str(err),
                                  sample_rate=self.sample_rate(),
                                  channels=self.channels(),
                                  channel_mask=int(self.channel_mask()),
                                  bits_per_sample=self.bits_per_sample())

    @classmethod
    def from_pcm(cls, filename, pcmreader,
                 compression=None, total_pcm_frames=None):
        """encodes a new file from PCM data

        takes a filename string, PCMReader object,
        optional compression level string and
        optional total_pcm_frames integer
        encodes a new audio file from pcmreader's data
        at the given filename with the specified compression level
        and returns a new AudioFile-compatible object

        may raise EncodingError if some problem occurs when
        encoding the input file. This includes an error
        in the input stream, a problem writing the output file,
        or even an EncodingError subclass such as
        "UnsupportedBitsPerSample" if the input stream
        is formatted in a way this class is unable to support
        """

        from audiotools import (BufferedPCMReader,
                                PCMConverter,
                                __default_quality__,
                                EncodingError)
        from audiotools.encoders import encode_opus

        if (((compression is None) or
             (compression not in cls.COMPRESSION_MODES))):
            compression = __default_quality__(cls.NAME)

        if (pcmreader.channels > 2) and (pcmreader.channels <= 8):
            if ((pcmreader.channel_mask != 0) and
                (pcmreader.channel_mask not in
                 {0x7,      # FR, FC, FL
                  0x33,     # FR, FL, BR, BL
                  0x37,     # FR, FC, FL, BL, BR
                  0x3f,     # FR, FC, FL, BL, BR, LFE
                  0x70f,    # FL, FC, FR, SL, SR, BC, LFE
                  0x63f})):  # FL, FC, FR, SL, SR, BL, BR, LFE
                from audiotools import UnsupportedChannelMask
                pcmreader.close()
                # report the mask that was actually rejected
                raise UnsupportedChannelMask(filename,
                                             pcmreader.channel_mask)

        try:
            if total_pcm_frames is not None:
                # wrap the reader so the number of frames actually
                # consumed can be compared against the expected total
                from audiotools import CounterPCMReader
                pcmreader = CounterPCMReader(pcmreader)

            # input is always converted to 48000Hz, 16 bits-per-sample
            # while keeping its channel layout
            encode_opus(filename,
                        BufferedPCMReader(
                            PCMConverter(pcmreader,
                                         sample_rate=48000,
                                         channels=pcmreader.channels,
                                         channel_mask=pcmreader.channel_mask,
                                         bits_per_sample=16)),
                        quality=int(compression),
                        original_sample_rate=pcmreader.sample_rate)

            pcmreader.close()

            if ((total_pcm_frames is not None) and
                (total_pcm_frames != pcmreader.frames_written)):
                from audiotools.text import ERR_TOTAL_PCM_FRAMES_MISMATCH
                cls.__unlink__(filename)
                raise EncodingError(ERR_TOTAL_PCM_FRAMES_MISMATCH)

            return cls(filename)
        except (ValueError, IOError) as err:
            pcmreader.close()
            cls.__unlink__(filename)
            raise EncodingError(err)

    def update_metadata(self, metadata):
        """takes this track's current MetaData object
        as returned by get_metadata() and sets this track's metadata
        with any fields updated in that object

        does nothing if metadata is None
        raises ValueError if metadata is not a VorbisComment
        raises IOError if unable to write the file
        """

        import os
        from audiotools import TemporaryFile
        from audiotools.ogg import (PageReader,
                                    PacketReader,
                                    PageWriter,
                                    packet_to_pages)
        from audiotools.bitstream import BitstreamRecorder

        if metadata is None:
            return
        elif not isinstance(metadata, VorbisComment):
            from audiotools.text import ERR_FOREIGN_METADATA
            raise ValueError(ERR_FOREIGN_METADATA)
        elif not os.access(self.filename, os.W_OK):
            raise IOError(self.filename)

        original_ogg = PacketReader(PageReader(open(self.filename, "rb")))
        try:
            new_ogg = PageWriter(TemporaryFile(self.filename))

            sequence_number = 0

            # transfer current file's identification packet in its own page
            identification_packet = original_ogg.read_packet()
            for (i, page) in enumerate(packet_to_pages(
                    identification_packet,
                    self.__serial_number__,
                    starting_sequence_number=sequence_number)):
                page.stream_beginning = (i == 0)
                new_ogg.write(page)
                sequence_number += 1

            # discard the current file's comment packet
            original_ogg.read_packet()

            # generate new comment packet
            comment_writer = BitstreamRecorder(True)
            comment_writer.write_bytes(b"OpusTags")
            vendor_string = metadata.vendor_string.encode('utf-8')
            comment_writer.build("32u %db" % (len(vendor_string)),
                                 (len(vendor_string), vendor_string))
            comment_writer.write(32, len(metadata.comment_strings))
            for comment_string in metadata.comment_strings:
                comment_string = comment_string.encode('utf-8')
                comment_writer.build("32u %db" % (len(comment_string)),
                                     (len(comment_string), comment_string))

            for page in packet_to_pages(
                    comment_writer.data(),
                    self.__serial_number__,
                    starting_sequence_number=sequence_number):
                new_ogg.write(page)
                sequence_number += 1

            # transfer remaining pages after re-sequencing;
            # every page gets the same serial number as the
            # rewritten header pages, and at least one page
            # is always transferred
            while True:
                page = original_ogg.read_page()
                page.sequence_number = sequence_number
                page.bitstream_serial_number = self.__serial_number__
                sequence_number += 1
                new_ogg.write(page)
                if page.stream_end:
                    break
        finally:
            # always release the source file, even on failure
            original_ogg.close()

        # the writer is only closed once every page has been
        # transferred successfully, so a failed update never
        # finalizes a partially written file
        new_ogg.close()

    def set_metadata(self, metadata):
        """takes a MetaData object and sets this track's metadata

        this metadata includes track name, album name, and so on
        raises IOError if unable to write the file"""

        if metadata is None:
            return self.delete_metadata()

        metadata = VorbisComment.converted(metadata)

        # NOTE(review): get_metadata() returns None when the comment
        # packet lacks the "OpusTags" signature; that case isn't
        # handled here - confirm whether it can occur in practice
        old_metadata = self.get_metadata()

        # the vendor string always comes from the existing file
        metadata.vendor_string = old_metadata.vendor_string

        # port REPLAYGAIN and ENCODER from old metadata to new metadata
        for key in [u"REPLAYGAIN_TRACK_GAIN",
                    u"REPLAYGAIN_TRACK_PEAK",
                    u"REPLAYGAIN_ALBUM_GAIN",
                    u"REPLAYGAIN_ALBUM_PEAK",
                    u"REPLAYGAIN_REFERENCE_LOUDNESS",
                    u"ENCODER"]:
            try:
                metadata[key] = old_metadata[key]
            except KeyError:
                metadata[key] = []

        self.update_metadata(metadata)

    @classmethod
    def supports_metadata(cls):
        """returns True if this audio type supports MetaData"""

        return True

    def get_metadata(self):
        """returns a MetaData object, or None

        None is returned if the file's second packet
        doesn't begin with the "OpusTags" signature

        raises IOError if unable to read the file"""

        from io import BytesIO
        from audiotools.bitstream import BitstreamReader
        from audiotools.ogg import PacketReader, PageReader

        with PacketReader(PageReader(open(self.filename, "rb"))) as reader:
            # the identification packet is skipped;
            # the comment packet is the one that follows it
            identification = reader.read_packet()
            comment = BitstreamReader(BytesIO(reader.read_packet()), True)

            if comment.read_bytes(8) == b"OpusTags":
                # each string is stored as a 32-bit length
                # followed by that many bytes of UTF-8 data
                vendor_string = \
                    comment.read_bytes(comment.read(32)).decode('utf-8')
                comment_strings = [
                    comment.read_bytes(comment.read(32)).decode('utf-8')
                    for i in range(comment.read(32))]

                return VorbisComment(comment_strings, vendor_string)
            else:
                return None

    def delete_metadata(self):
        """deletes the track's MetaData

        this removes or unsets tags as necessary in order to remove all data
        raises IOError if unable to write the file"""

        from audiotools import MetaData

        # the vorbis comment packet is required,
        # so simply zero out its contents
        self.set_metadata(MetaData())

    def verify(self, progress=None):
        """verifies the current file for correctness

        returns True if the file is okay
        raises an InvalidFile with an error message if there is
        some problem with the file"""

        # Checking for a truncated Ogg stream typically involves
        # verifying that the "end of stream" flag is set on the last
        # Ogg page in the stream in the event that one or more whole
        # pages is lost. But since the OpusFile decoder doesn't perform
        # this check and doesn't provide any access to its internal
        # Ogg decoder (unlike Vorbis), we'll perform that check externally.
        #
        # And since it's a fast check, we won't bother to update progress.

        from audiotools.ogg import PageReader

        try:
            f = open(self.filename, "rb")
        except IOError as err:
            raise InvalidOpus(str(err))
        try:
            reader = PageReader(f)
        except IOError as err:
            f.close()
            raise InvalidOpus(str(err))

        try:
            # walk every page until one is flagged as the end of stream;
            # running out of pages first surfaces as a read error
            page = reader.read()
            while not page.stream_end:
                page = reader.read()
        except (IOError, ValueError) as err:
            raise InvalidOpus(str(err))
        finally:
            reader.close()

        return AudioFile.verify(self, progress)

    @classmethod
    def available(cls, system_binaries):
        """returns True if all necessary components are available
        to support format"""

        # the format is usable only if both the decoder
        # and encoder extensions can be imported
        try:
            from audiotools.decoders import OpusDecoder
            from audiotools.encoders import encode_opus

            return True
        except ImportError:
            return False

    @classmethod
    def missing_components(cls, messenger):
        """given a Messenger object, displays missing binaries or libraries
        needed to support this format and where to get them"""

        from audiotools.text import (ERR_LIBRARY_NEEDED,
                                     ERR_LIBRARY_DOWNLOAD_URL,
                                     ERR_PROGRAM_PACKAGE_MANAGER)

        # NAME is a byte string under Python 2 but already text
        # under Python 3, where it has no decode() method
        name = cls.NAME
        format_ = name.decode('ascii') if isinstance(name, bytes) else name

        # display where to get libopus and opusfile
        messenger.info(
            ERR_LIBRARY_NEEDED %
            {"library": u"\"libopus\" and \"opusfile\"",
             "format": format_})
        messenger.info(
            ERR_LIBRARY_DOWNLOAD_URL %
            {"library": u"libopus and opusfile",
             "url": "http://www.opus-codec.org/"})

        messenger.info(ERR_PROGRAM_PACKAGE_MANAGER)