1# 2# Copyright 2020 Ettus Research, a National Instruments Brand 3# 4# SPDX-License-Identifier: GPL-3.0-or-later 5# 6""" 7Measurement Device Class for UHD Power Calibration 8""" 9 10import sys 11import time 12import inspect 13import importlib 14import numpy 15import uhd 16from .tone_gen import ToneGenerator 17from .visa import get_visa_device 18from .ni_rf_instr import RFSADevice, RFSGDevice 19 20############################################################################### 21# Base Classes 22############################################################################### 23class PowerMeterBase: 24 """ 25 Base class for measuring output power (Tx) of the USRP. That means the 26 measurement device is receiving and the USRP (the DUT) is transmitting. 27 """ 28 def __init__(self, options): 29 self._options = options 30 self.power_offset = 0 31 32 def set_frequency(self, freq): 33 """ 34 Set the frequency of the measurement device. 35 """ 36 raise NotImplementedError() 37 38 def get_power(self): 39 """ 40 Return the current measured power in dBm. 41 """ 42 return self._get_power() + self.power_offset 43 44 def _get_power(self): 45 """ 46 Return the current measured power in dBm. 47 """ 48 raise NotImplementedError() 49 50class SignalGeneratorBase: 51 """ 52 Base class for measuring input power (Rx) of the USRP. That means the 53 measurement device is transmitting and the USRP (the DUT) is receiving. 54 """ 55 def __init__(self, options): 56 self._options = options 57 self.power_offset = 0 58 # Make sure to set this before doing RX cal 59 self.max_output_power = None 60 61 def enable(self, enable=True): 62 """ 63 Turn on the power generator. By default, it should be off, and only 64 produce a signal when this was called with an argument value of 'True'. 65 """ 66 raise NotImplementedError() 67 68 def set_power(self, power_dbm): 69 """ 70 Set the input power of the DUT. This will factor in the power offset, 71 and set the measurement device to produce the power that will cause the 72 DUT to receive power_dbm. 73 74 This will coerce to the next possible power available and return the 75 coerced value. 76 """ 77 assert self.max_output_power is not None 78 if power_dbm > self.max_output_power: 79 print("[SigGen] WARNING! Trying to set power beyond safe levels. " 80 "Capping output power at {} dBm.".format(self.max_output_power)) 81 power_dbm = self.max_output_power 82 return self._set_power(power_dbm + self.power_offset) - self.power_offset 83 84 def get_power(self): 85 """ 86 Return the input power of the DUT. This will factor in the power offset, 87 and will return the power level in dBm that is going into the DUT. 88 Use this with set_power(), as not all power levels can be reached. 89 """ 90 return self._get_power() - self.power_offset 91 92 def set_frequency(self, freq): 93 """ 94 Set the center frequency of the generated signal. 95 """ 96 raise NotImplementedError() 97 98 def _set_power(self, power_dbm): 99 """ 100 Set the output power of the device in dBm. 101 """ 102 raise NotImplementedError() 103 104 def _get_power(self): 105 """ 106 Return the output power of the measurement device. 107 """ 108 raise NotImplementedError() 109 110############################################################################### 111# Manual Measurement: For masochists, or for small sample sets 112############################################################################### 113class ManualPowerMeter(PowerMeterBase): 114 """ 115 Manual measurement: The script does nothing, it just asks the user to 116 manually make changes and return values 117 """ 118 key = 'manual' 119 120 def set_frequency(self, freq): 121 """ 122 Ask user to set frequency 123 """ 124 input("[TX] Set your power meter to following frequency: " 125 "{:.3f} MHz, then hit Enter.".format(freq/1e6)) 126 127 def _get_power(self): 128 """ 129 Ask user for the power 130 """ 131 num_tries = 5 132 for _ in range(num_tries): 133 try: 134 return float(input("[TX] Please enter the measured power in dBm: ")) \ 135 + self.power_offset 136 except ValueError: 137 continue 138 raise ValueError("Invalid power value entered.") 139 140class ManualPowerGenerator(SignalGeneratorBase): 141 """ 142 Manual measurement: The script does nothing, it just asks the user to 143 manually make changes and return values 144 """ 145 key = 'manual' 146 num_tries = 5 147 148 def enable(self, enable=True): 149 """ 150 Ask the user to turn the device on or off 151 """ 152 input("[RX] Please {} your signal generator and hit Enter." 153 .format("enable" if enable else "disable")) 154 155 def _set_power(self, power_dbm): 156 """ 157 Ask for a power, or the closest, and return that 158 """ 159 new_power = input( 160 "[RX] Set your signal generator to following output power: " 161 "{:.1f} dBm, then hit Enter, or enter the closest available power: " 162 .format(power_dbm)) 163 if not new_power: 164 return power_dbm 165 for _ in range(self.num_tries): 166 try: 167 return float(new_power) 168 except ValueError: 169 new_power = input( 170 "[RX] Set your signal generator to following output power: " 171 "{:.1f} dBm, then hit Enter, or enter the closest available power: " 172 .format(power_dbm)) 173 if not new_power: 174 return power_dbm 175 raise ValueError("Invalid power value entered.") 176 177 def _get_power(self): 178 """ 179 Ask user for current power 180 """ 181 for _ in range(self.num_tries): 182 try: 183 return float(input( 184 "[RX] Please enter the output power in dBm of your " 185 "signal generator: ")) 186 except ValueError: 187 continue 188 raise ValueError("Invalid power value entered.") 189 190 # pylint: disable=no-self-use 191 def set_frequency(self, freq): 192 """ 193 Set the center frequency of the generated signal. 194 """ 195 input("[RX] Set your signal generator to following frequency: {:.3f} MHz, then hit Enter." 196 .format(freq/1e6)) 197 # pylint: enable=no-self-use 198 199############################################################################## 200# RFSA: Run through a NI-RFSA device, using RFmx library 201############################################################################### 202class RfsaPowerMeter(PowerMeterBase): 203 """ 204 Power meter using RFmx TXP measurement on NI-RFSA devices. 205 """ 206 key = 'rfsa' 207 208 def __init__(self, options): 209 super().__init__(options) 210 self.device = RFSADevice(options) 211 212 def set_frequency(self, freq): 213 """ 214 Set the frequency of the measurement device. 215 """ 216 self.device.set_frequency(freq) 217 218 def _get_power(self): 219 """ 220 Return the current measured power in dBm. 221 """ 222 return self.device.get_power_dbm() 223 224############################################################################## 225# VISA: Run through a VISA device, using SCPI commands 226############################################################################### 227class VisaPowerMeter(PowerMeterBase): 228 """ 229 VISA based Tx measurement device 230 """ 231 DEFAULT_VISA_LIB = '@py' # pyvisa-py 232 DEFAULT_VISA_QUERY = "?*::INSTR" 233 234 key = 'visa' 235 236 def __init__(self, options): 237 super().__init__(options) 238 # pylint: disable=import-outside-toplevel 239 # We disable this warning because having pyvisa installed is not a 240 # requirement, so we want to load it as late as possible, and only when 241 # needed. 242 import pyvisa 243 # pylint: enable=import-outside-toplevel 244 visa_lib = options.get('visa_lib', self.DEFAULT_VISA_LIB) 245 visa_query = options.get('visa_query', self.DEFAULT_VISA_QUERY) 246 self._rm = pyvisa.ResourceManager(visa_lib) 247 resources = self._rm.list_resources(visa_query) 248 if len(resources) > 1: 249 print("Found VISA devices:") 250 for resource in resources: 251 print("*" + resource) 252 raise RuntimeError( 253 "Found more than one measurement device. Please limit the query!") 254 if len(resources) == 0: 255 raise RuntimeError("No measurement device found!") 256 self._res = self._rm.open_resource(resources[0]) 257 self.visa = get_visa_device(self._res, resources[0], options) 258 self.visa.init_power_meter() 259 260 def set_frequency(self, freq): 261 """ 262 Set frequency 263 """ 264 self.visa.set_frequency(freq) 265 266 def _get_power(self): 267 """ 268 Get power 269 """ 270 return self.visa.get_power_dbm() 271 272############################################################################### 273# USRP: Use a pre-calibrated USRP as a measurement device 274############################################################################### 275class USRPPowerGenerator(SignalGeneratorBase): 276 """ 277 The power generator is actually a USRP. This only works if the USRP that is 278 used for power/signal generation has been previously calbrated itself. 279 """ 280 key = 'usrp' 281 282 def __init__(self, options): 283 super().__init__(options) 284 usrp_args = options.get('args') 285 if not usrp_args: 286 raise RuntimeError( 287 "Must specify args for USRP measurement device!") 288 self._usrp = uhd.usrp.MultiUSRP(usrp_args) 289 self._rate = float(options.get('rate', 5e6)) 290 self._lo_offset = float(options.get('lo_offset', 0)) 291 self._chan = int(options.get('chan', 0)) 292 self._amplitude = float(options.get('ampl', 1/numpy.sqrt(2))) 293 self._pwr_dbfs = 20 * numpy.log10(self._amplitude) 294 self._tone_freq = 0 295 stream_args = uhd.usrp.StreamArgs('fc32', 'sc16') 296 stream_args.channels = [self._chan] 297 self._streamer = self._usrp.get_tx_stream(stream_args) 298 print("==== Creating USRP tone generator. Power offset:", self.power_offset) 299 self._tone_gen = ToneGenerator(self._rate, self._tone_freq, self._amplitude) 300 self._tone_gen.set_streamer(self._streamer) 301 302 def enable(self, enable=True): 303 """ 304 Turn the tone generator on or off. 305 """ 306 if enable: 307 print("[SigGen] Starting tone generator.") 308 self._tone_gen.start() 309 else: 310 print("[SigGen] Stopping tone generator.") 311 self._tone_gen.stop() 312 time.sleep(0.1) # Give it some time to spin down 313 314 def set_frequency(self, freq): 315 """ 316 Set the center frequency of the generated signal. 317 """ 318 print("[SigGen] Channel {}: Tuning signal to {:.3f} MHz." 319 .format(self._chan, freq/1e6)) 320 tune_req = uhd.types.TuneRequest(freq, self._lo_offset) 321 self._usrp.set_tx_freq(tune_req, self._chan) 322 323 def _set_power(self, power_dbm): 324 """ 325 Set the output power of the device in dBm. 326 """ 327 self._usrp.set_tx_power_reference(power_dbm - self._pwr_dbfs, self._chan) 328 return self._get_power() 329 330 def _get_power(self): 331 """ 332 Return the output power of the measurement device. 333 """ 334 return self._usrp.get_tx_power_reference(self._chan) + self._pwr_dbfs 335 336############################################################################### 337# RFSG: NI signal generator family 338############################################################################### 339class RFSGPowerGenerator(SignalGeneratorBase): 340 """ 341 Power Generator using NI-RFSG devices. 342 """ 343 key = 'rfsg' 344 345 def __init__(self, options): 346 super().__init__(options) 347 self.device = RFSGDevice(options) 348 349 def enable(self, enable=True): 350 """ 351 Turn tone generation on and off 352 """ 353 self.device.enable(enable) 354 355 def set_frequency(self, freq): 356 """ 357 Set the center frequency of the generated signal. 358 """ 359 self.device.set_frequency(freq) 360 361 def _set_power(self, power_dbm): 362 """ 363 Set the output power of the device in dBm. 364 """ 365 return self.device.set_power(power_dbm) 366 367 def _get_power(self): 368 """ 369 Get the output power of the device in dBm. 370 """ 371 return self.device.get_power() 372 373############################################################################### 374# The dispatch function 375############################################################################### 376def get_meas_device(direction, dev_key, options): 377 """ 378 Return the measurement device object 379 """ 380 assert direction in ('tx', 'rx') 381 base_class = SignalGeneratorBase if direction == 'rx' else PowerMeterBase 382 opt_dict = { 383 k[0]: k[1] if len(k) > 1 else None for k in [x.split("=", 1) for x in options] 384 } 385 members = inspect.getmembers(sys.modules[__name__]) 386 if 'import' in opt_dict: 387 try: 388 print("Loading external module: {}".format(opt_dict.get('import'))) 389 external_module = importlib.import_module(opt_dict.get('import')) 390 members += inspect.getmembers(external_module) 391 except (ModuleNotFoundError, ImportError): 392 print("WARNING: Could not import module '{}'" 393 .format(opt_dict.get('import'))) 394 for _, obj in members: 395 try: 396 if issubclass(obj, base_class) and dev_key == getattr(obj, 'key', ''): 397 return obj(opt_dict) 398 except TypeError: 399 continue 400 raise RuntimeError("No {} found for key: {}".format( 401 "signal generator" if direction == "rx" else "power meter", dev_key)) 402