#!/usr/bin/python

# Audio Tools, a module and set of tools for manipulating audio data
# Copyright (C) 2007-2014 Brian Langenberger

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

from audiotools import (AudioFile, ChannelMask, InvalidFile,
                        WaveContainer, AiffContainer)
import sys
import os.path


class InvalidShorten(InvalidFile):
    """raised when a file cannot be opened or parsed as Shorten"""

    pass


class ShortenAudio(WaveContainer, AiffContainer):
    """a Shorten audio file"""

    SUFFIX = "shn"
    NAME = SUFFIX
    DESCRIPTION = u"Shorten"

    def __init__(self, filename):
        """filename is a plain string

        raises InvalidShorten if the file cannot be read,
        does not start with a version 2 Shorten header,
        uses an unsupported file type or declares no channels"""

        from audiotools.bitstream import BitstreamReader

        def read_unsigned(r, c):
            # a unary-coded high part followed by "c" directly-read low bits
            MSB = r.unary(1)
            LSB = r.read(c)
            return MSB * 2 ** c + LSB

        def read_long(r):
            # the low-bit count is itself stored as an unsigned value
            return read_unsigned(r, read_unsigned(r, 2))

        WaveContainer.__init__(self, filename)
        try:
            reader = BitstreamReader(open(filename, "rb"), False)
        except IOError as msg:
            raise InvalidShorten(str(msg))
        try:
            if reader.parse("4b 8u") != [b"ajkg", 2]:
                raise InvalidShorten("invalid Shorten header")

            # populate channels and bits_per_sample from Shorten header
            (file_type,
             self.__channels__,
             block_length,
             max_LPC,
             number_of_means,
             bytes_to_skip) = [read_long(reader) for i in range(6)]

            if (1 <= file_type) and (file_type <= 2):
                self.__bits_per_sample__ = 8
            elif (3 <= file_type) and (file_type <= 6):
                self.__bits_per_sample__ = 16
            else:
                # FIXME
                raise InvalidShorten("unsupported Shorten file type")

            # the channel count is used as a divisor when deriving
            # total_frames, so reject a zero count up front instead of
            # failing later with ZeroDivisionError
            if self.__channels__ < 1:
                raise InvalidShorten(
                    "invalid channel count in Shorten header")

            # setup some default dummy metadata
            self.__sample_rate__ = 44100
            if self.__channels__ == 1:
                self.__channel_mask__ = ChannelMask(0x4)
            elif self.__channels__ == 2:
                self.__channel_mask__ = ChannelMask(0x3)
            else:
                self.__channel_mask__ = ChannelMask(0)
            self.__total_frames__ = 0

            # populate sample_rate and total_frames
            # from first VERBATIM command
            command = read_unsigned(reader, 2)
            if command == 9:
                # the byte count precedes the bytes themselves
                verbatim_size = read_unsigned(reader, 5)
                # bytes(bytearray(...)) yields a byte string
                # on both Python 2 and Python 3
                verbatim_bytes = bytes(bytearray(
                    [read_unsigned(reader, 8) & 0xFF
                     for i in range(verbatim_size)]))

                # the verbatim block may hold either container's header,
                # each parser ignores data that isn't its own
                self._parse_wave_header(verbatim_bytes)
                self._parse_aiff_header(verbatim_bytes)
        except IOError as msg:
            raise InvalidShorten(str(msg))
        finally:
            reader.close()

    def _parse_wave_header(self, verbatim_bytes):
        """updates sample rate, channel mask and total frames
        from verbatim_bytes if they begin with a RIFF WAVE header

        this is best-effort: IOError and ValueError raised
        while parsing are ignored, leaving whatever values
        have been set so far"""

        from audiotools.bitstream import BitstreamReader
        from io import BytesIO

        try:
            wave = BitstreamReader(BytesIO(verbatim_bytes), True)
            header = wave.read_bytes(12)
            if not (header.startswith(b"RIFF") and
                    header.endswith(b"WAVE")):
                return

            # got RIFF/WAVE header,
            # so parse wave blocks as needed
            total_size = len(verbatim_bytes) - 12
            while total_size >= 8:
                (chunk_id, chunk_size) = wave.parse("4b 32u")
                total_size -= 8
                if chunk_id == b'fmt ':
                    from audiotools.wav import parse_fmt

                    # the channel count and bits-per-sample read
                    # from the Shorten header are kept as-is
                    (channels,
                     self.__sample_rate__,
                     bits_per_sample,
                     self.__channel_mask__) = parse_fmt(
                        wave.substream(chunk_size))
                    total_size -= chunk_size
                elif chunk_id == b'data':
                    self.__total_frames__ = \
                        (chunk_size //
                         (self.__channels__ *
                          (self.__bits_per_sample__ // 8)))
                    # the chunk's payload is the audio itself
                    # rather than more header, so stop here
                    break
                else:
                    # skip unknown chunks, including their pad byte
                    padded_size = chunk_size + (chunk_size % 2)
                    wave.read_bytes(padded_size)
                    total_size -= padded_size
        except (IOError, ValueError):
            pass

    def _parse_aiff_header(self, verbatim_bytes):
        """updates sample rate, channel mask and total frames
        from verbatim_bytes if they begin with a FORM AIFF header

        this is best-effort: IOError raised while parsing is ignored,
        leaving whatever values have been set so far"""

        from audiotools.bitstream import BitstreamReader
        from io import BytesIO

        # NOTE(review): unlike the wave parser, ValueError is not
        # swallowed here - confirm whether parse_comm can raise it
        try:
            aiff = BitstreamReader(BytesIO(verbatim_bytes), False)
            header = aiff.read_bytes(12)
            if not (header.startswith(b"FORM") and
                    header.endswith(b"AIFF")):
                return

            # got FORM/AIFF header
            # so parse aiff blocks as needed
            total_size = len(verbatim_bytes) - 12
            while total_size >= 8:
                (chunk_id, chunk_size) = aiff.parse("4b 32u")
                total_size -= 8
                if chunk_id == b'COMM':
                    from audiotools.aiff import parse_comm

                    # the channel count and bits-per-sample read
                    # from the Shorten header are kept as-is
                    (channels,
                     total_sample_frames,
                     bits_per_sample,
                     self.__sample_rate__,
                     self.__channel_mask__) = parse_comm(
                        aiff.substream(chunk_size))
                    total_size -= chunk_size
                elif chunk_id == b'SSND':
                    # subtract 8 bytes for
                    # "offset" and "block size"
                    self.__total_frames__ = \
                        ((chunk_size - 8) //
                         (self.__channels__ *
                          (self.__bits_per_sample__ // 8)))
                    # what follows belongs to the sound chunk
                    # rather than to another chunk header, so stop here
                    break
                else:
                    # skip unknown chunks, including their pad byte
                    padded_size = chunk_size + (chunk_size % 2)
                    aiff.read_bytes(padded_size)
                    total_size -= padded_size
        except IOError:
            pass

    def _pcm_split(self):
        """returns the decoder's (head, tail) pair of byte strings
        holding the data stored before and after the PCM stream

        the decoder is closed before returning, even on failure
        may raise whatever opening the file, building the decoder
        or splitting the stream raises"""

        from audiotools import decoders

        f = open(self.filename, "rb")
        try:
            decoder = decoders.SHNDecoder(f)
        except Exception:
            # no decoder exists to close the file for us
            f.close()
            raise

        try:
            return decoder.pcm_split()
        finally:
            decoder.close()

    def bits_per_sample(self):
        """returns an integer number of bits-per-sample this track contains"""

        return self.__bits_per_sample__

    def channels(self):
        """returns an integer number of channels this track contains"""

        return self.__channels__

    def channel_mask(self):
        """returns a ChannelMask object of this track's channel layout"""

        return self.__channel_mask__

    def lossless(self):
        """returns True"""

        return True

    def total_frames(self):
        """returns the total PCM frames of the track as an integer"""

        return self.__total_frames__

    def sample_rate(self):
        """returns the rate of the track's audio as an integer number of Hz"""

        return self.__sample_rate__

    def to_pcm(self):
        """returns a PCMReader object containing the track's PCM data

        if the file cannot be opened or the decoder cannot be built,
        a PCMReaderError with this track's parameters
        is returned instead of raising"""

        from audiotools.decoders import SHNDecoder
        from audiotools import PCMReaderError

        def reader_error(msg):
            return PCMReaderError(error_message=str(msg),
                                  sample_rate=self.sample_rate(),
                                  channels=self.channels(),
                                  channel_mask=int(self.channel_mask()),
                                  bits_per_sample=self.bits_per_sample())

        try:
            f = open(self.filename, "rb")
        except IOError as msg:
            return reader_error(msg)

        try:
            return SHNDecoder(f)
        except (IOError, ValueError) as msg:
            f.close()
            return reader_error(msg)

    @classmethod
    def from_pcm(cls, filename, pcmreader,
                 compression=None,
                 total_pcm_frames=None,
                 block_size=256,
                 encoding_function=None):
        """encodes a new file from PCM data

        takes a filename string, PCMReader object,
        optional compression level string and
        optional total_pcm_frames integer
        encodes a new audio file from pcmreader's data
        at the given filename with the specified compression level
        and returns a new ShortenAudio object

        block_size and encoding_function are handed on to from_wave()

        raises UnsupportedBitsPerSample, after closing pcmreader,
        unless pcmreader's bits-per-sample is 8 or 16"""

        # can't build artificial header because we don't know
        # how long the PCMReader will be and there's no way
        # to go back and write one later because all the byte values
        # are stored variable-sized
        # so we have to build a temporary Wave file instead

        from audiotools import UnsupportedBitsPerSample

        if pcmreader.bits_per_sample not in {8, 16}:
            pcmreader.close()
            raise UnsupportedBitsPerSample(filename, pcmreader.bits_per_sample)

        if total_pcm_frames is not None:
            # the length is known, so the header can be built directly
            from audiotools.wav import wave_header

            data_size = ((pcmreader.bits_per_sample // 8) *
                         pcmreader.channels *
                         total_pcm_frames)

            return cls.from_wave(filename,
                                 wave_header(pcmreader.sample_rate,
                                             pcmreader.channels,
                                             pcmreader.channel_mask,
                                             pcmreader.bits_per_sample,
                                             total_pcm_frames),
                                 pcmreader,
                                 # a single pad byte when the data is odd
                                 b"\x00" * (data_size % 2),
                                 compression,
                                 block_size,
                                 encoding_function)
        else:
            from audiotools import WaveAudio
            import tempfile

            f = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
            wave_name = f.name
            try:
                w = WaveAudio.from_pcm(wave_name, pcmreader)
                (header, footer) = w.wave_header_footer()
                return cls.from_wave(filename,
                                     header,
                                     w.to_pcm(),
                                     footer,
                                     compression,
                                     block_size,
                                     encoding_function)
            finally:
                f.close()
                if os.path.isfile(wave_name):
                    os.unlink(wave_name)

    def has_foreign_wave_chunks(self):
        """returns True if the audio file contains non-audio RIFF chunks

        during transcoding, if the source audio file has foreign RIFF chunks
        and the target audio format supports foreign RIFF chunks,
        conversion should be routed through .wav conversion
        to avoid losing those chunks

        returns False if IOError occurs while reading the file"""

        from audiotools import bitstream
        from io import BytesIO

        try:
            (head, tail) = self._pcm_split()
            header = bitstream.BitstreamReader(BytesIO(head), True)
            (RIFF, SIZE, WAVE) = header.parse("4b 32u 4b")
            if (RIFF != b'RIFF') or (WAVE != b'WAVE'):
                return False

            # if the tail has room for chunks, there must be some foreign ones
            if len(tail) >= 8:
                return True

            # otherwise, check the header for foreign chunks
            total_size = len(head) - bitstream.format_byte_size("4b 32u 4b")
            while total_size >= 8:
                (chunk_id, chunk_size) = header.parse("4b 32u")
                total_size -= bitstream.format_byte_size("4b 32u")
                if chunk_id not in (b'fmt ', b'data'):
                    return True

                # step over the chunk and its pad byte, if any
                padded_size = chunk_size + (chunk_size % 2)
                header.skip_bytes(padded_size)
                total_size -= padded_size

            # no foreign chunks found
            return False
        except IOError:
            return False

    def wave_header_footer(self):
        """returns (header, footer) tuple of strings
        containing all data before and after the PCM stream

        may raise ValueError if there's a problem with
        the header or footer data
        may raise IOError if there's a problem reading
        header or footer data from the file"""

        (head, tail) = self._pcm_split()
        if (head[0:4] == b"RIFF") and (head[8:12] == b"WAVE"):
            return (head, tail)
        else:
            raise ValueError("invalid wave header")

    @classmethod
    def from_wave(cls, filename, header, pcmreader, footer, compression=None,
                  block_size=256, encoding_function=None):
        """encodes a new file from wave data

        takes a filename string, header string,
        PCMReader object, footer string
        and optional compression level string
        encodes a new audio file from pcmreader's data
        at the given filename
        and returns a new ShortenAudio object

        compression is accepted but not used by this method
        block_size is passed to the encoder
        encoding_function, if given, is called in place of
        audiotools.encoders.encode_shn

        header + pcm data + footer should always result
        in the original wave file being restored
        without need for any padding bytes

        raises UnsupportedBitsPerSample unless
        pcmreader's bits-per-sample is 8 or 16
        may raise EncodingError if some problem occurs when
        encoding the input file"""

        from audiotools import (CounterPCMReader,
                                BufferedPCMReader,
                                UnsupportedBitsPerSample,
                                EncodingError)
        from audiotools.wav import (validate_header, validate_footer)

        if encoding_function is None:
            from audiotools.encoders import encode_shn
        else:
            encode_shn = encoding_function

        if pcmreader.bits_per_sample not in {8, 16}:
            pcmreader.close()
            raise UnsupportedBitsPerSample(filename, pcmreader.bits_per_sample)

        # ensure header is valid
        try:
            (total_size, data_size) = validate_header(header)
        except ValueError as err:
            pcmreader.close()
            raise EncodingError(str(err))

        counter = CounterPCMReader(pcmreader)

        try:
            encoder_args = {"filename": filename,
                            "pcmreader": BufferedPCMReader(counter),
                            "is_big_endian": False,
                            "signed_samples":
                            pcmreader.bits_per_sample == 16,
                            "header_data": header,
                            "block_size": block_size}
            # the footer argument is only supplied when there is one
            if len(footer) > 0:
                encoder_args["footer_data"] = footer
            encode_shn(**encoder_args)

            counter.close()
            data_bytes_written = counter.bytes_written()

            # ensure output data size matches the "data" chunk's size
            if data_size != data_bytes_written:
                from audiotools.text import ERR_WAV_TRUNCATED_DATA_CHUNK
                raise EncodingError(ERR_WAV_TRUNCATED_DATA_CHUNK)

            # ensure footer validates correctly
            try:
                validate_footer(footer, data_bytes_written)
            except ValueError as err:
                raise EncodingError(str(err))

            # ensure total size is correct
            if (len(header) + data_size + len(footer)) != total_size:
                from audiotools.text import ERR_WAV_INVALID_SIZE
                raise EncodingError(ERR_WAV_INVALID_SIZE)

            return cls(filename)
        except (IOError, ValueError) as err:
            counter.close()
            cls.__unlink__(filename)
            raise EncodingError(str(err))
        except Exception:
            counter.close()
            cls.__unlink__(filename)
            # bare raise keeps the original traceback
            raise

    def has_foreign_aiff_chunks(self):
        """returns True if the audio file contains non-audio AIFF chunks

        during transcoding, if the source audio file has foreign AIFF chunks
        and the target audio format supports foreign AIFF chunks,
        conversion should be routed through .aiff conversion
        to avoid losing those chunks

        returns False if IOError occurs while reading the file"""

        from audiotools import bitstream
        from io import BytesIO

        try:
            (head, tail) = self._pcm_split()
            header = bitstream.BitstreamReader(BytesIO(head), False)
            (FORM, SIZE, AIFF) = header.parse("4b 32u 4b")
            if (FORM != b'FORM') or (AIFF != b'AIFF'):
                return False

            # if the tail has room for chunks, there must be some foreign ones
            if len(tail) >= 8:
                return True

            # otherwise, check the header for foreign chunks
            total_size = len(head) - bitstream.format_byte_size("4b 32u 4b")
            while total_size >= 8:
                (chunk_id, chunk_size) = header.parse("4b 32u")
                total_size -= bitstream.format_byte_size("4b 32u")
                if chunk_id not in (b'COMM', b'SSND'):
                    return True

                # step over the chunk and its pad byte, if any
                padded_size = chunk_size + (chunk_size % 2)
                header.skip_bytes(padded_size)
                total_size -= padded_size

            # no foreign chunks found
            return False
        except IOError:
            return False

    def aiff_header_footer(self):
        """returns (header, footer) tuple of strings
        containing all data before and after the PCM stream

        raises ValueError if the data before the PCM stream
        does not begin with a FORM AIFF header
        may raise IOError if there's a problem reading
        header or footer data from the file"""

        (head, tail) = self._pcm_split()
        if (head[0:4] == b"FORM") and (head[8:12] == b"AIFF"):
            return (head, tail)
        else:
            raise ValueError("invalid AIFF header")

    @classmethod
    def from_aiff(cls, filename, header, pcmreader, footer, compression=None,
                  block_size=256, encoding_function=None):
        """encodes a new file from AIFF data

        takes a filename string, header string,
        PCMReader object, footer string
        and optional compression level string
        encodes a new audio file from pcmreader's data
        at the given filename
        and returns a new ShortenAudio object

        compression is accepted but not used by this method
        block_size is passed to the encoder
        encoding_function, if given, is called in place of
        audiotools.encoders.encode_shn

        header + pcm data + footer should always result
        in the original AIFF file being restored
        without need for any padding bytes

        raises UnsupportedBitsPerSample unless
        pcmreader's bits-per-sample is 8 or 16
        may raise EncodingError if some problem occurs when
        encoding the input file"""

        from audiotools import (CounterPCMReader,
                                BufferedPCMReader,
                                UnsupportedBitsPerSample,
                                EncodingError)
        from audiotools.aiff import (validate_header, validate_footer)

        if encoding_function is None:
            from audiotools.encoders import encode_shn
        else:
            encode_shn = encoding_function

        if pcmreader.bits_per_sample not in {8, 16}:
            pcmreader.close()
            raise UnsupportedBitsPerSample(filename, pcmreader.bits_per_sample)

        # ensure header is valid
        try:
            (total_size, ssnd_size) = validate_header(header)
        except ValueError as err:
            pcmreader.close()
            raise EncodingError(str(err))

        counter = CounterPCMReader(pcmreader)

        try:
            encoder_args = {"filename": filename,
                            "pcmreader": BufferedPCMReader(counter),
                            "is_big_endian": True,
                            "signed_samples": True,
                            "header_data": header,
                            "block_size": block_size}
            # the footer argument is only supplied when there is one
            if len(footer) > 0:
                encoder_args["footer_data"] = footer
            encode_shn(**encoder_args)

            counter.close()
            ssnd_bytes_written = counter.bytes_written()

            # ensure output data size matches the "SSND" chunk's size
            if ssnd_size != ssnd_bytes_written:
                from audiotools.text import ERR_AIFF_TRUNCATED_SSND_CHUNK
                raise EncodingError(ERR_AIFF_TRUNCATED_SSND_CHUNK)

            # ensure footer validates correctly
            try:
                validate_footer(footer, ssnd_bytes_written)
            except ValueError as err:
                raise EncodingError(str(err))

            # ensure total size is correct
            if (len(header) + ssnd_size + len(footer)) != total_size:
                from audiotools.text import ERR_AIFF_INVALID_SIZE
                raise EncodingError(ERR_AIFF_INVALID_SIZE)

            return cls(filename)
        except IOError as err:
            # close the wrapped reader on failure, as from_wave() does
            counter.close()
            cls.__unlink__(filename)
            raise EncodingError(str(err))
        except Exception:
            counter.close()
            cls.__unlink__(filename)
            # bare raise keeps the original traceback
            raise

    def convert(self, target_path, target_class, compression=None,
                progress=None):
        """encodes a new AudioFile from existing AudioFile

        take a filename string, target class and optional compression string
        encodes a new AudioFile in the target class and returns
        the resulting object
        may raise EncodingError if some problem occurs during encoding"""

        # A Shorten file cannot contain both RIFF and AIFF chunks
        # at the same time.

        from audiotools import to_pcm_progress

        # check what the target supports before inspecting the file,
        # since looking for foreign chunks requires opening a decoder
        if (callable(getattr(target_class, "from_wave", None)) and
                self.has_foreign_wave_chunks()):
            return WaveContainer.convert(self,
                                         target_path,
                                         target_class,
                                         compression,
                                         progress)
        elif (callable(getattr(target_class, "from_aiff", None)) and
              self.has_foreign_aiff_chunks()):
            return AiffContainer.convert(self,
                                         target_path,
                                         target_class,
                                         compression,
                                         progress)
        else:
            return target_class.from_pcm(
                target_path,
                to_pcm_progress(self, progress),
                compression,
                total_pcm_frames=self.total_frames())