1# Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
2#
3# Use of this source code is governed by a BSD-style license
4# that can be found in the LICENSE file in the root of the source
5# tree. An additional intellectual property rights grant can be found
6# in the file PATENTS.  All contributing project authors may
7# be found in the AUTHORS file in the root of the source tree.
8"""Signal processing utility module.
9"""
10
11import array
12import logging
13import os
14import sys
15import enum
16
17try:
18    import numpy as np
19except ImportError:
20    logging.critical('Cannot import the third-party Python package numpy')
21    sys.exit(1)
22
23try:
24    import pydub
25    import pydub.generators
26except ImportError:
27    logging.critical('Cannot import the third-party Python package pydub')
28    sys.exit(1)
29
30try:
31    import scipy.signal
32    import scipy.fftpack
33except ImportError:
34    logging.critical('Cannot import the third-party Python package scipy')
35    sys.exit(1)
36
37from . import exceptions
38
39
class SignalProcessingUtils(object):
    """Collection of signal processing utilities.

    Every utility is exposed as a class method operating on pydub
    AudioSegment instances; the class itself keeps no state.
    """

    @enum.unique
    class MixPadding(enum.Enum):
        # Policy used by MixSignals() when |noise| is shorter than |signal|.
        NO_PADDING = 0  # The mix is truncated to the length of |noise|.
        ZERO_PADDING = 1  # |noise| only covers the beginning of the mix.
        LOOP = 2  # |noise| is repeated to cover the whole |signal|.

    def __init__(self):
        pass

    @classmethod
    def LoadWav(cls, filepath, channels=1):
        """Loads a wav file.

        Args:
          filepath: path to the wav audio track file to load.
          channels: number of channels (downmixing to mono by default).

        Returns:
          AudioSegment instance.

        Raises:
          exceptions.FileNotFoundError: if |filepath| does not exist.
        """
        if not os.path.exists(filepath):
            logging.error('cannot find the <%s> audio track file', filepath)
            raise exceptions.FileNotFoundError()
        # NOTE(review): |channels| is only forwarded as a keyword argument;
        # confirm that pydub honors it (i.e., downmixes) for the wav format.
        return pydub.AudioSegment.from_file(filepath,
                                            format='wav',
                                            channels=channels)

    @classmethod
    def SaveWav(cls, output_filepath, signal):
        """Saves a wav file.

        Args:
          output_filepath: path to the wav audio track file to save.
          signal: AudioSegment instance.

        Returns:
          Whatever signal.export() returns.
        """
        return signal.export(output_filepath, format='wav')

    @classmethod
    def CountSamples(cls, signal):
        """Number of samples per channel.

        Args:
          signal: AudioSegment instance.

        Returns:
          An integer.
        """
        number_of_samples = len(signal.get_array_of_samples())
        assert signal.channels > 0
        assert number_of_samples % signal.channels == 0
        # Floor division: the result must be an integer on Python 3 as well
        # (the assert above guarantees that the division is exact).
        return number_of_samples // signal.channels

    @classmethod
    def GenerateSilence(cls, duration=1000, sample_rate=48000):
        """Generates silence.

        This method can also be used to create a template AudioSegment
        instance. A template can then be used with other Generate*() methods
        accepting an AudioSegment instance as argument.

        Args:
          duration: duration in ms.
          sample_rate: sample rate.

        Returns:
          AudioSegment instance.
        """
        return pydub.AudioSegment.silent(duration, sample_rate)

    @classmethod
    def GeneratePureTone(cls, template, frequency=440.0):
        """Generates a pure tone.

        The pure tone is generated with the same duration and in the same
        format of the given template signal.

        Args:
          template: AudioSegment instance.
          frequency: Frequency of the pure tone in Hz.

        Returns:
          AudioSegment instance.

        Raises:
          exceptions.SignalProcessingException: if |frequency| is greater than
            half of the template sample rate (rounded down).
        """
        if frequency > template.frame_rate // 2:
            raise exceptions.SignalProcessingException('Invalid frequency')

        generator = pydub.generators.Sine(sample_rate=template.frame_rate,
                                          bit_depth=template.sample_width * 8,
                                          freq=frequency)

        return generator.to_audio_segment(duration=len(template), volume=0.0)

    @classmethod
    def GenerateWhiteNoise(cls, template):
        """Generates white noise.

        The white noise is generated with the same duration and in the same
        format of the given template signal.

        Args:
          template: AudioSegment instance.

        Returns:
          AudioSegment instance.
        """
        generator = pydub.generators.WhiteNoise(
            sample_rate=template.frame_rate,
            bit_depth=template.sample_width * 8)
        return generator.to_audio_segment(duration=len(template), volume=0.0)

    @classmethod
    def AudioSegmentToRawData(cls, signal):
        """Converts the samples of a signal into a numpy vector.

        Args:
          signal: AudioSegment instance.

        Returns:
          A numpy array with dtype int16.

        Raises:
          exceptions.SignalProcessingException: if the type code of the
            samples array is not 'h'.
        """
        samples = signal.get_array_of_samples()
        if samples.typecode != 'h':
            raise exceptions.SignalProcessingException(
                'Unsupported samples type')
        # Reuse the samples fetched above instead of extracting them again.
        return np.array(samples, np.int16)

    @classmethod
    def Fft(cls, signal, normalize=True):
        """Computes the first half of the FFT of a mono signal.

        Args:
          signal: AudioSegment instance (one channel only).
          normalize: if True, the samples are divided by their peak magnitude
            (or by 1.0 when the peak is smaller than 1.0) before the FFT.

        Returns:
          A numpy vector of complex values holding the first len(x) // 2 FFT
          bins, where x is the vector of samples.

        Raises:
          NotImplementedError: if |signal| has more than one channel.
        """
        if signal.channels != 1:
            raise NotImplementedError('multiple-channel FFT not implemented')
        x = cls.AudioSegmentToRawData(signal).astype(np.float32)
        if normalize:
            # Use the largest magnitude, not the largest value: the peak may
            # be a negative sample. The 1.0 floor avoids dividing by zero.
            x /= max(np.max(np.abs(x)), 1.0)
        y = scipy.fftpack.fft(x)
        # Floor division: a slice index must be an integer.
        return y[:len(y) // 2]

    @classmethod
    def DetectHardClipping(cls, signal, threshold=2):
        """Detects hard clipping.

        Hard clipping is simply detected by counting samples that touch either
        the lower or upper bound too many times in a row (according to
        |threshold|). The presence of a single sequence of samples meeting
        such property is enough to label the signal as hard clipped.

        Args:
          signal: AudioSegment instance.
          threshold: minimum number of samples at full-scale in a row.

        Returns:
          True if hard clipping is detected, False otherwise.

        Raises:
          NotImplementedError: if |signal| has more than one channel.
          exceptions.SignalProcessingException: if the samples are not 16 bit.
        """
        if signal.channels != 1:
            raise NotImplementedError(
                'multiple-channel clipping not implemented')
        if signal.sample_width != 2:  # Note that signal.sample_width is in bytes.
            raise exceptions.SignalProcessingException(
                'hard-clipping detection only supported for 16 bit samples')
        samples = cls.AudioSegmentToRawData(signal)

        # Detect adjacent clipped samples.
        samples_type_info = np.iinfo(samples.dtype)
        mask_min = samples == samples_type_info.min
        mask_max = samples == samples_type_info.max

        def HasLongSequence(vector, min_length=threshold):
            """Returns True if there are one or more long sequences of True flags."""
            seq_length = 0
            for b in vector:
                seq_length = seq_length + 1 if b else 0
                if seq_length >= min_length:
                    return True
            return False

        # The two bounds are checked separately: a run must stay on the same
        # bound to count as clipping.
        return HasLongSequence(mask_min) or HasLongSequence(mask_max)

    @classmethod
    def ApplyImpulseResponse(cls, signal, impulse_response):
        """Applies an impulse response to a signal.

        The result of the convolution is saturated to the 16 bit range before
        being converted to integer samples.

        Args:
          signal: AudioSegment instance.
          impulse_response: list or numpy vector of float values.

        Returns:
          AudioSegment instance.
        """
        # Get samples.
        assert signal.channels == 1, (
            'multiple-channel recordings not supported')
        samples = signal.get_array_of_samples()

        # Convolve.
        logging.info(
            'applying %d order impulse response to a signal lasting %d ms',
            len(impulse_response), len(signal))
        convolved = scipy.signal.fftconvolve(in1=samples,
                                             in2=impulse_response,
                                             mode='full')
        # Saturate before casting: a plain cast to int16 would let the values
        # outside the representable range wrap around.
        int16_info = np.iinfo(np.int16)
        convolved_samples = np.clip(convolved, int16_info.min,
                                    int16_info.max).astype(np.int16)
        logging.info('convolution computed')

        # Cast.
        convolved_samples = array.array(signal.array_type, convolved_samples)

        # Verify.
        logging.debug('signal length: %d samples', len(samples))
        logging.debug('convolved signal length: %d samples',
                      len(convolved_samples))
        # A single-tap impulse response leaves the number of samples unchanged
        # ('full' mode yields len(samples) + len(impulse_response) - 1).
        assert len(convolved_samples) >= len(samples)

        # Generate convolved signal AudioSegment instance.
        convolved_signal = pydub.AudioSegment(data=convolved_samples,
                                              metadata={
                                                  'sample_width':
                                                  signal.sample_width,
                                                  'frame_rate':
                                                  signal.frame_rate,
                                                  'frame_width':
                                                  signal.frame_width,
                                                  'channels': signal.channels,
                                              })
        # Durations are expressed in ms, hence a short impulse response may
        # not make the convolved signal measurably longer.
        assert len(convolved_signal) >= len(signal)

        return convolved_signal

    @classmethod
    def Normalize(cls, signal):
        """Normalizes a signal.

        The applied gain is the opposite of the signal max_dBFS value.

        Args:
          signal: AudioSegment instance.

        Returns:
          An AudioSegment instance.
        """
        return signal.apply_gain(-signal.max_dBFS)

    @classmethod
    def Copy(cls, signal):
        """Makes a copy of a signal.

        Args:
          signal: AudioSegment instance.

        Returns:
          An AudioSegment instance.
        """
        return pydub.AudioSegment(data=signal.get_array_of_samples(),
                                  metadata={
                                      'sample_width': signal.sample_width,
                                      'frame_rate': signal.frame_rate,
                                      'frame_width': signal.frame_width,
                                      'channels': signal.channels,
                                  })

    @classmethod
    def MixSignals(cls,
                   signal,
                   noise,
                   target_snr=0.0,
                   pad_noise=MixPadding.NO_PADDING):
        """Mixes |signal| and |noise| with a target SNR.

        Mix |signal| and |noise| with a desired SNR by scaling |noise|.
        If the target SNR is +/- infinite, a copy of signal/noise is returned.
        If |signal| is shorter than |noise|, the length of the mix equals that
        of |signal|. Otherwise, the mix length depends on whether padding is
        applied. When padding is not applied, that is |pad_noise| is set to
        NO_PADDING (default), the mix length equals that of |noise| - i.e.,
        |signal| is truncated. Otherwise, |noise| is extended and the
        resulting mix has the same length of |signal|.

        Args:
          signal: AudioSegment instance (signal).
          noise: AudioSegment instance (noise).
          target_snr: float, numpy.inf or -numpy.inf (dB).
          pad_noise: SignalProcessingUtils.MixPadding, default: NO_PADDING.

        Returns:
          An AudioSegment instance.

        Raises:
          exceptions.SignalProcessingException: if either |signal| or |noise|
            has -Inf power, or if |pad_noise| is not a valid padding type.
        """
        # Handle infinite target SNR. Note that np.inf is used since the
        # np.Inf alias is not available in NumPy 2.0 and later.
        if target_snr == -np.inf:
            # Return a copy of noise.
            logging.warning('SNR = -Inf, returning noise')
            return cls.Copy(noise)
        elif target_snr == np.inf:
            # Return a copy of signal.
            logging.warning('SNR = +Inf, returning signal')
            return cls.Copy(signal)

        # Check signal and noise power.
        signal_power = float(signal.dBFS)
        noise_power = float(noise.dBFS)
        if signal_power == -np.inf:
            logging.error('signal has -Inf power, cannot mix')
            raise exceptions.SignalProcessingException(
                'cannot mix a signal with -Inf power')
        if noise_power == -np.inf:
            logging.error('noise has -Inf power, cannot mix')
            raise exceptions.SignalProcessingException(
                'cannot mix a signal with -Inf power')

        # Mix. |gain_db| is the gain to apply to |noise| so that the
        # difference between the signal and the noise power is |target_snr|.
        gain_db = signal_power - noise_power - target_snr
        signal_duration = len(signal)
        noise_duration = len(noise)
        if signal_duration <= noise_duration:
            # Ignore |pad_noise|, |noise| is truncated if longer that |signal|, the
            # mix will have the same length of |signal|.
            return signal.overlay(noise.apply_gain(gain_db))
        elif pad_noise == cls.MixPadding.NO_PADDING:
            # |signal| is longer than |noise|, but no padding is applied to |noise|.
            # Truncate |signal|. Here |noise| is the base track of the overlay,
            # hence |gain_during_overlay| is what scales it.
            return noise.overlay(signal, gain_during_overlay=gain_db)
        elif pad_noise == cls.MixPadding.ZERO_PADDING:
            # TODO(alessiob): Check that this works as expected.
            return signal.overlay(noise.apply_gain(gain_db))
        elif pad_noise == cls.MixPadding.LOOP:
            # |signal| is longer than |noise|, extend |noise| by looping.
            return signal.overlay(noise.apply_gain(gain_db), loop=True)
        else:
            raise exceptions.SignalProcessingException('invalid padding type')