1#!/usr/bin/python2.5
2#
3# Copyright 2013 Olivier Gillet.
4#
5# Author: Olivier Gillet (ol.gillet@gmail.com)
6#
7# Permission is hereby granted, free of charge, to any person obtaining a copy
8# of this software and associated documentation files (the "Software"), to deal
9# in the Software without restriction, including without limitation the rights
10# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11# copies of the Software, and to permit persons to whom the Software is
12# furnished to do so, subject to the following conditions:
13#
14# The above copyright notice and this permission notice shall be included in
15# all copies or substantial portions of the Software.
16#
17# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23# THE SOFTWARE.
24#
25# See http://creativecommons.org/licenses/MIT/ for more information.
26#
27# -----------------------------------------------------------------------------
28#
29# Streaming .wav file writer.
30
import copy
import logging
import struct

import numpy
34
# Sizes, in bytes, of the fixed parts of the .wav header written below.
_DATA_CHUNK_HEADER_SIZE = 8
_FMT_CHUNK_DATA_SIZE = 16
_FMT_CHUNK_HEADER_SIZE = 8
_RIFF_FORMAT_DESCRIPTOR_SIZE = 4

_UNSIGNED_CHAR_TO_FLOAT_SCALE = 128.0
_FLOAT_TO_UNSIGNED_CHAR_SCALE = 127.0


class AudioStreamWriter(object):
  """Writes a PCM .wav file incrementally.

  The header is written when the writer is created and rewritten after every
  call to append(), so the size fields always match the data written so far.

  The writer can be used as a context manager; otherwise call close() when
  done to release the underlying file.
  """

  def __init__(self, file_name, sample_rate, bitdepth=16, num_channels=1):
    """Open file_name for writing and emit an initial (empty) header.

    Args:
      file_name: str. path of the .wav file to create or overwrite.
      sample_rate: int. sample rate stored in the header.
      bitdepth: int. bits per sample. 8 selects unsigned samples; other
          values select signed integers of bitdepth // 8 bytes.
      num_channels: int. number of interleaved channels.
    """
    # _sr duplicates _sample_rate; kept in case other code reads it.
    self._sr = sample_rate
    self._file_name = file_name
    self._sample_rate = sample_rate
    self._bitdepth = bitdepth
    self._num_channels = num_channels
    self._f = open(file_name, 'wb')
    self._num_bytes = 0
    try:
      self._write_header(False)
    except Exception:
      # Do not leak the file handle if the header cannot be written.
      self._f.close()
      raise

  def __enter__(self):
    return self

  def __exit__(self, exc_type, exc_value, traceback):
    self.close()

  def close(self):
    """Close the underlying file. Safe to call more than once."""
    if not self._f.closed:
      self._f.close()

  @staticmethod
  def quantize(signal, bitdepth):
    """Convert an array of float to an array of integers.

    Samples outside [-1, 1] are clipped (a warning is logged). The input
    array is not modified.

    Args:
      signal: numpy array. source signal.
      bitdepth: int. size of the integer in bits.

    Returns:
      array of integers: uint8 when bitdepth is 8, otherwise little-endian
      signed integers of bitdepth // 8 bytes.
    """
    signal = numpy.asarray(signal)

    # max() of an empty array raises, so only inspect non-empty signals.
    if signal.size and numpy.abs(signal).max() > 1.0:
      logging.warning('Some samples will be clipped.')
      # Clip samples above 1 and below -1.
      signal = numpy.clip(signal, -1, 1)

    if bitdepth == 8:
      # 8-bit samples are unsigned: shift [-1, 1] to [0, 2] before scaling.
      shifted = (signal + 1.0) * _FLOAT_TO_UNSIGNED_CHAR_SCALE
      return shifted.astype(numpy.uint8)

    scale = (1 << (bitdepth - 1)) - 1
    # Floor division keeps the byte count an int on Python 3 as well.
    # Sample data is stored little-endian regardless of the host.
    return (signal * scale).astype('<i%d' % (bitdepth // 8))

  def _write_header(self, restore_position):
    """(Re)write the 44-byte RIFF/fmt/data header at the start of the file.

    Args:
      restore_position: bool. if True, seek back to the position the file
          had before the header was rewritten.
    """
    f = self._f

    total_size = (
        _RIFF_FORMAT_DESCRIPTOR_SIZE +
        _FMT_CHUNK_HEADER_SIZE + _FMT_CHUNK_DATA_SIZE +
        _DATA_CHUNK_HEADER_SIZE + self._num_bytes)

    bytes_per_frame = self._num_channels * (self._bitdepth // 8)
    sample_rate = int(self._sample_rate)
    byte_rate = sample_rate * bytes_per_frame

    # Pack the whole header as bytes in one go: this works on a binary file
    # under both Python 2 and Python 3.
    header = struct.pack(
        '<4sL4s4sLHHLLHH4sL',
        b'RIFF',
        total_size,
        b'WAVE',
        b'fmt ',
        _FMT_CHUNK_DATA_SIZE,
        1,  # Format tag: PCM.
        self._num_channels,
        sample_rate,
        byte_rate,
        bytes_per_frame,
        self._bitdepth,
        b'data',
        self._num_bytes)

    current_position = f.tell()
    f.seek(0)
    f.write(header)
    if restore_position:
      f.seek(current_position)

  def append(self, signal):
    """Append samples to the file and refresh the header.

    Args:
      signal: numpy array. uint8 and int16 arrays are written as-is and must
          match the writer's bit depth; any other dtype is passed through
          quantize(). A 1-D array requires a mono writer; otherwise
          shape[1] must equal the number of channels.

    Raises:
      AssertionError: if the dtype or shape does not match the writer.
    """
    signal = numpy.asarray(signal)
    if signal.dtype == numpy.uint8 or signal.dtype == numpy.int16:
      assert self._bitdepth == signal.dtype.itemsize * 8, (
          'sample width does not match the bit depth of the file')
      scaled_signal = signal
    else:
      scaled_signal = self.quantize(signal, self._bitdepth)

    if scaled_signal.ndim == 1:
      assert self._num_channels == 1, 'expected a multi-channel signal'
    else:
      assert self._num_channels == scaled_signal.shape[1], (
          'channel count does not match the file')

    # Sample data must be little-endian; this is a no-op on little-endian
    # hosts.
    scaled_signal = scaled_signal.astype(
        scaled_signal.dtype.newbyteorder('<'), copy=False)

    scaled_signal.tofile(self._f)
    self._num_bytes += scaled_signal.nbytes
    self._write_header(True)