1#!/usr/bin/python 2 3# Audio Tools, a module and set of tools for manipulating audio data 4# Copyright (C) 2007-2014 Brian Langenberger 5 6# This program is free software; you can redistribute it and/or modify 7# it under the terms of the GNU General Public License as published by 8# the Free Software Foundation; either version 2 of the License, or 9# (at your option) any later version. 10 11# This program is distributed in the hope that it will be useful, 12# but WITHOUT ANY WARRANTY; without even the implied warranty of 13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14# GNU General Public License for more details. 15 16# You should have received a copy of the GNU General Public License 17# along with this program; if not, write to the Free Software 18# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 19 20 21from audiotools import (AudioFile, InvalidFile, PCMReader) 22from audiotools.pcm import FrameList 23 24 25class InvalidAU(InvalidFile): 26 pass 27 28 29class AuReader(object): 30 def __init__(self, au_filename): 31 from audiotools.bitstream import BitstreamReader 32 from audiotools.text import (ERR_AU_INVALID_HEADER, 33 ERR_AU_UNSUPPORTED_FORMAT) 34 35 self.stream = BitstreamReader(open(au_filename, "rb"), False) 36 (magic_number, 37 self.data_offset, 38 data_size, 39 encoding_format, 40 self.sample_rate, 41 self.channels) = self.stream.parse("4b 5* 32u") 42 43 if magic_number != b'.snd': 44 self.stream.close() 45 raise ValueError(ERR_AU_INVALID_HEADER) 46 try: 47 self.bits_per_sample = {2: 8, 3: 16, 4: 24}[encoding_format] 48 except KeyError: 49 self.stream.close() 50 raise ValueError(ERR_AU_UNSUPPORTED_FORMAT) 51 52 self.channel_mask = {1: 0x4, 2: 0x3}.get(self.channels, 0) 53 self.bytes_per_pcm_frame = ((self.bits_per_sample // 8) * 54 self.channels) 55 self.total_pcm_frames = (data_size // self.bytes_per_pcm_frame) 56 self.remaining_pcm_frames = self.total_pcm_frames 57 58 def __enter__(self): 59 return self 60 61 def __exit__(self, exc_type, exc_value, traceback): 62 self.close() 63 64 def read(self, pcm_frames): 65 # try to read requested PCM frames or remaining frames 66 requested_pcm_frames = min(max(pcm_frames, 1), 67 self.remaining_pcm_frames) 68 requested_bytes = (self.bytes_per_pcm_frame * 69 requested_pcm_frames) 70 pcm_data = self.stream.read_bytes(requested_bytes) 71 72 # raise exception if data block exhausted early 73 if len(pcm_data) < requested_bytes: 74 from audiotools.text import ERR_AU_TRUNCATED_DATA 75 raise IOError(ERR_AU_TRUNCATED_DATA) 76 else: 77 self.remaining_pcm_frames -= requested_pcm_frames 78 79 # return parsed chunk 80 return FrameList(pcm_data, 81 self.channels, 82 self.bits_per_sample, 83 True, 84 True) 85 86 def read_closed(self, pcm_frames): 87 raise ValueError("cannot read closed stream") 88 89 def seek(self, pcm_frame_offset): 90 if pcm_frame_offset < 0: 91 from audiotools.text import ERR_NEGATIVE_SEEK 92 raise ValueError(ERR_NEGATIVE_SEEK) 93 94 # ensure one doesn't walk off the end of the file 95 pcm_frame_offset = min(pcm_frame_offset, 96 self.total_pcm_frames) 97 98 # position file in data block 99 self.stream.seek(self.data_offset + 100 (pcm_frame_offset * 101 self.bytes_per_pcm_frame), 0) 102 self.remaining_pcm_frames = (self.total_pcm_frames - 103 pcm_frame_offset) 104 105 return pcm_frame_offset 106 107 def seek_closed(self, pcm_frame_offset): 108 raise ValueError("cannot seek closed stream") 109 110 def close(self): 111 self.stream.close() 112 self.read = self.read_closed 113 self.seek = self.seek_closed 114 115 116def au_header(sample_rate, 117 channels, 118 bits_per_sample, 119 total_pcm_frames): 120 """given a set of integer stream attributes, 121 returns header string of entire Au header 122 123 may raise ValueError if the total size of the file is too large""" 124 125 from audiotools.bitstream import build 126 127 if ((channels * 128 (bits_per_sample // 8) * 129 total_pcm_frames) >= 2 ** 32): 130 raise ValueError("PCM data too large for Sun AU file") 131 else: 132 return build("4b 5*32u", 133 False, 134 (b".snd", 135 24, 136 (channels * 137 (bits_per_sample // 8) * 138 total_pcm_frames), 139 {8: 2, 16: 3, 24: 4}[bits_per_sample], 140 sample_rate, 141 channels)) 142 143 144class AuAudio(AudioFile): 145 """a Sun AU audio file""" 146 147 SUFFIX = "au" 148 NAME = SUFFIX 149 DESCRIPTION = u"Sun Au" 150 151 def __init__(self, filename): 152 AudioFile.__init__(self, filename) 153 154 from audiotools.bitstream import parse 155 from audiotools.text import (ERR_AU_INVALID_HEADER, 156 ERR_AU_UNSUPPORTED_FORMAT) 157 158 try: 159 with open(filename, "rb") as f: 160 (magic_number, 161 self.__data_offset__, 162 self.__data_size__, 163 encoding_format, 164 self.__sample_rate__, 165 self.__channels__) = parse("4b 5* 32u", False, f.read(24)) 166 except IOError as msg: 167 raise InvalidAU(str(msg)) 168 169 if magic_number != b'.snd': 170 raise InvalidAU(ERR_AU_INVALID_HEADER) 171 try: 172 self.__bits_per_sample__ = {2: 8, 3: 16, 4: 24}[encoding_format] 173 except KeyError: 174 raise InvalidAU(ERR_AU_UNSUPPORTED_FORMAT) 175 176 def lossless(self): 177 """returns True""" 178 179 return True 180 181 def bits_per_sample(self): 182 """returns an integer number of bits-per-sample this track contains""" 183 184 return self.__bits_per_sample__ 185 186 def channels(self): 187 """returns an integer number of channels this track contains""" 188 189 return self.__channels__ 190 191 def channel_mask(self): 192 from audiotools import ChannelMask 193 194 """returns a ChannelMask object of this track's channel layout""" 195 196 if self.channels() <= 2: 197 return ChannelMask.from_channels(self.channels()) 198 else: 199 return ChannelMask(0) 200 201 def sample_rate(self): 202 """returns the rate of the track's audio as an integer number of Hz""" 203 204 return self.__sample_rate__ 205 206 def total_frames(self): 207 """returns the total PCM frames of the track as an integer""" 208 209 return (self.__data_size__ // 210 (self.__bits_per_sample__ // 8) // 211 self.__channels__) 212 213 def seekable(self): 214 """returns True if the file is seekable""" 215 216 return True 217 218 def to_pcm(self): 219 """returns a PCMReader object containing the track's PCM data""" 220 221 return AuReader(self.filename) 222 223 def pcm_split(self): 224 """returns a pair of data strings before and after PCM data 225 226 the first contains all data before the PCM content of the data chunk 227 the second containing all data after the data chunk""" 228 229 import struct 230 231 f = open(self.filename, 'rb') 232 (magic_number, data_offset) = struct.unpack(">4sI", f.read(8)) 233 header = f.read(data_offset - struct.calcsize(">4sI")) 234 return (struct.pack(">4sI%ds" % (len(header)), 235 magic_number, data_offset, header), "") 236 237 @classmethod 238 def from_pcm(cls, filename, pcmreader, 239 compression=None, total_pcm_frames=None): 240 """encodes a new file from PCM data 241 242 takes a filename string, PCMReader object, 243 optional compression level string and 244 optional total_pcm_frames integer 245 encodes a new audio file from pcmreader's data 246 at the given filename with the specified compression level 247 and returns a new AuAudio object""" 248 249 from audiotools import EncodingError 250 from audiotools import DecodingError 251 from audiotools import CounterPCMReader 252 from audiotools import transfer_framelist_data 253 254 if pcmreader.bits_per_sample not in (8, 16, 24): 255 from audiotools import Filename 256 from audiotools import UnsupportedBitsPerSample 257 from audiotools.text import ERR_UNSUPPORTED_BITS_PER_SAMPLE 258 pcmreader.close() 259 raise UnsupportedBitsPerSample( 260 ERR_UNSUPPORTED_BITS_PER_SAMPLE % 261 {"target_filename": Filename(filename), 262 "bps": pcmreader.bits_per_sample}) 263 264 try: 265 header = au_header(pcmreader.sample_rate, 266 pcmreader.channels, 267 pcmreader.bits_per_sample, 268 total_pcm_frames if total_pcm_frames 269 is not None else 0) 270 except ValueError as err: 271 raise EncodingError(str(err)) 272 273 try: 274 f = open(filename, "wb") 275 except IOError as err: 276 pcmreader.close() 277 raise EncodingError(str(err)) 278 279 counter = CounterPCMReader(pcmreader) 280 f.write(header) 281 try: 282 transfer_framelist_data(counter, f.write, True, True) 283 except (IOError, ValueError) as err: 284 f.close() 285 cls.__unlink__(filename) 286 raise EncodingError(str(err)) 287 288 if total_pcm_frames is not None: 289 f.close() 290 if total_pcm_frames != counter.frames_written: 291 # ensure written number of PCM frames 292 # matches total_pcm_frames argument 293 from audiotools.text import ERR_TOTAL_PCM_FRAMES_MISMATCH 294 cls.__unlink__(filename) 295 raise EncodingError(ERR_TOTAL_PCM_FRAMES_MISMATCH) 296 else: 297 # go back and rewrite populated header 298 # with counted number of PCM frames 299 f.seek(0, 0) 300 f.write(au_header(pcmreader.sample_rate, 301 pcmreader.channels, 302 pcmreader.bits_per_sample, 303 counter.frames_written)) 304 f.close() 305 306 return AuAudio(filename) 307 308 @classmethod 309 def track_name(cls, file_path, track_metadata=None, format=None, 310 suffix=None): 311 """constructs a new filename string 312 313 given a plain string to an existing path, 314 a MetaData-compatible object (or None), 315 a UTF-8-encoded Python format string 316 and an ASCII-encoded suffix string (such as "mp3") 317 returns a plain string of a new filename with format's 318 fields filled-in and encoded as FS_ENCODING 319 raises UnsupportedTracknameField if the format string 320 contains invalid template fields""" 321 322 if format is None: 323 format = "track%(track_number)2.2d.au" 324 return AudioFile.track_name(file_path, track_metadata, format, 325 suffix=cls.SUFFIX) 326