1#!/usr/bin/python2.5 2# 3# Copyright 2013 Olivier Gillet. 4# 5# Author: Olivier Gillet (ol.gillet@gmail.com) 6# 7# Permission is hereby granted, free of charge, to any person obtaining a copy 8# of this software and associated documentation files (the "Software"), to deal 9# in the Software without restriction, including without limitation the rights 10# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 11# copies of the Software, and to permit persons to whom the Software is 12# furnished to do so, subject to the following conditions: 13# 14# The above copyright notice and this permission notice shall be included in 15# all copies or substantial portions of the Software. 16# 17# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 22# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 23# THE SOFTWARE. 24# 25# See http://creativecommons.org/licenses/MIT/ for more information. 26# 27# ----------------------------------------------------------------------------- 28# 29# Streaming .wav file writer. 30 31import copy 32import numpy 33import struct 34 35_DATA_CHUNK_HEADER_SIZE = 8 36_FMT_CHUNK_DATA_SIZE = 16 37_FMT_CHUNK_HEADER_SIZE = 8 38_RIFF_FORMAT_DESCRIPTOR_SIZE = 4 39 40_UNSIGNED_CHAR_TO_FLOAT_SCALE = 128.0 41_FLOAT_TO_UNSIGNED_CHAR_SCALE = 127.0 42 43 44class AudioStreamWriter(object): 45 46 def __init__(self, file_name, sample_rate, bitdepth=16, num_channels=1): 47 self._sr = sample_rate 48 self._file_name = file_name 49 self._sample_rate = sample_rate 50 self._bitdepth = bitdepth 51 self._num_channels = num_channels 52 self._f = file(file_name, 'wb') 53 self._num_bytes = 0 54 self._write_header(False) 55 56 @staticmethod 57 def quantize(signal, bitdepth): 58 """Convert an array of float to an array of integers. 59 60 Args: 61 signal: numpy array. source signal. 62 bitdepth: int. size of the integer in bits. 63 64 Returns: 65 array of integers. 66 """ 67 norm = numpy.abs(signal).max() 68 69 # Normalization or clipping. 70 scaled_signal = copy.copy(signal) 71 if norm > 1.0: 72 logging.warning('Some samples will be clipped.') 73 # Clip samples above 1 and below -1. 74 scaled_signal[scaled_signal < -1] = -1 75 scaled_signal[scaled_signal > 1] = 1 76 77 if bitdepth == 8: 78 scaled_signal = (scaled_signal + 1.0) * _FLOAT_TO_UNSIGNED_CHAR_SCALE 79 scaled_signal = numpy.array(scaled_signal, dtype=numpy.uint8) 80 else: 81 scale = (1 << (bitdepth - 1)) - 1 82 # pylint: disable-msg=C6407 83 scaled_signal = scaled_signal * scale 84 scaled_signal = numpy.array(scaled_signal, dtype='i%d' % (bitdepth / 8)) 85 86 return scaled_signal 87 88 def _write_header(self, restore_position): 89 f = self._f 90 91 total_size = _RIFF_FORMAT_DESCRIPTOR_SIZE 92 total_size += _FMT_CHUNK_HEADER_SIZE + _FMT_CHUNK_DATA_SIZE 93 total_size += _DATA_CHUNK_HEADER_SIZE + self._num_bytes 94 95 current_position = f.tell() 96 f.seek(0) 97 f.write('RIFF') 98 f.write(struct.pack('<L', total_size)) 99 f.write('WAVEfmt ') 100 bitrate = self._sample_rate * self._num_channels * (self._bitdepth / 8) 101 bits_per_sample = self._num_channels * (self._bitdepth / 8) 102 f.write(struct.pack( 103 '<LHHLLHH', 104 16, 105 1, 106 self._num_channels, 107 self._sample_rate, 108 bitrate, 109 bits_per_sample, 110 self._bitdepth)) 111 f.write('data') 112 f.write(struct.pack('<L', self._num_bytes)) 113 if restore_position: 114 f.seek(current_position) 115 116 def append(self, signal): 117 if signal.dtype == numpy.uint8 or signal.dtype == numpy.int16: 118 assert self._bitdepth == signal.dtype.itemsize * 8 119 scaled_signal = signal 120 else: 121 scaled_signal = self.quantize(signal, self._bitdepth) 122 123 if scaled_signal.ndim == 1: 124 assert self._num_channels == 1 125 else: 126 assert self._num_channels == scaled_signal.shape[1] 127 scaled_signal.tofile(self._f) 128 self._num_bytes += scaled_signal.nbytes 129 self._write_header(True) 130