1#!/usr/bin/env python3
2"""Creating an asyncio generator for blocks of audio data.
3
4This example shows how a generator can be used to analyze audio input blocks.
5In addition, it shows how a generator can be created that yields not only input
6blocks but also output blocks where audio data can be written to.
7
8You need Python 3.7 or newer to run this.
9
10"""
11import asyncio
12import queue
13import sys
14
15import numpy as np
16import sounddevice as sd
17
18
19async def inputstream_generator(channels=1, **kwargs):
20    """Generator that yields blocks of input data as NumPy arrays."""
21    q_in = asyncio.Queue()
22    loop = asyncio.get_event_loop()
23
24    def callback(indata, frame_count, time_info, status):
25        loop.call_soon_threadsafe(q_in.put_nowait, (indata.copy(), status))
26
27    stream = sd.InputStream(callback=callback, channels=channels, **kwargs)
28    with stream:
29        while True:
30            indata, status = await q_in.get()
31            yield indata, status
32
33
34async def stream_generator(blocksize, *, channels=1, dtype='float32',
35                           pre_fill_blocks=10, **kwargs):
36    """Generator that yields blocks of input/output data as NumPy arrays.
37
38    The output blocks are uninitialized and have to be filled with
39    appropriate audio signals.
40
41    """
42    assert blocksize != 0
43    q_in = asyncio.Queue()
44    q_out = queue.Queue()
45    loop = asyncio.get_event_loop()
46
47    def callback(indata, outdata, frame_count, time_info, status):
48        loop.call_soon_threadsafe(q_in.put_nowait, (indata.copy(), status))
49        outdata[:] = q_out.get_nowait()
50
51    # pre-fill output queue
52    for _ in range(pre_fill_blocks):
53        q_out.put(np.zeros((blocksize, channels), dtype=dtype))
54
55    stream = sd.Stream(blocksize=blocksize, callback=callback, dtype=dtype,
56                       channels=channels, **kwargs)
57    with stream:
58        while True:
59            indata, status = await q_in.get()
60            outdata = np.empty((blocksize, channels), dtype=dtype)
61            yield indata, outdata, status
62            q_out.put_nowait(outdata)
63
64
65async def print_input_infos(**kwargs):
66    """Show minimum and maximum value of each incoming audio block."""
67    async for indata, status in inputstream_generator(**kwargs):
68        if status:
69            print(status)
70        print('min:', indata.min(), '\t', 'max:', indata.max())
71
72
73async def wire_coro(**kwargs):
74    """Create a connection between audio inputs and outputs.
75
76    Asynchronously iterates over a stream generator and for each block
77    simply copies the input data into the output block.
78
79    """
80    async for indata, outdata, status in stream_generator(**kwargs):
81        if status:
82            print(status)
83        outdata[:] = indata
84
85
86async def main(**kwargs):
87    print('Some informations about the input signal:')
88    try:
89        await asyncio.wait_for(print_input_infos(), timeout=2)
90    except asyncio.TimeoutError:
91        pass
92    print('\nEnough of that, activating wire ...\n')
93    audio_task = asyncio.create_task(wire_coro(**kwargs))
94    for i in range(10, 0, -1):
95        print(i)
96        await asyncio.sleep(1)
97    audio_task.cancel()
98    try:
99        await audio_task
100    except asyncio.CancelledError:
101        print('\nwire was cancelled')
102
103
104if __name__ == "__main__":
105    try:
106        asyncio.run(main(blocksize=1024))
107    except KeyboardInterrupt:
108        sys.exit('\nInterrupted by user')
109