1#
2# Copyright 2020 Ettus Research, a National Instruments Brand
3#
4# SPDX-License-Identifier: GPL-3.0-or-later
5#
6"""
7Provide a tone generator class for USRPs.
8"""
9
10import threading
11import numpy
12import uhd
13
class ToneGenerator:
    """
    Class that can output a tone from a different thread until told to stop.

    The sample buffer is created once in the constructor and is then sent
    over and over by a background thread between start() and stop().
    """
    def __init__(self, rate, freq, ampl, streamer=None):
        """
        Create the tone buffer; no thread is started here.

        Arguments:
        rate, freq, ampl -- forwarded unchanged to
                            uhd.dsp.signals.get_continuous_tone()
                            (presumably sample rate, tone frequency and
                            amplitude -- confirm against the UHD docs)
        streamer -- object providing send(buffer, metadata, timeout). May be
                    None here and supplied later via set_streamer().
        """
        self._streamer = streamer
        self._buffer = uhd.dsp.signals.get_continuous_tone(rate, freq, ampl)
        # Flag polled by the worker loop; clearing it makes the worker exit
        self._run = False
        # Background thread object, or None when no worker has been spawned
        self._thread = None

    def set_streamer(self, streamer):
        """
        Update streamer object

        If a worker thread exists, it is stopped and joined first so that it
        never sends on a streamer that is being replaced. The generator is
        not restarted automatically.
        """
        if self._thread is not None:
            self.stop()
        self._streamer = streamer

    def start(self):
        """
        Spawn the thread in the background

        Calling start() while the worker is already running is a no-op, so
        there is never more than one thread sending on the streamer.

        Raises RuntimeError if no streamer has been set.
        """
        if not self._streamer:
            raise RuntimeError("No streamer defined!")
        if self._thread is not None:
            if self._thread.is_alive():
                # Already transmitting; a second worker would interleave
                # sends on the same streamer and could never be joined.
                return
            # The previous worker exited on its own; reap it before
            # replacing the reference.
            self._thread.join()
        self._run = True
        # Name the thread at construction time (Thread.setName() is
        # deprecated) so the name is in place before the thread runs.
        self._thread = threading.Thread(target=self._worker, name="cal_tx")
        self._thread.start()

    def stop(self):
        """
        Stop the transmitter

        Clears the run flag and waits for the worker thread to finish. Safe
        to call when the generator was never started or was already stopped.
        """
        self._run = False
        if self._thread is None:
            return
        self._thread.join()
        self._thread = None

    def _worker(self):
        """
        Thread body: send the tone buffer repeatedly until the run flag is
        cleared, then send a final end-of-burst packet.
        """
        metadata = uhd.types.TXMetadata()
        while self._run:
            # Give it a long-ish timeout so we don't have to throttle in here
            if self._streamer.send(self._buffer, metadata, 1.0) != len(self._buffer):
                print("WARNING: Failed to transmit entire buffer in ToneGenerator!")
        # Send an EOB packet with a single zero-valued sample to close out TX
        metadata.end_of_burst = True
        self._streamer.send(
            numpy.array([0], dtype=numpy.complex64), metadata, 0.1)
62