1#!/usr/local/bin/python3.8 2# 3# Copyright 2017-2018 Ettus Research, a National Instruments Company 4# 5# SPDX-License-Identifier: GPL-3.0-or-later 6# 7""" 8Curses FFT example using Python API 9""" 10 11import argparse 12import curses as cs 13import numpy as np 14import uhd 15 16 17def parse_args(): 18 """Parse the command line arguments""" 19 parser = argparse.ArgumentParser() 20 parser.add_argument("-a", "--args", default="", type=str) 21 parser.add_argument("-f", "--freq", type=float, required=True) 22 parser.add_argument("-r", "--rate", default=1e6, type=float) 23 parser.add_argument("-g", "--gain", type=int, default=10) 24 parser.add_argument("-c", "--channel", type=int, default=0) 25 parser.add_argument("-n", "--nsamps", type=int, default=100000) 26 parser.add_argument("--dyn", type=int, default=60) 27 parser.add_argument("--ref", type=int, default=0) 28 return parser.parse_args() 29 30 31def psd(nfft, samples): 32 """Return the power spectral density of `samples`""" 33 window = np.hamming(nfft) 34 result = np.multiply(window, samples) 35 result = np.fft.fftshift(np.fft.fft(result, nfft)) 36 result = np.square(np.abs(result)) 37 result = np.nan_to_num(10.0 * np.log10(result)) 38 result = np.abs(result) 39 return result 40 41 42def clip(minval, maxval, value): 43 """Clip the value between a and b""" 44 return min(minval, max(maxval, value)) 45 46 47def main(): 48 """Create Curses display of FFT""" 49 args = parse_args() 50 usrp = uhd.usrp.MultiUSRP(args.args) 51 52 # Set the USRP rate, freq, and gain 53 usrp.set_rx_rate(args.rate, args.channel) 54 usrp.set_rx_freq(uhd.types.TuneRequest(args.freq), args.channel) 55 usrp.set_rx_gain(args.gain, args.channel) 56 57 # Initialize the curses screen 58 screen = cs.initscr() 59 cs.curs_set(0) 60 cs.noecho() 61 cs.cbreak() 62 screen.keypad(1) 63 height, width = screen.getmaxyx() 64 65 # Create a pad for the y-axis 66 y_axis_width = 10 67 y_axis = cs.newwin(height, y_axis_width, 0, 0) 68 69 # Create the buffer to recv samples 70 num_samps = max(args.nsamps, width) 71 samples = np.empty((1, num_samps), dtype=np.complex64) 72 73 st_args = uhd.usrp.StreamArgs("fc32", "sc16") 74 st_args.channels = [args.channel] 75 76 metadata = uhd.types.RXMetadata() 77 streamer = usrp.get_rx_stream(st_args) 78 buffer_samps = streamer.get_max_num_samps() 79 recv_buffer = np.zeros((1, buffer_samps), dtype=np.complex64) 80 81 stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont) 82 stream_cmd.stream_now = True 83 streamer.issue_stream_cmd(stream_cmd) 84 85 db_step = float(args.dyn) / (height - 1.0) 86 db_start = db_step * int((args.ref - args.dyn) / db_step) 87 db_stop = db_step * int(args.ref / db_step) 88 89 try: 90 while True: 91 # Resize the frequency plot on screen resize 92 screen.clear() 93 if cs.is_term_resized(height, width): 94 height, width = screen.getmaxyx() 95 cs.resizeterm(height, width) 96 97 db_step = float(args.dyn) / (height - 1.0) 98 db_start = db_step * int((args.ref - args.dyn) / db_step) 99 db_stop = db_step * int(args.ref / db_step) 100 101 y_axis.clear() 102 103 # Create the vertical (dBfs) axis 104 y_axis.addstr(0, 1, "{:> 6.2f} |-".format(db_stop)) 105 for i in range(1, height - 1): 106 label = db_stop - db_step * i 107 y_axis.addstr(i, 1, "{:> 6.2f} |-".format(label)) 108 try: 109 y_axis.addstr(height - 1, 1, "{:> 6.2f} |-".format(db_start)) 110 except cs.error: 111 pass 112 y_axis.refresh() 113 114 # Receive the samples 115 recv_samps = 0 116 while recv_samps < num_samps: 117 samps = streamer.recv(recv_buffer, metadata) 118 119 if metadata.error_code != uhd.types.RXMetadataErrorCode.none: 120 print(metadata.strerror()) 121 if samps: 122 real_samps = min(num_samps - recv_samps, samps) 123 samples[:, recv_samps:recv_samps + real_samps] = recv_buffer[:, 0:real_samps] 124 recv_samps += real_samps 125 126 # Get the power in each bin 127 bins = psd(width, samples[args.channel][0:width]) 128 129 for i in range(y_axis_width, width): 130 vertical_slot = clip(height, 0, np.int(bins[i] / db_step)) 131 try: 132 for j in range(vertical_slot, height): 133 screen.addch(j, i, '*') 134 except cs.error: 135 pass 136 screen.refresh() 137 138 except KeyboardInterrupt: 139 pass 140 141 stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont) 142 streamer.issue_stream_cmd(stream_cmd) 143 144 cs.curs_set(1) 145 cs.nocbreak() 146 screen.keypad(0) 147 cs.echo() 148 cs.endwin() 149 150 151if __name__ == "__main__": 152 main() 153