1#!/usr/bin/env python3 2"""Creating an asyncio generator for blocks of audio data. 3 4This example shows how a generator can be used to analyze audio input blocks. 5In addition, it shows how a generator can be created that yields not only input 6blocks but also output blocks where audio data can be written to. 7 8You need Python 3.7 or newer to run this. 9 10""" 11import asyncio 12import queue 13import sys 14 15import numpy as np 16import sounddevice as sd 17 18 19async def inputstream_generator(channels=1, **kwargs): 20 """Generator that yields blocks of input data as NumPy arrays.""" 21 q_in = asyncio.Queue() 22 loop = asyncio.get_event_loop() 23 24 def callback(indata, frame_count, time_info, status): 25 loop.call_soon_threadsafe(q_in.put_nowait, (indata.copy(), status)) 26 27 stream = sd.InputStream(callback=callback, channels=channels, **kwargs) 28 with stream: 29 while True: 30 indata, status = await q_in.get() 31 yield indata, status 32 33 34async def stream_generator(blocksize, *, channels=1, dtype='float32', 35 pre_fill_blocks=10, **kwargs): 36 """Generator that yields blocks of input/output data as NumPy arrays. 37 38 The output blocks are uninitialized and have to be filled with 39 appropriate audio signals. 40 41 """ 42 assert blocksize != 0 43 q_in = asyncio.Queue() 44 q_out = queue.Queue() 45 loop = asyncio.get_event_loop() 46 47 def callback(indata, outdata, frame_count, time_info, status): 48 loop.call_soon_threadsafe(q_in.put_nowait, (indata.copy(), status)) 49 outdata[:] = q_out.get_nowait() 50 51 # pre-fill output queue 52 for _ in range(pre_fill_blocks): 53 q_out.put(np.zeros((blocksize, channels), dtype=dtype)) 54 55 stream = sd.Stream(blocksize=blocksize, callback=callback, dtype=dtype, 56 channels=channels, **kwargs) 57 with stream: 58 while True: 59 indata, status = await q_in.get() 60 outdata = np.empty((blocksize, channels), dtype=dtype) 61 yield indata, outdata, status 62 q_out.put_nowait(outdata) 63 64 65async def print_input_infos(**kwargs): 66 """Show minimum and maximum value of each incoming audio block.""" 67 async for indata, status in inputstream_generator(**kwargs): 68 if status: 69 print(status) 70 print('min:', indata.min(), '\t', 'max:', indata.max()) 71 72 73async def wire_coro(**kwargs): 74 """Create a connection between audio inputs and outputs. 75 76 Asynchronously iterates over a stream generator and for each block 77 simply copies the input data into the output block. 78 79 """ 80 async for indata, outdata, status in stream_generator(**kwargs): 81 if status: 82 print(status) 83 outdata[:] = indata 84 85 86async def main(**kwargs): 87 print('Some informations about the input signal:') 88 try: 89 await asyncio.wait_for(print_input_infos(), timeout=2) 90 except asyncio.TimeoutError: 91 pass 92 print('\nEnough of that, activating wire ...\n') 93 audio_task = asyncio.create_task(wire_coro(**kwargs)) 94 for i in range(10, 0, -1): 95 print(i) 96 await asyncio.sleep(1) 97 audio_task.cancel() 98 try: 99 await audio_task 100 except asyncio.CancelledError: 101 print('\nwire was cancelled') 102 103 104if __name__ == "__main__": 105 try: 106 asyncio.run(main(blocksize=1024)) 107 except KeyboardInterrupt: 108 sys.exit('\nInterrupted by user') 109