#!/usr/bin/python

# Audio Tools, a module and set of tools for manipulating audio data
# Copyright (C) 2007-2014 Brian Langenberger

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA


from audiotools import WaveContainer, InvalidFile
from audiotools.ape import ApeTaggedAudio, ApeGainedAudio


class InvalidWavPack(InvalidFile):
    """raised by WavPackAudio when a file cannot be parsed as WavPack"""

    pass


def __riff_chunk_ids__(data_size, data):
    """yields the chunk ID of each chunk in an embedded RIFF WAVE header

    data_size is the size of the header data, in bytes
    data is a BitstreamReader positioned at the start of that data

    yields nothing if the data doesn't begin with RIFF/WAVE magic"""

    (riff, size, wave) = data.parse("4b 32u 4b")
    if riff != b"RIFF":
        return
    elif wave != b"WAVE":
        return
    else:
        data_size -= 12

    while data_size > 0:
        (chunk_id, chunk_size) = data.parse("4b 32u")
        data_size -= 8
        # odd-sized chunks are followed by a single pad byte
        if (chunk_size % 2) == 1:
            chunk_size += 1
        yield chunk_id
        # the "data" chunk's payload is not skipped
        # NOTE(review): presumably the PCM payload isn't stored
        # in this sub-block - confirm
        if chunk_id != b"data":
            data.skip_bytes(chunk_size)
        data_size -= chunk_size


class WavPackAudio(ApeTaggedAudio, ApeGainedAudio, WaveContainer):
    """a WavPack audio file"""

    from audiotools.text import (COMP_WAVPACK_VERYFAST,
                                 COMP_WAVPACK_VERYHIGH)

    SUFFIX = "wv"
    NAME = SUFFIX
    DESCRIPTION = u"WavPack"
    DEFAULT_COMPRESSION = "standard"
    COMPRESSION_MODES = ("veryfast", "fast", "standard", "high", "veryhigh")
    COMPRESSION_DESCRIPTIONS = {"veryfast": COMP_WAVPACK_VERYFAST,
                                "veryhigh": COMP_WAVPACK_VERYHIGH}

    # indexed by the 2-bit "bits per sample" field of the block header
    BITS_PER_SAMPLE = (8, 16, 24, 32)

    # indexed by the 4-bit "sample rate" field of the block header
    # index 0xF is handled separately in __read_info__
    SAMPLING_RATE = (6000, 8000, 9600, 11025,
                     12000, 16000, 22050, 24000,
                     32000, 44100, 48000, 64000,
                     88200, 96000, 192000, 0)

    # encoder keyword arguments for each entry of COMPRESSION_MODES
    __options__ = {"veryfast": {"block_size": 44100,
                                "joint_stereo": True,
                                "false_stereo": True,
                                "wasted_bits": True,
                                "correlation_passes": 1},
                   "fast": {"block_size": 44100,
                            "joint_stereo": True,
                            "false_stereo": True,
                            "wasted_bits": True,
                            "correlation_passes": 2},
                   "standard": {"block_size": 44100,
                                "joint_stereo": True,
                                "false_stereo": True,
                                "wasted_bits": True,
                                "correlation_passes": 5},
                   "high": {"block_size": 44100,
                            "joint_stereo": True,
                            "false_stereo": True,
                            "wasted_bits": True,
                            "correlation_passes": 10},
                   "veryhigh": {"block_size": 44100,
                                "joint_stereo": True,
                                "false_stereo": True,
                                "wasted_bits": True,
                                "correlation_passes": 16}}

    def __init__(self, filename):
        """filename is a plain string

        raises InvalidWavPack if the file cannot be read or parsed"""

        WaveContainer.__init__(self, filename)
        self.__samplerate__ = 0
        self.__channels__ = 0
        self.__bitspersample__ = 0
        self.__total_frames__ = 0

        try:
            self.__read_info__()
        except IOError as msg:
            raise InvalidWavPack(str(msg))

    def lossless(self):
        """returns True"""

        return True

    def channel_mask(self):
        """returns a ChannelMask object of this track's channel layout"""

        return self.__channel_mask__

    @classmethod
    def supports_metadata(cls):
        """returns True if this audio type supports MetaData"""

        return True

    def get_metadata(self):
        """returns a MetaData object, or None

        raises IOError if unable to read the file"""

        metadata = ApeTaggedAudio.get_metadata(self)
        if metadata is not None:
            metadata.frame_count = self.total_frames()
        return metadata

    def has_foreign_wave_chunks(self):
        """returns True if the audio file contains non-audio RIFF chunks

        during transcoding, if the source audio file has foreign RIFF chunks
        and the target audio format supports foreign RIFF chunks,
        conversion should be routed through .wav conversion
        to avoid losing those chunks"""

        for (sub_header, nondecoder, data_size, data) in self.sub_blocks():
            if (sub_header == 1) and nondecoder:
                # wave header sub-block:
                # anything besides "fmt " and "data" is foreign
                if (set(__riff_chunk_ids__(data_size,
                                           data)) != {b"fmt ", b"data"}):
                    return True
            elif (sub_header == 2) and nondecoder:
                # wave footer sub-block: always treated as foreign
                return True
        else:
            return False

    def wave_header_footer(self):
        """returns (header, footer) tuple of strings
        containing all data before and after the PCM stream

        may raise ValueError if there's a problem with
        the header or footer data
        may raise IOError if there's a problem reading
        header or footer data from the file
        """

        head = None
        tail = None

        for (sub_block_id, nondecoder, data_size, data) in self.sub_blocks():
            if (sub_block_id == 1) and nondecoder:
                head = data.read_bytes(data_size)
            elif (sub_block_id == 2) and nondecoder:
                tail = data.read_bytes(data_size)

        if head is not None:
            # a missing footer is returned as an empty string
            return (head, tail if tail is not None else b"")
        else:
            raise ValueError("no wave header found")

    @classmethod
    def from_wave(cls, filename, header, pcmreader, footer, compression=None,
                  encoding_function=None):
        """encodes a new file from wave data

        takes a filename string, header string,
        PCMReader object, footer string
        and optional compression level string
        encodes a new audio file from pcmreader's data
        at the given filename with the specified compression level
        and returns a new WavPackAudio object

        encoding_function, if given, is called in place of
        the default encode_wavpack with the same arguments

        header + pcm data + footer should always result
        in the original wave file being restored
        without need for any padding bytes

        may raise EncodingError if some problem occurs when
        encoding the input file"""

        from audiotools.encoders import encode_wavpack
        from audiotools import BufferedPCMReader
        from audiotools import CounterPCMReader
        from audiotools.wav import (validate_header, validate_footer)
        from audiotools import EncodingError
        from audiotools import __default_quality__

        if (((compression is None) or
             (compression not in cls.COMPRESSION_MODES))):
            compression = __default_quality__(cls.NAME)

        # ensure header is valid
        try:
            (total_size, data_size) = validate_header(header)
        except ValueError as err:
            raise EncodingError(str(err))

        counter = CounterPCMReader(pcmreader)

        try:
            (encode_wavpack if encoding_function is None
             else encoding_function)(filename,
                                     BufferedPCMReader(counter),
                                     wave_header=header,
                                     wave_footer=footer,
                                     **cls.__options__[compression])

            counter.close()

            data_bytes_written = counter.bytes_written()

            # ensure output data size matches the "data" chunk's size
            if data_size != data_bytes_written:
                from audiotools.text import ERR_WAV_TRUNCATED_DATA_CHUNK
                raise EncodingError(ERR_WAV_TRUNCATED_DATA_CHUNK)

            # ensure footer validates correctly
            try:
                validate_footer(footer, data_bytes_written)
            except ValueError as err:
                raise EncodingError(str(err))

            # ensure total size is correct
            if (len(header) + data_size + len(footer)) != total_size:
                from audiotools.text import ERR_WAV_INVALID_SIZE
                raise EncodingError(ERR_WAV_INVALID_SIZE)

            return cls(filename)
        except (ValueError, IOError) as msg:
            counter.close()
            cls.__unlink__(filename)
            raise EncodingError(str(msg))
        except Exception:
            counter.close()
            cls.__unlink__(filename)
            # bare raise keeps the original traceback intact
            raise

    def blocks(self, reader=None):
        """yields (length, reader) tuples of WavPack frames

        length is the total length of all the substreams
        reader is a BitstreamReader which can be parsed

        if the reader argument is None, the file is opened
        (and closed again) by this method;
        iteration stops at the first block without "wvpk" magic
        or when an IOError occurs while reading
        """

        def blocks_iter(reader):
            try:
                while True:
                    # the 32 bit size field counts the 24 remaining
                    # header bytes, which are skipped here unparsed
                    (wvpk, block_size) = reader.parse("4b 32u 192p")
                    if wvpk == b"wvpk":
                        yield (block_size - 24,
                               reader.substream(block_size - 24))
                    else:
                        return
            except IOError:
                return

        if reader is None:
            from audiotools.bitstream import BitstreamReader

            with BitstreamReader(open(self.filename, "rb"), True) as reader:
                for block in blocks_iter(reader):
                    yield block
        else:
            for block in blocks_iter(reader):
                yield block

    def sub_blocks(self, reader=None):
        """yields (function, nondecoder, data_size, data) tuples

        function is an integer
        nondecoder is a boolean indicating non-decoder data
        data_size is the sub-block's length, in bytes
        data is a BitstreamReader which can be parsed

        reader is passed through to blocks() unchanged
        """

        for (block_size, block_data) in self.blocks(reader):
            while block_size > 0:
                (metadata_function,
                 nondecoder_data,
                 actual_size_1_less,
                 large_block) = block_data.parse("5u 1u 1u 1u")

                # sub-block sizes are stored as a count of 16 bit words
                if large_block:
                    sub_block_size = block_data.read(24)
                    block_size -= 4
                else:
                    sub_block_size = block_data.read(8)
                    block_size -= 2

                if actual_size_1_less:
                    # the final byte is padding, so leave it out
                    # of the yielded data and skip over it afterward
                    yield (metadata_function,
                           nondecoder_data,
                           sub_block_size * 2 - 1,
                           block_data.substream(sub_block_size * 2 - 1))
                    block_data.skip(8)
                else:
                    yield (metadata_function,
                           nondecoder_data,
                           sub_block_size * 2,
                           block_data.substream(sub_block_size * 2))

                block_size -= sub_block_size * 2

    def __read_info__(self):
        """populates sample rate, bits-per-sample, total frames,
        channel count and channel mask from the file's first block

        values not available in the block header itself
        are pulled from sub-blocks or the embedded wave header

        raises InvalidWavPack if the block header
        or embedded "fmt " chunk is invalid
        may raise IOError if the file cannot be read"""

        from audiotools.bitstream import BitstreamReader
        from audiotools import ChannelMask

        with BitstreamReader(open(self.filename, "rb"), True) as reader:
            # remember the start of the file so the sub-block
            # scans below can restart from the first block
            pos = reader.getpos()

            (block_id,
             total_samples,
             bits_per_sample,
             mono_output,
             initial_block,
             final_block,
             sample_rate) = reader.parse(
                "4b 64p 32u 64p 2u 1u 8p 1u 1u 5p 5p 4u 37p")

            if block_id != b"wvpk":
                from audiotools.text import ERR_WAVPACK_INVALID_HEADER
                raise InvalidWavPack(ERR_WAVPACK_INVALID_HEADER)

            if sample_rate != 0xF:
                self.__samplerate__ = WavPackAudio.SAMPLING_RATE[sample_rate]
            else:
                # if unknown, pull from SAMPLE_RATE sub-block

                # the header parse above consumed the block header,
                # so rewind or sub_blocks() would begin mid-block
                # and never find anything
                reader.setpos(pos)
                for (block_id,
                     nondecoder,
                     data_size,
                     data) in self.sub_blocks(reader):
                    if (block_id == 0x7) and nondecoder:
                        self.__samplerate__ = data.read(data_size * 8)
                        break
                else:
                    # no SAMPLE RATE sub-block found
                    # so pull info from FMT chunk
                    reader.setpos(pos)
                    (self.__samplerate__,) = self.fmt_chunk(reader).parse(
                        "32p 32u")

            self.__bitspersample__ = [8, 16, 24, 32][bits_per_sample]
            self.__total_frames__ = total_samples

            if initial_block and final_block:
                if mono_output:
                    self.__channels__ = 1
                    self.__channel_mask__ = ChannelMask(0x4)
                else:
                    self.__channels__ = 2
                    self.__channel_mask__ = ChannelMask(0x3)
            else:
                # if not mono or stereo, pull from CHANNEL INFO sub-block
                reader.setpos(pos)
                for (block_id,
                     nondecoder,
                     data_size,
                     data) in self.sub_blocks(reader):
                    if (block_id == 0xD) and not nondecoder:
                        self.__channels__ = data.read(8)
                        self.__channel_mask__ = ChannelMask(
                            data.read((data_size - 1) * 8))
                        break
                else:
                    # no CHANNEL INFO sub-block found
                    # so pull info from FMT chunk
                    reader.setpos(pos)
                    fmt = self.fmt_chunk(reader)
                    compression_code = fmt.read(16)
                    self.__channels__ = fmt.read(16)
                    if compression_code == 1:
                        # this is theoretically possible
                        # with very old .wav files,
                        # but shouldn't happen in practice
                        self.__channel_mask__ = \
                            {1: ChannelMask.from_fields(front_center=True),
                             2: ChannelMask.from_fields(front_left=True,
                                                        front_right=True),
                             3: ChannelMask.from_fields(front_left=True,
                                                        front_right=True,
                                                        front_center=True),
                             4: ChannelMask.from_fields(front_left=True,
                                                        front_right=True,
                                                        back_left=True,
                                                        back_right=True),
                             5: ChannelMask.from_fields(front_left=True,
                                                        front_right=True,
                                                        back_left=True,
                                                        back_right=True,
                                                        front_center=True),
                             6: ChannelMask.from_fields(front_left=True,
                                                        front_right=True,
                                                        back_left=True,
                                                        back_right=True,
                                                        front_center=True,
                                                        low_frequency=True)
                             }.get(self.__channels__, ChannelMask(0))
                    elif compression_code == 0xFFFE:
                        # skip ahead to the 32 bit channel mask field
                        fmt.skip(128)
                        mask = fmt.read(32)
                        self.__channel_mask__ = ChannelMask(mask)
                    else:
                        from audiotools.text import ERR_WAVPACK_UNSUPPORTED_FMT
                        raise InvalidWavPack(ERR_WAVPACK_UNSUPPORTED_FMT)

    def bits_per_sample(self):
        """returns an integer number of bits-per-sample this track contains"""

        return self.__bitspersample__

    def channels(self):
        """returns an integer number of channels this track contains"""

        return self.__channels__

    def total_frames(self):
        """returns the total PCM frames of the track as an integer"""

        return self.__total_frames__

    def sample_rate(self):
        """returns the rate of the track's audio as an integer number of Hz"""

        return self.__samplerate__

    def seekable(self):
        """returns True if the file is seekable"""

        return True

    @classmethod
    def from_pcm(cls, filename, pcmreader,
                 compression=None,
                 total_pcm_frames=None,
                 encoding_function=None):
        """encodes a new file from PCM data

        takes a filename string, PCMReader object,
        optional compression level string and
        optional total_pcm_frames integer
        encodes a new audio file from pcmreader's data
        at the given filename with the specified compression level
        and returns a new WavPackAudio object

        encoding_function, if given, is called in place of
        the default encode_wavpack with the same arguments

        may raise EncodingError if some problem occurs when
        encoding the input file or if total_pcm_frames
        doesn't match the number of frames actually read"""

        from audiotools.encoders import encode_wavpack
        from audiotools import BufferedPCMReader
        from audiotools import CounterPCMReader
        from audiotools import EncodingError
        from audiotools import __default_quality__

        if (((compression is None) or
             (compression not in cls.COMPRESSION_MODES))):
            compression = __default_quality__(cls.NAME)

        counter = CounterPCMReader(pcmreader)

        try:
            (encode_wavpack if encoding_function is None
             else encoding_function)(
                filename,
                BufferedPCMReader(counter),
                total_pcm_frames=(total_pcm_frames if
                                  total_pcm_frames is not None else 0),
                **cls.__options__[compression])
            counter.close()
        except (ValueError, IOError) as msg:
            counter.close()
            cls.__unlink__(filename)
            raise EncodingError(str(msg))
        except Exception:
            counter.close()
            cls.__unlink__(filename)
            raise

        # ensure actual total PCM frames matches argument, if any
        if (((total_pcm_frames is not None) and
             (counter.frames_written != total_pcm_frames))):
            cls.__unlink__(filename)
            from audiotools.text import ERR_TOTAL_PCM_FRAMES_MISMATCH
            raise EncodingError(ERR_TOTAL_PCM_FRAMES_MISMATCH)

        return cls(filename)

    def to_pcm(self):
        """returns a PCMReader object containing the track's PCM data

        if the file cannot be opened or decoded,
        returns a PCMReaderError object instead of raising"""

        from audiotools import decoders
        from audiotools import PCMReaderError

        try:
            f = open(self.filename, "rb")
        except IOError as msg:
            return PCMReaderError(error_message=str(msg),
                                  sample_rate=self.__samplerate__,
                                  channels=self.__channels__,
                                  channel_mask=int(self.channel_mask()),
                                  bits_per_sample=self.__bitspersample__)

        try:
            return decoders.WavPackDecoder(f)
        except (IOError, ValueError) as msg:
            # the decoder was never built, so close the file here
            f.close()
            return PCMReaderError(error_message=str(msg),
                                  sample_rate=self.__samplerate__,
                                  channels=self.__channels__,
                                  channel_mask=int(self.channel_mask()),
                                  bits_per_sample=self.__bitspersample__)

    def fmt_chunk(self, reader=None):
        """returns the 'fmt' chunk as a BitstreamReader

        reader is passed through to sub_blocks() unchanged

        raises InvalidWavPack if no wave header sub-block is found,
        if the header lacks RIFF/WAVE magic
        or if a "data" chunk is encountered before the "fmt " chunk"""

        for (block_id,
             nondecoder,
             data_size,
             data) in self.sub_blocks(reader):
            if (block_id == 1) and nondecoder:
                (riff, wave) = data.parse("4b 32p 4b")
                if (riff != b"RIFF") or (wave != b"WAVE"):
                    from audiotools.text import ERR_WAVPACK_INVALID_FMT
                    raise InvalidWavPack(ERR_WAVPACK_INVALID_FMT)
                else:
                    while True:
                        (chunk_id, chunk_size) = data.parse("4b 32u")
                        if chunk_id == b"fmt ":
                            return data.substream(chunk_size)
                        elif chunk_id == b"data":
                            from audiotools.text import ERR_WAVPACK_INVALID_FMT
                            raise InvalidWavPack(ERR_WAVPACK_INVALID_FMT)
                        else:
                            # odd-sized RIFF chunks are followed by
                            # a pad byte which must also be skipped
                            # to stay aligned with the next chunk header
                            data.skip_bytes(chunk_size + (chunk_size % 2))
        else:
            from audiotools.text import ERR_WAVPACK_NO_FMT
            raise InvalidWavPack(ERR_WAVPACK_NO_FMT)

    @classmethod
    def supports_cuesheet(cls):
        """returns True"""

        return True

    def get_cuesheet(self):
        """returns the embedded Cuesheet-compatible object, or None

        raises IOError if a problem occurs when reading the file"""

        from audiotools import cue as cue
        from audiotools import SheetException

        metadata = self.get_metadata()

        if (metadata is not None) and (b'Cuesheet' in metadata.keys()):
            try:
                return cue.read_cuesheet_string(
                    metadata[b'Cuesheet'].__unicode__())
            except SheetException:
                # unlike FLAC, just because a cuesheet is embedded
                # does not mean it is compliant
                return None
        else:
            return None

    def set_cuesheet(self, cuesheet):
        """imports cuesheet data from a Sheet object

        a cuesheet of None deletes any embedded cuesheet instead

        Raises IOError if an error occurs setting the cuesheet"""

        from io import BytesIO
        from audiotools import (Filename, FS_ENCODING)
        from audiotools.cue import write_cuesheet
        from audiotools.ape import ApeTag

        if cuesheet is None:
            return self.delete_cuesheet()

        metadata = self.get_metadata()
        if metadata is None:
            # no existing tag, so start from an empty one
            metadata = ApeTag([])

        cuesheet_data = BytesIO()
        write_cuesheet(cuesheet,
                       u"%s" % (Filename(self.filename).basename(),),
                       cuesheet_data)

        metadata[b'Cuesheet'] = ApeTag.ITEM.string(
            b'Cuesheet',
            cuesheet_data.getvalue().decode(FS_ENCODING, 'replace'))

        self.update_metadata(metadata)

    def delete_cuesheet(self):
        """deletes embedded Sheet object, if any

        Raises IOError if a problem occurs when updating the file"""

        metadata = self.get_metadata()
        if (metadata is not None) and (b'Cuesheet' in metadata):
            del(metadata[b'Cuesheet'])
            self.update_metadata(metadata)