#
# Copyright 2020 Ettus Research, a National Instruments Brand
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
"""
Provide a tone generator class for USRPs.
"""

import threading
import numpy
import uhd

class ToneGenerator:
    """
    Class that can output a tone from a different thread until told to stop.

    The sample buffer is produced once, at construction time, by
    uhd.dsp.signals.get_continuous_tone() and is then sent repeatedly through
    the configured streamer by a background worker thread.
    """
    def __init__(self, rate, freq, ampl, streamer=None):
        """
        Create the tone buffer and remember the (optional) streamer.

        Arguments:
        rate -- forwarded to uhd.dsp.signals.get_continuous_tone()
                (presumably the sampling rate -- confirm against the UHD docs)
        freq -- forwarded to get_continuous_tone() (presumably tone frequency)
        ampl -- forwarded to get_continuous_tone() (presumably tone amplitude)
        streamer -- object providing send(buffer, metadata, timeout); may be
                    None and supplied later through set_streamer()
        """
        self._streamer = streamer
        self._buffer = uhd.dsp.signals.get_continuous_tone(rate, freq, ampl)
        # Flag polled by the worker loop; clearing it asks the worker to exit
        self._run = False
        # Handle of the background worker, None while no worker exists
        self._thread = None

    def set_streamer(self, streamer):
        """
        Update streamer object

        If a transmission is currently running it is stopped first, so the
        worker never sees the streamer change underneath it. The transmission
        is not restarted automatically.
        """
        if self._run:
            self.stop()
        self._streamer = streamer

    def start(self):
        """
        Spawn the thread in the background

        Calling start() while a worker is already alive is a no-op: spawning a
        second worker would transmit concurrently on the same streamer and
        lose the handle required to join the first one.

        Raises RuntimeError if no streamer has been configured.
        """
        if not self._streamer:
            raise RuntimeError("No streamer defined!")
        if self._thread is not None and self._thread.is_alive():
            return
        self._run = True
        # Name the thread at construction time; Thread.setName() is deprecated
        # since Python 3.10, and this way the name is in place before the
        # worker starts running.
        self._thread = threading.Thread(target=self._worker, name="cal_tx")
        self._thread.start()

    def stop(self):
        """
        Stop the transmitter

        Clears the run flag and waits for the worker thread to finish. Safe to
        call when the generator was never started or was already stopped.
        """
        self._run = False
        worker = self._thread
        if worker is not None:
            worker.join()
            self._thread = None

    def _worker(self):
        """ Here is where the action happens """
        metadata = uhd.types.TXMetadata()
        # The buffer is never modified after construction, so its length is
        # loop-invariant.
        expected = len(self._buffer)
        while self._run:
            # Give it a long-ish timeout so we don't have to throttle in here
            if self._streamer.send(self._buffer, metadata, 1.0) != expected:
                print("WARNING: Failed to transmit entire buffer in ToneGenerator!")
        # Send an EOB packet with a single zero-valued sample to close out TX
        metadata.end_of_burst = True
        self._streamer.send(
            numpy.array([0], dtype=numpy.complex64), metadata, 0.1)