1#!/usr/local/bin/python3.8
2#
3# Copyright 2017-2018 Ettus Research, a National Instruments Company
4#
5# SPDX-License-Identifier: GPL-3.0-or-later
6#
7"""
8Curses FFT example using Python API
9"""
10
11import argparse
12import curses as cs
13import numpy as np
14import uhd
15
16
17def parse_args():
18    """Parse the command line arguments"""
19    parser = argparse.ArgumentParser()
20    parser.add_argument("-a", "--args", default="", type=str)
21    parser.add_argument("-f", "--freq", type=float, required=True)
22    parser.add_argument("-r", "--rate", default=1e6, type=float)
23    parser.add_argument("-g", "--gain", type=int, default=10)
24    parser.add_argument("-c", "--channel", type=int, default=0)
25    parser.add_argument("-n", "--nsamps", type=int, default=100000)
26    parser.add_argument("--dyn", type=int, default=60)
27    parser.add_argument("--ref", type=int, default=0)
28    return parser.parse_args()
29
30
31def psd(nfft, samples):
32    """Return the power spectral density of `samples`"""
33    window = np.hamming(nfft)
34    result = np.multiply(window, samples)
35    result = np.fft.fftshift(np.fft.fft(result, nfft))
36    result = np.square(np.abs(result))
37    result = np.nan_to_num(10.0 * np.log10(result))
38    result = np.abs(result)
39    return result
40
41
42def clip(minval, maxval, value):
43    """Clip the value between a and b"""
44    return min(minval, max(maxval, value))
45
46
47def main():
48    """Create Curses display of FFT"""
49    args = parse_args()
50    usrp = uhd.usrp.MultiUSRP(args.args)
51
52    # Set the USRP rate, freq, and gain
53    usrp.set_rx_rate(args.rate, args.channel)
54    usrp.set_rx_freq(uhd.types.TuneRequest(args.freq), args.channel)
55    usrp.set_rx_gain(args.gain, args.channel)
56
57    # Initialize the curses screen
58    screen = cs.initscr()
59    cs.curs_set(0)
60    cs.noecho()
61    cs.cbreak()
62    screen.keypad(1)
63    height, width = screen.getmaxyx()
64
65    # Create a pad for the y-axis
66    y_axis_width = 10
67    y_axis = cs.newwin(height, y_axis_width, 0, 0)
68
69    # Create the buffer to recv samples
70    num_samps = max(args.nsamps, width)
71    samples = np.empty((1, num_samps), dtype=np.complex64)
72
73    st_args = uhd.usrp.StreamArgs("fc32", "sc16")
74    st_args.channels = [args.channel]
75
76    metadata = uhd.types.RXMetadata()
77    streamer = usrp.get_rx_stream(st_args)
78    buffer_samps = streamer.get_max_num_samps()
79    recv_buffer = np.zeros((1, buffer_samps), dtype=np.complex64)
80
81    stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont)
82    stream_cmd.stream_now = True
83    streamer.issue_stream_cmd(stream_cmd)
84
85    db_step = float(args.dyn) / (height - 1.0)
86    db_start = db_step * int((args.ref - args.dyn) / db_step)
87    db_stop = db_step * int(args.ref / db_step)
88
89    try:
90        while True:
91            # Resize the frequency plot on screen resize
92            screen.clear()
93            if cs.is_term_resized(height, width):
94                height, width = screen.getmaxyx()
95                cs.resizeterm(height, width)
96
97                db_step = float(args.dyn) / (height - 1.0)
98                db_start = db_step * int((args.ref - args.dyn) / db_step)
99                db_stop = db_step * int(args.ref / db_step)
100
101                y_axis.clear()
102
103            # Create the vertical (dBfs) axis
104            y_axis.addstr(0, 1, "{:> 6.2f} |-".format(db_stop))
105            for i in range(1, height - 1):
106                label = db_stop - db_step * i
107                y_axis.addstr(i, 1, "{:> 6.2f} |-".format(label))
108            try:
109                y_axis.addstr(height - 1, 1, "{:> 6.2f} |-".format(db_start))
110            except cs.error:
111                pass
112            y_axis.refresh()
113
114            # Receive the samples
115            recv_samps = 0
116            while recv_samps < num_samps:
117                samps = streamer.recv(recv_buffer, metadata)
118
119                if metadata.error_code != uhd.types.RXMetadataErrorCode.none:
120                    print(metadata.strerror())
121                if samps:
122                    real_samps = min(num_samps - recv_samps, samps)
123                    samples[:, recv_samps:recv_samps + real_samps] = recv_buffer[:, 0:real_samps]
124                    recv_samps += real_samps
125
126            # Get the power in each bin
127            bins = psd(width, samples[args.channel][0:width])
128
129            for i in range(y_axis_width, width):
130                vertical_slot = clip(height, 0, np.int(bins[i] / db_step))
131                try:
132                    for j in range(vertical_slot, height):
133                        screen.addch(j, i, '*')
134                except cs.error:
135                    pass
136            screen.refresh()
137
138    except KeyboardInterrupt:
139        pass
140
141    stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont)
142    streamer.issue_stream_cmd(stream_cmd)
143
144    cs.curs_set(1)
145    cs.nocbreak()
146    screen.keypad(0)
147    cs.echo()
148    cs.endwin()
149
150
151if __name__ == "__main__":
152    main()
153