# Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
#
# Use of this source code is governed by a BSD-style license
# that can be found in the LICENSE file in the root of the source
# tree. An additional intellectual property rights grant can be found
# in the file PATENTS.  All contributing project authors may
# be found in the AUTHORS file in the root of the source tree.
8"""Test data generators producing signals pairs intended to be used to
9test the APM module. Each pair consists of a noisy input and a reference signal.
10The former is used as APM input and it is generated by adding noise to a
11clean audio track. The reference is the expected APM output.
12
13Throughout this file, the following naming convention is used:
14  - input signal: the clean signal (e.g., speech),
15  - noise signal: the noise to be summed up to the input signal (e.g., white
16    noise, Gaussian noise),
17  - noisy signal: input + noise.
18The noise signal may or may not be a function of the clean signal. For
19instance, white noise is independently generated, whereas reverberation is
20obtained by convolving the input signal with an impulse response.
21"""
22
23import logging
24import os
25import shutil
26import sys
27
28try:
29    import scipy.io
30except ImportError:
31    logging.critical('Cannot import the third-party Python package scipy')
32    sys.exit(1)
33
34from . import data_access
35from . import exceptions
36from . import signal_processing
37
38
class TestDataGenerator(object):
    """Base class for the generators of noisy/reference signal pairs.

    Starting from a clean input signal, a generator produces two streams: the
    noisy signal and the reference. The former is the clean signal
    deteriorated by the noise source, the latter goes through the same
    deterioration process, but more "gently". The pair is built so that the
    reference is the signal expected at the output of the APM module when the
    latter is fed with the noisy signal.

    A test data generator generates one or more pairs, each one stored under
    its own configuration name.
    """

    # Name of the generator; used as key in REGISTERED_CLASSES.
    NAME = None
    # Maps the NAME of each registered generator to its class.
    REGISTERED_CLASSES = {}

    def __init__(self, output_directory_prefix):
        """Creates a generator with no pairs.

        Args:
          output_directory_prefix: string prepended to the configuration name
                                   when naming the APM output directories.
        """
        self._output_directory_prefix = output_directory_prefix
        # The three dictionaries below share the same keys, namely the names
        # of the test data generator configurations (e.g., different SNRs).
        # They are declared here and actually initialized by Clear().
        self._noisy_signal_filepaths = None  # Noisy audio track files.
        self._apm_output_paths = None  # APM simulation output paths.
        self._reference_signal_filepaths = None  # Reference audio track files.
        self.Clear()

    @classmethod
    def RegisterClass(cls, class_to_register):
        """Class decorator that registers a TestDataGenerator implementation.

        The decorated class is stored in REGISTERED_CLASSES under its NAME and
        it is returned unchanged.
        Example usage:

        @TestDataGenerator.RegisterClass
        class IdentityGenerator(TestDataGenerator):
          pass

        Args:
          class_to_register: class to be added to the registry.

        Returns:
          class_to_register.
        """
        registry = cls.REGISTERED_CLASSES
        registry[class_to_register.NAME] = class_to_register
        return class_to_register

    @property
    def config_names(self):
        """Names of the generated configurations."""
        return self._noisy_signal_filepaths.keys()

    @property
    def noisy_signal_filepaths(self):
        """Dictionary mapping configuration names to noisy signal paths."""
        return self._noisy_signal_filepaths

    @property
    def apm_output_paths(self):
        """Dictionary mapping configuration names to APM output paths."""
        return self._apm_output_paths

    @property
    def reference_signal_filepaths(self):
        """Dictionary mapping configuration names to reference signal paths."""
        return self._reference_signal_filepaths

    def Generate(self, input_signal_filepath, test_data_cache_path,
                 base_output_path):
        """Generates a set of noisy input and reference audio track file pairs.

        Any previously generated pair is discarded, then the work is delegated
        to the _Generate() method implemented in a concrete class.

        Args:
          input_signal_filepath: path to the clean input audio track file.
          test_data_cache_path: path to the cache of the generated audio track
                                files.
          base_output_path: base path where output is written.
        """
        self.Clear()
        self._Generate(input_signal_filepath, test_data_cache_path,
                       base_output_path)

    def Clear(self):
        """Resets the generated output path dictionaries to empty ones."""
        self._noisy_signal_filepaths = {}
        self._apm_output_paths = {}
        self._reference_signal_filepaths = {}

    def _Generate(self, input_signal_filepath, test_data_cache_path,
                  base_output_path):
        """Abstract method to be implemented in each concrete class.

        Raises:
          NotImplementedError: always, unless overridden.
        """
        raise NotImplementedError()

    def _AddNoiseSnrPairs(self, base_output_path, noisy_mix_filepaths,
                          snr_value_pairs):
        """Adds one noisy-reference pair per noise track and SNR pair.

        Args:
          base_output_path: noisy tracks base output path.
          noisy_mix_filepaths: nested dictionary of noisy signal paths
                               organized by noise track name and SNR level.
          snr_value_pairs: list of (noisy SNR, reference SNR) pairs; the
                           values are formatted as integers.
        """
        for track_name, mixes_by_snr in noisy_mix_filepaths.items():
            for noisy_snr, reference_snr in snr_value_pairs:
                pair_name = '{0}_{1:d}_{2:d}_SNR'.format(
                    track_name, noisy_snr, reference_snr)
                # The output directory is created before looking up the mixes.
                pair_output_path = self._MakeDir(base_output_path, pair_name)
                self._AddNoiseReferenceFilesPair(
                    config_name=pair_name,
                    noisy_signal_filepath=mixes_by_snr[noisy_snr],
                    reference_signal_filepath=mixes_by_snr[reference_snr],
                    output_path=pair_output_path)

    def _AddNoiseReferenceFilesPair(self, config_name, noisy_signal_filepath,
                                    reference_signal_filepath, output_path):
        """Adds one noisy-reference signal pair.

        All the paths are stored in their absolute form.

        Args:
          config_name: name of the test data generator configuration; it must
                       not have been added already.
          noisy_signal_filepath: path to noisy audio track file.
          reference_signal_filepath: path to reference audio track file.
          output_path: APM output path.
        """
        assert config_name not in self._noisy_signal_filepaths
        to_absolute = os.path.abspath
        self._noisy_signal_filepaths[config_name] = to_absolute(
            noisy_signal_filepath)
        self._apm_output_paths[config_name] = to_absolute(output_path)
        self._reference_signal_filepaths[config_name] = to_absolute(
            reference_signal_filepath)

    def _MakeDir(self, base_output_path, test_data_generator_config_name):
        """Creates the output directory of a configuration.

        Args:
          base_output_path: parent path of the directory.
          test_data_generator_config_name: configuration name; the directory
                                           is named after it, preceded by the
                                           output directory prefix.

        Returns:
          Path of the directory passed to data_access.MakeDirectory().
        """
        directory_name = (self._output_directory_prefix +
                          test_data_generator_config_name)
        output_path = os.path.join(base_output_path, directory_name)
        data_access.MakeDirectory(output_path)
        return output_path
174
175
@TestDataGenerator.RegisterClass
class IdentityTestDataGenerator(TestDataGenerator):
    """Generator that adds no noise.

    A single configuration named 'default' is produced, in which the same
    file is used both as noisy signal and as reference signal.
    """

    NAME = 'identity'

    def __init__(self, output_directory_prefix, copy_with_identity):
        """Creates an identity generator.

        Args:
          output_directory_prefix: prefix of the APM output directory names.
          copy_with_identity: if true, the input file is copied into the test
                              data cache and the copy is used in place of the
                              original file.
        """
        super(IdentityTestDataGenerator,
              self).__init__(output_directory_prefix)
        self._copy_with_identity = copy_with_identity

    @property
    def copy_with_identity(self):
        """Whether the input file is copied into the test data cache."""
        return self._copy_with_identity

    def _Generate(self, input_signal_filepath, test_data_cache_path,
                  base_output_path):
        """Adds the 'default' pair, made of the (possibly copied) input file.

        Args:
          input_signal_filepath: path to the clean input audio track file.
          test_data_cache_path: directory receiving the copy of the input
                                file, if the copy is enabled.
          base_output_path: base path where output is written.
        """
        config_name = 'default'
        output_path = self._MakeDir(base_output_path, config_name)

        signal_filepath = input_signal_filepath
        if self._copy_with_identity:
            # The copy keeps the file name of the original.
            cached_filepath = os.path.join(
                test_data_cache_path,
                os.path.basename(input_signal_filepath))
            logging.info('copying ' + input_signal_filepath + ' to ' +
                         cached_filepath)
            shutil.copy(input_signal_filepath, cached_filepath)
            signal_filepath = cached_filepath

        self._AddNoiseReferenceFilesPair(
            config_name=config_name,
            noisy_signal_filepath=signal_filepath,
            reference_signal_filepath=signal_filepath,
            output_path=output_path)
212
213
@TestDataGenerator.RegisterClass
class WhiteNoiseTestDataGenerator(TestDataGenerator):
    """Generator that adds white noise.

    One noise track is generated from the input signal and mixed with it once
    for each SNR value; the mixes are then paired as listed in
    _SNR_VALUE_PAIRS.
    """

    NAME = 'white_noise'

    # Each pair holds the SNR of the noisy signal followed by the SNR of the
    # reference. The reference always has a lower amount of noise - i.e., its
    # SNR is 10 dB higher.
    _SNR_VALUE_PAIRS = [
        [20, 30],  # Smallest noise.
        [10, 20],
        [5, 15],
        [0, 10],  # Largest noise.
    ]

    # File name of a mix given its SNR.
    _NOISY_SIGNAL_FILENAME_TEMPLATE = 'noise_{0:d}_SNR.wav'

    def __init__(self, output_directory_prefix):
        """Creates a white noise generator.

        Args:
          output_directory_prefix: prefix of the APM output directory names.
        """
        super(WhiteNoiseTestDataGenerator,
              self).__init__(output_directory_prefix)

    def _Generate(self, input_signal_filepath, test_data_cache_path,
                  base_output_path):
        """Generates test data pairs using white noise.

        Args:
          input_signal_filepath: path to the clean input audio track file.
          test_data_cache_path: directory in which the mixes are stored; a mix
                                already found there is not created again.
          base_output_path: base path where output is written.
        """
        utils = signal_processing.SignalProcessingUtils

        clean_signal = utils.LoadWav(input_signal_filepath)
        white_noise = utils.GenerateWhiteNoise(clean_signal)

        # An SNR value may occur in more than one pair, but its mix is needed
        # only once.
        unique_snrs = {
            snr for pair in self._SNR_VALUE_PAIRS for snr in pair
        }

        mix_filepath_by_snr = {}
        for snr in unique_snrs:
            mix_filepath = os.path.join(
                test_data_cache_path,
                self._NOISY_SIGNAL_FILENAME_TEMPLATE.format(snr))
            if not os.path.exists(mix_filepath):
                # Not cached yet: create the mix and save it.
                utils.SaveWav(
                    mix_filepath,
                    utils.MixSignals(clean_signal, white_noise, snr))
            mix_filepath_by_snr[snr] = mix_filepath

        # Add all the noisy-reference signal pairs.
        for noisy_snr, reference_snr in self._SNR_VALUE_PAIRS:
            config_name = '{0:d}_{1:d}_SNR'.format(noisy_snr, reference_snr)
            output_path = self._MakeDir(base_output_path, config_name)
            self._AddNoiseReferenceFilesPair(
                config_name=config_name,
                noisy_signal_filepath=mix_filepath_by_snr[noisy_snr],
                reference_signal_filepath=mix_filepath_by_snr[reference_snr],
                output_path=output_path)
277
278
# TODO(alessiob): remove comment when class implemented.
# @TestDataGenerator.RegisterClass
class NarrowBandNoiseTestDataGenerator(TestDataGenerator):
    """Generator that adds narrow-band noise.

    The generation step is still a stub that produces no pairs, and the
    registration decorator above is commented out.
    """

    NAME = 'narrow_band_noise'

    def __init__(self, output_directory_prefix):
        """Creates a narrow-band noise generator.

        Args:
          output_directory_prefix: prefix of the APM output directory names.
        """
        super(NarrowBandNoiseTestDataGenerator,
              self).__init__(output_directory_prefix)

    def _Generate(self, input_signal_filepath, test_data_cache_path,
                  base_output_path):
        """Does nothing for the time being; all the arguments are ignored."""
        # TODO(alessiob): implement.
        return None
294
295
@TestDataGenerator.RegisterClass
class AdditiveNoiseTestDataGenerator(TestDataGenerator):
    """Generator that adds noise loops.

    This generator uses all the wav files found in a given path (see
    DEFAULT_NOISE_TRACKS_PATH for the suggested one) and mixes them to the
    clean speech with different target SNRs (hard-coded).
    """

    NAME = 'additive_noise'
    # File name of a mix given the noise track name and the SNR.
    _NOISY_SIGNAL_FILENAME_TEMPLATE = '{0}_{1:d}_SNR.wav'

    DEFAULT_NOISE_TRACKS_PATH = os.path.join(os.path.dirname(__file__),
                                             os.pardir, 'noise_tracks')

    # TODO(alessiob): Make the list of SNR pairs customizable.
    # Each pair holds the SNR of the noisy signal followed by the SNR of the
    # reference. The reference always has a lower amount of noise - i.e., its
    # SNR is 10 dB higher.
    _SNR_VALUE_PAIRS = [
        [20, 30],  # Smallest noise.
        [10, 20],
        [5, 15],
        [0, 10],  # Largest noise.
    ]

    def __init__(self, output_directory_prefix, noise_tracks_path):
        """Creates an additive noise generator and lists the noise tracks.

        Args:
          output_directory_prefix: prefix of the APM output directory names.
          noise_tracks_path: directory containing the noise tracks; only the
                             files with wav extension (in any letter case)
                             are used.

        Raises:
          exceptions.InitializationException: if no wav file is found.
        """
        super(AdditiveNoiseTestDataGenerator,
              self).__init__(output_directory_prefix)
        self._noise_tracks_path = noise_tracks_path
        self._noise_tracks_file_names = [
            file_name for file_name in os.listdir(self._noise_tracks_path)
            if file_name.lower().endswith('.wav')
        ]
        if not self._noise_tracks_file_names:
            raise exceptions.InitializationException(
                'No wav files found in the noise tracks path %s' %
                (self._noise_tracks_path))

    def _Generate(self, input_signal_filepath, test_data_cache_path,
                  base_output_path):
        """Generates test data pairs using environmental noise.

        For each noise track and pair of SNR values, the following two audio
        tracks are made available: the noisy signal and the reference signal.
        Both are obtained by mixing the (clean) input signal to the noise
        track, enforcing the corresponding target SNR.

        Args:
          input_signal_filepath: path to the clean input audio track file.
          test_data_cache_path: directory in which the mixes are stored; a mix
                                already found there is not created again.
          base_output_path: base path where output is written.

        Raises:
          exceptions.FileNotFoundError: if a noise track listed at
                                        construction time no longer exists.
        """
        utils = signal_processing.SignalProcessingUtils

        # An SNR value may occur in more than one pair, but its mix is needed
        # only once per noise track.
        unique_snrs = {
            snr for pair in self._SNR_VALUE_PAIRS for snr in pair
        }

        clean_signal = utils.LoadWav(input_signal_filepath)

        mixes_by_track = {}
        for track_filename in self._noise_tracks_file_names:
            # The track name is the file name without extension.
            track_name = os.path.splitext(track_filename)[0]
            track_filepath = os.path.join(self._noise_tracks_path,
                                          track_filename)
            if not os.path.exists(track_filepath):
                logging.error('cannot find the <%s> noise track',
                              track_filename)
                raise exceptions.FileNotFoundError()

            track_signal = utils.LoadWav(track_filepath)

            mixes_by_snr = {}
            mixes_by_track[track_name] = mixes_by_snr
            for snr in unique_snrs:
                mix_filepath = os.path.join(
                    test_data_cache_path,
                    self._NOISY_SIGNAL_FILENAME_TEMPLATE.format(
                        track_name, snr))
                if not os.path.exists(mix_filepath):
                    # Not cached yet: create the mix and save it.
                    mix_signal = utils.MixSignals(
                        clean_signal,
                        track_signal,
                        snr,
                        pad_noise=utils.MixPadding.LOOP)
                    utils.SaveWav(mix_filepath, mix_signal)
                mixes_by_snr[snr] = mix_filepath

        # Add all the noise-SNR pairs.
        self._AddNoiseSnrPairs(base_output_path, mixes_by_track,
                               self._SNR_VALUE_PAIRS)
393
394
@TestDataGenerator.RegisterClass
class ReverberationTestDataGenerator(TestDataGenerator):
    """Generator that adds reverberation noise.

    The noise tracks are obtained by applying the impulse responses listed in
    _IMPULSE_RESPONSES to the input signal.

    TODO(alessiob): Make this class more generic since the impulse response
    can be anything (not just reverberation); call it e.g.,
    ConvolutionalNoiseTestDataGenerator.
    """

    NAME = 'reverberation'

    # Maps the impulse response names to the names of the files, located in
    # the impulse response database path, from which they are loaded.
    _IMPULSE_RESPONSES = {
        'lecture': 'air_binaural_lecture_0_0_1.mat',  # Long echo.
        'booth': 'air_binaural_booth_0_0_1.mat',  # Short echo.
    }
    # Number of samples after which the impulse responses are truncated; None
    # disables the truncation.
    _MAX_IMPULSE_RESPONSE_LENGTH = None

    # Each pair holds the SNR of the noisy signal followed by the SNR of the
    # reference. The reference always has a lower amount of noise - i.e., its
    # SNR is 5 dB higher.
    _SNR_VALUE_PAIRS = [
        [3, 8],  # Smallest noise.
        [-3, 2],  # Largest noise.
    ]

    _NOISE_TRACK_FILENAME_TEMPLATE = '{0}.wav'
    _NOISY_SIGNAL_FILENAME_TEMPLATE = '{0}_{1:d}_SNR.wav'

    def __init__(self, output_directory_prefix, aechen_ir_database_path):
        """Creates a reverberation generator.

        Args:
          output_directory_prefix: prefix of the APM output directory names.
          aechen_ir_database_path: directory containing the impulse response
                                   files listed in _IMPULSE_RESPONSES.
        """
        super(ReverberationTestDataGenerator,
              self).__init__(output_directory_prefix)
        self._aechen_ir_database_path = aechen_ir_database_path

    def _Generate(self, input_signal_filepath, test_data_cache_path,
                  base_output_path):
        """Generates test data pairs using reverberation noise.

        For each impulse response, one noise track is created. For each
        impulse response and pair of SNR values, the following 2 audio tracks
        are made available: the noisy signal and the reference signal. Both
        are obtained by mixing the (clean) input signal to the noise track,
        enforcing the corresponding target SNR.

        Args:
          input_signal_filepath: path to the clean input audio track file.
          test_data_cache_path: directory in which noise tracks and mixes are
                                stored; the files already found there are not
                                created again.
          base_output_path: base path where output is written.
        """
        utils = signal_processing.SignalProcessingUtils

        # An SNR value may occur in more than one pair, but its mix is needed
        # only once per impulse response.
        unique_snrs = {
            snr for pair in self._SNR_VALUE_PAIRS for snr in pair
        }

        clean_signal = utils.LoadWav(input_signal_filepath)

        mixes_by_response = {}
        for response_name in self._IMPULSE_RESPONSES:
            track_filepath = os.path.join(
                test_data_cache_path,
                self._NOISE_TRACK_FILENAME_TEMPLATE.format(response_name))

            # Use the cached noise track if any, otherwise generate it by
            # applying the impulse response.
            track_signal = None
            try:
                track_signal = utils.LoadWav(track_filepath)
            except exceptions.FileNotFoundError:
                response_filepath = os.path.join(
                    self._aechen_ir_database_path,
                    self._IMPULSE_RESPONSES[response_name])
                track_signal = self._GenerateNoiseTrack(
                    track_filepath, clean_signal, response_filepath)
            assert track_signal is not None

            mixes_by_snr = {}
            mixes_by_response[response_name] = mixes_by_snr
            for snr in unique_snrs:
                mix_filepath = os.path.join(
                    test_data_cache_path,
                    self._NOISY_SIGNAL_FILENAME_TEMPLATE.format(
                        response_name, snr))
                if not os.path.exists(mix_filepath):
                    # Not cached yet: create the mix and save it.
                    mix_signal = utils.MixSignals(clean_signal, track_signal,
                                                  snr)
                    utils.SaveWav(mix_filepath, mix_signal)
                mixes_by_snr[snr] = mix_filepath

        # Add all the noise-SNR pairs.
        self._AddNoiseSnrPairs(base_output_path, mixes_by_response,
                               self._SNR_VALUE_PAIRS)

    def _GenerateNoiseTrack(self, noise_track_filepath, input_signal,
                            impulse_response_filepath):
        """Generates a noise track and saves it.

        The impulse response is read from the 'h_air' variable stored in
        impulse_response_filepath, flattened, optionally truncated and applied
        to input_signal; the result is saved to noise_track_filepath.

        Args:
          noise_track_filepath: output file path for the noise track.
          input_signal: (clean) input signal.
          impulse_response_filepath: impulse response file path.

        Returns:
          The signal returned by ApplyImpulseResponse() (presumably an
          AudioSegment instance - to be confirmed in signal_processing).
        """
        utils = signal_processing.SignalProcessingUtils

        # Load impulse response.
        mat_content = scipy.io.loadmat(impulse_response_filepath)
        impulse_response = mat_content['h_air'].flatten()
        max_length = self._MAX_IMPULSE_RESPONSE_LENGTH
        if max_length is not None:
            logging.info('truncating impulse response from %d to %d samples',
                         len(impulse_response), max_length)
            impulse_response = impulse_response[:max_length]

        # Apply impulse response, then save.
        noise_track = utils.ApplyImpulseResponse(input_signal,
                                                 impulse_response)
        utils.SaveWav(noise_track_filepath, noise_track)

        return noise_track
527