1#!/usr/bin/python
2
3# Audio Tools, a module and set of tools for manipulating audio data
4# Copyright (C) 2007-2014  Brian Langenberger
5
6# This program is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15
16# You should have received a copy of the GNU General Public License
17# along with this program; if not, write to the Free Software
18# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
19
20
21from audiotools import (AudioFile, InvalidFile, PCMReader)
22from audiotools.pcm import FrameList
23
24
25class InvalidAU(InvalidFile):
26    pass
27
28
29class AuReader(object):
30    def __init__(self, au_filename):
31        from audiotools.bitstream import BitstreamReader
32        from audiotools.text import (ERR_AU_INVALID_HEADER,
33                                     ERR_AU_UNSUPPORTED_FORMAT)
34
35        self.stream = BitstreamReader(open(au_filename, "rb"), False)
36        (magic_number,
37         self.data_offset,
38         data_size,
39         encoding_format,
40         self.sample_rate,
41         self.channels) = self.stream.parse("4b 5* 32u")
42
43        if magic_number != b'.snd':
44            self.stream.close()
45            raise ValueError(ERR_AU_INVALID_HEADER)
46        try:
47            self.bits_per_sample = {2: 8, 3: 16, 4: 24}[encoding_format]
48        except KeyError:
49            self.stream.close()
50            raise ValueError(ERR_AU_UNSUPPORTED_FORMAT)
51
52        self.channel_mask = {1: 0x4, 2: 0x3}.get(self.channels, 0)
53        self.bytes_per_pcm_frame = ((self.bits_per_sample // 8) *
54                                    self.channels)
55        self.total_pcm_frames = (data_size // self.bytes_per_pcm_frame)
56        self.remaining_pcm_frames = self.total_pcm_frames
57
58    def __enter__(self):
59        return self
60
61    def __exit__(self, exc_type, exc_value, traceback):
62        self.close()
63
64    def read(self, pcm_frames):
65        # try to read requested PCM frames or remaining frames
66        requested_pcm_frames = min(max(pcm_frames, 1),
67                                   self.remaining_pcm_frames)
68        requested_bytes = (self.bytes_per_pcm_frame *
69                           requested_pcm_frames)
70        pcm_data = self.stream.read_bytes(requested_bytes)
71
72        # raise exception if data block exhausted early
73        if len(pcm_data) < requested_bytes:
74            from audiotools.text import ERR_AU_TRUNCATED_DATA
75            raise IOError(ERR_AU_TRUNCATED_DATA)
76        else:
77            self.remaining_pcm_frames -= requested_pcm_frames
78
79            # return parsed chunk
80            return FrameList(pcm_data,
81                             self.channels,
82                             self.bits_per_sample,
83                             True,
84                             True)
85
86    def read_closed(self, pcm_frames):
87        raise ValueError("cannot read closed stream")
88
89    def seek(self, pcm_frame_offset):
90        if pcm_frame_offset < 0:
91            from audiotools.text import ERR_NEGATIVE_SEEK
92            raise ValueError(ERR_NEGATIVE_SEEK)
93
94        # ensure one doesn't walk off the end of the file
95        pcm_frame_offset = min(pcm_frame_offset,
96                               self.total_pcm_frames)
97
98        # position file in data block
99        self.stream.seek(self.data_offset +
100                         (pcm_frame_offset *
101                          self.bytes_per_pcm_frame), 0)
102        self.remaining_pcm_frames = (self.total_pcm_frames -
103                                     pcm_frame_offset)
104
105        return pcm_frame_offset
106
107    def seek_closed(self, pcm_frame_offset):
108        raise ValueError("cannot seek closed stream")
109
110    def close(self):
111        self.stream.close()
112        self.read = self.read_closed
113        self.seek = self.seek_closed
114
115
116def au_header(sample_rate,
117              channels,
118              bits_per_sample,
119              total_pcm_frames):
120    """given a set of integer stream attributes,
121    returns header string of entire Au header
122
123    may raise ValueError if the total size of the file is too large"""
124
125    from audiotools.bitstream import build
126
127    if ((channels *
128         (bits_per_sample // 8) *
129         total_pcm_frames) >= 2 ** 32):
130        raise ValueError("PCM data too large for Sun AU file")
131    else:
132        return build("4b 5*32u",
133                     False,
134                     (b".snd",
135                      24,
136                      (channels *
137                       (bits_per_sample // 8) *
138                       total_pcm_frames),
139                      {8: 2, 16: 3, 24: 4}[bits_per_sample],
140                      sample_rate,
141                      channels))
142
143
144class AuAudio(AudioFile):
145    """a Sun AU audio file"""
146
147    SUFFIX = "au"
148    NAME = SUFFIX
149    DESCRIPTION = u"Sun Au"
150
151    def __init__(self, filename):
152        AudioFile.__init__(self, filename)
153
154        from audiotools.bitstream import parse
155        from audiotools.text import (ERR_AU_INVALID_HEADER,
156                                     ERR_AU_UNSUPPORTED_FORMAT)
157
158        try:
159            with open(filename, "rb") as f:
160                (magic_number,
161                 self.__data_offset__,
162                 self.__data_size__,
163                 encoding_format,
164                 self.__sample_rate__,
165                 self.__channels__) = parse("4b 5* 32u", False, f.read(24))
166        except IOError as msg:
167            raise InvalidAU(str(msg))
168
169        if magic_number != b'.snd':
170            raise InvalidAU(ERR_AU_INVALID_HEADER)
171        try:
172            self.__bits_per_sample__ = {2: 8, 3: 16, 4: 24}[encoding_format]
173        except KeyError:
174            raise InvalidAU(ERR_AU_UNSUPPORTED_FORMAT)
175
176    def lossless(self):
177        """returns True"""
178
179        return True
180
181    def bits_per_sample(self):
182        """returns an integer number of bits-per-sample this track contains"""
183
184        return self.__bits_per_sample__
185
186    def channels(self):
187        """returns an integer number of channels this track contains"""
188
189        return self.__channels__
190
191    def channel_mask(self):
192        from audiotools import ChannelMask
193
194        """returns a ChannelMask object of this track's channel layout"""
195
196        if self.channels() <= 2:
197            return ChannelMask.from_channels(self.channels())
198        else:
199            return ChannelMask(0)
200
201    def sample_rate(self):
202        """returns the rate of the track's audio as an integer number of Hz"""
203
204        return self.__sample_rate__
205
206    def total_frames(self):
207        """returns the total PCM frames of the track as an integer"""
208
209        return (self.__data_size__ //
210                (self.__bits_per_sample__ // 8) //
211                self.__channels__)
212
213    def seekable(self):
214        """returns True if the file is seekable"""
215
216        return True
217
218    def to_pcm(self):
219        """returns a PCMReader object containing the track's PCM data"""
220
221        return AuReader(self.filename)
222
223    def pcm_split(self):
224        """returns a pair of data strings before and after PCM data
225
226        the first contains all data before the PCM content of the data chunk
227        the second containing all data after the data chunk"""
228
229        import struct
230
231        f = open(self.filename, 'rb')
232        (magic_number, data_offset) = struct.unpack(">4sI", f.read(8))
233        header = f.read(data_offset - struct.calcsize(">4sI"))
234        return (struct.pack(">4sI%ds" % (len(header)),
235                            magic_number, data_offset, header), "")
236
237    @classmethod
238    def from_pcm(cls, filename, pcmreader,
239                 compression=None, total_pcm_frames=None):
240        """encodes a new file from PCM data
241
242        takes a filename string, PCMReader object,
243        optional compression level string and
244        optional total_pcm_frames integer
245        encodes a new audio file from pcmreader's data
246        at the given filename with the specified compression level
247        and returns a new AuAudio object"""
248
249        from audiotools import EncodingError
250        from audiotools import DecodingError
251        from audiotools import CounterPCMReader
252        from audiotools import transfer_framelist_data
253
254        if pcmreader.bits_per_sample not in (8, 16, 24):
255            from audiotools import Filename
256            from audiotools import UnsupportedBitsPerSample
257            from audiotools.text import ERR_UNSUPPORTED_BITS_PER_SAMPLE
258            pcmreader.close()
259            raise UnsupportedBitsPerSample(
260                ERR_UNSUPPORTED_BITS_PER_SAMPLE %
261                {"target_filename": Filename(filename),
262                 "bps": pcmreader.bits_per_sample})
263
264        try:
265            header = au_header(pcmreader.sample_rate,
266                               pcmreader.channels,
267                               pcmreader.bits_per_sample,
268                               total_pcm_frames if total_pcm_frames
269                               is not None else 0)
270        except ValueError as err:
271            raise EncodingError(str(err))
272
273        try:
274            f = open(filename, "wb")
275        except IOError as err:
276            pcmreader.close()
277            raise EncodingError(str(err))
278
279        counter = CounterPCMReader(pcmreader)
280        f.write(header)
281        try:
282            transfer_framelist_data(counter, f.write, True, True)
283        except (IOError, ValueError) as err:
284            f.close()
285            cls.__unlink__(filename)
286            raise EncodingError(str(err))
287
288        if total_pcm_frames is not None:
289            f.close()
290            if total_pcm_frames != counter.frames_written:
291                # ensure written number of PCM frames
292                # matches total_pcm_frames argument
293                from audiotools.text import ERR_TOTAL_PCM_FRAMES_MISMATCH
294                cls.__unlink__(filename)
295                raise EncodingError(ERR_TOTAL_PCM_FRAMES_MISMATCH)
296        else:
297            # go back and rewrite populated header
298            # with counted number of PCM frames
299            f.seek(0, 0)
300            f.write(au_header(pcmreader.sample_rate,
301                              pcmreader.channels,
302                              pcmreader.bits_per_sample,
303                              counter.frames_written))
304            f.close()
305
306        return AuAudio(filename)
307
308    @classmethod
309    def track_name(cls, file_path, track_metadata=None, format=None,
310                   suffix=None):
311        """constructs a new filename string
312
313        given a plain string to an existing path,
314        a MetaData-compatible object (or None),
315        a UTF-8-encoded Python format string
316        and an ASCII-encoded suffix string (such as "mp3")
317        returns a plain string of a new filename with format's
318        fields filled-in and encoded as FS_ENCODING
319        raises UnsupportedTracknameField if the format string
320        contains invalid template fields"""
321
322        if format is None:
323            format = "track%(track_number)2.2d.au"
324        return AudioFile.track_name(file_path, track_metadata, format,
325                                    suffix=cls.SUFFIX)
326