1#
2# Copyright 2020 Ettus Research, a National Instruments Brand
3#
4# SPDX-License-Identifier: GPL-3.0-or-later
5#
6"""
7Measurement Device Class for UHD Power Calibration
8"""
9
10import sys
11import time
12import inspect
13import importlib
14import numpy
15import uhd
16from .tone_gen import ToneGenerator
17from .visa import get_visa_device
18from .ni_rf_instr import RFSADevice, RFSGDevice
19
20###############################################################################
21# Base Classes
22###############################################################################
23class PowerMeterBase:
24    """
25    Base class for measuring output power (Tx) of the USRP. That means the
26    measurement device is receiving and the USRP (the DUT) is transmitting.
27    """
28    def __init__(self, options):
29        self._options = options
30        self.power_offset = 0
31
32    def set_frequency(self, freq):
33        """
34        Set the frequency of the measurement device.
35        """
36        raise NotImplementedError()
37
38    def get_power(self):
39        """
40        Return the current measured power in dBm.
41        """
42        return self._get_power() + self.power_offset
43
44    def _get_power(self):
45        """
46        Return the current measured power in dBm.
47        """
48        raise NotImplementedError()
49
50class SignalGeneratorBase:
51    """
52    Base class for measuring input power (Rx) of the USRP. That means the
53    measurement device is transmitting and the USRP (the DUT) is receiving.
54    """
55    def __init__(self, options):
56        self._options = options
57        self.power_offset = 0
58        # Make sure to set this before doing RX cal
59        self.max_output_power = None
60
61    def enable(self, enable=True):
62        """
63        Turn on the power generator. By default, it should be off, and only
64        produce a signal when this was called with an argument value of 'True'.
65        """
66        raise NotImplementedError()
67
68    def set_power(self, power_dbm):
69        """
70        Set the input power of the DUT. This will factor in the power offset,
71        and set the measurement device to produce the power that will cause the
72        DUT to receive power_dbm.
73
74        This will coerce to the next possible power available and return the
75        coerced value.
76        """
77        assert self.max_output_power is not None
78        if power_dbm > self.max_output_power:
79            print("[SigGen] WARNING! Trying to set power beyond safe levels. "
80                  "Capping output power at {} dBm.".format(self.max_output_power))
81            power_dbm = self.max_output_power
82        return self._set_power(power_dbm + self.power_offset) - self.power_offset
83
84    def get_power(self):
85        """
86        Return the input power of the DUT.  This will factor in the power offset,
87        and will return the power level in dBm that is going into the DUT.
88        Use this with set_power(), as not all power levels can be reached.
89        """
90        return self._get_power() - self.power_offset
91
92    def set_frequency(self, freq):
93        """
94        Set the center frequency of the generated signal.
95        """
96        raise NotImplementedError()
97
98    def _set_power(self, power_dbm):
99        """
100        Set the output power of the device in dBm.
101        """
102        raise NotImplementedError()
103
104    def _get_power(self):
105        """
106        Return the output power of the measurement device.
107        """
108        raise NotImplementedError()
109
110###############################################################################
111# Manual Measurement: For masochists, or for small sample sets
112###############################################################################
113class ManualPowerMeter(PowerMeterBase):
114    """
115    Manual measurement: The script does nothing, it just asks the user to
116    manually make changes and return values
117    """
118    key = 'manual'
119
120    def set_frequency(self, freq):
121        """
122        Ask user to set frequency
123        """
124        input("[TX] Set your power meter to following frequency: "
125              "{:.3f} MHz, then hit Enter.".format(freq/1e6))
126
127    def _get_power(self):
128        """
129        Ask user for the power
130        """
131        num_tries = 5
132        for _ in range(num_tries):
133            try:
134                return float(input("[TX] Please enter the measured power in dBm: ")) \
135                       + self.power_offset
136            except ValueError:
137                continue
138        raise ValueError("Invalid power value entered.")
139
140class ManualPowerGenerator(SignalGeneratorBase):
141    """
142    Manual measurement: The script does nothing, it just asks the user to
143    manually make changes and return values
144    """
145    key = 'manual'
146    num_tries = 5
147
148    def enable(self, enable=True):
149        """
150        Ask the user to turn the device on or off
151        """
152        input("[RX] Please {} your signal generator and hit Enter."
153              .format("enable" if enable else "disable"))
154
155    def _set_power(self, power_dbm):
156        """
157        Ask for a power, or the closest, and return that
158        """
159        new_power = input(
160            "[RX] Set your signal generator to following output power: "
161            "{:.1f} dBm, then hit Enter, or enter the closest available power: "
162            .format(power_dbm))
163        if not new_power:
164            return power_dbm
165        for _ in range(self.num_tries):
166            try:
167                return float(new_power)
168            except ValueError:
169                new_power = input(
170                    "[RX] Set your signal generator to following output power: "
171                    "{:.1f} dBm, then hit Enter, or enter the closest available power: "
172                    .format(power_dbm))
173                if not new_power:
174                    return power_dbm
175        raise ValueError("Invalid power value entered.")
176
177    def _get_power(self):
178        """
179        Ask user for current power
180        """
181        for _ in range(self.num_tries):
182            try:
183                return float(input(
184                    "[RX] Please enter the output power in dBm of your "
185                    "signal generator: "))
186            except ValueError:
187                continue
188        raise ValueError("Invalid power value entered.")
189
190    # pylint: disable=no-self-use
191    def set_frequency(self, freq):
192        """
193        Set the center frequency of the generated signal.
194        """
195        input("[RX] Set your signal generator to following frequency: {:.3f} MHz, then hit Enter."
196              .format(freq/1e6))
197    # pylint: enable=no-self-use
198
199##############################################################################
200# RFSA: Run through a NI-RFSA device, using RFmx library
201###############################################################################
202class RfsaPowerMeter(PowerMeterBase):
203    """
204    Power meter using RFmx TXP measurement on NI-RFSA devices.
205    """
206    key = 'rfsa'
207
208    def __init__(self, options):
209        super().__init__(options)
210        self.device = RFSADevice(options)
211
212    def set_frequency(self, freq):
213        """
214        Set the frequency of the measurement device.
215        """
216        self.device.set_frequency(freq)
217
218    def _get_power(self):
219        """
220        Return the current measured power in dBm.
221        """
222        return self.device.get_power_dbm()
223
224##############################################################################
225# VISA: Run through a VISA device, using SCPI commands
226###############################################################################
227class VisaPowerMeter(PowerMeterBase):
228    """
229    VISA based Tx measurement device
230    """
231    DEFAULT_VISA_LIB = '@py' # pyvisa-py
232    DEFAULT_VISA_QUERY = "?*::INSTR"
233
234    key = 'visa'
235
236    def __init__(self, options):
237        super().__init__(options)
238        # pylint: disable=import-outside-toplevel
239        # We disable this warning because having pyvisa installed is not a
240        # requirement, so we want to load it as late as possible, and only when
241        # needed.
242        import pyvisa
243        # pylint: enable=import-outside-toplevel
244        visa_lib = options.get('visa_lib', self.DEFAULT_VISA_LIB)
245        visa_query = options.get('visa_query', self.DEFAULT_VISA_QUERY)
246        self._rm = pyvisa.ResourceManager(visa_lib)
247        resources = self._rm.list_resources(visa_query)
248        if len(resources) > 1:
249            print("Found VISA devices:")
250            for resource in resources:
251                print("*" + resource)
252            raise RuntimeError(
253                "Found more than one measurement device. Please limit the query!")
254        if len(resources) == 0:
255            raise RuntimeError("No measurement device found!")
256        self._res = self._rm.open_resource(resources[0])
257        self.visa = get_visa_device(self._res, resources[0], options)
258        self.visa.init_power_meter()
259
260    def set_frequency(self, freq):
261        """
262        Set frequency
263        """
264        self.visa.set_frequency(freq)
265
266    def _get_power(self):
267        """
268        Get power
269        """
270        return self.visa.get_power_dbm()
271
272###############################################################################
273# USRP: Use a pre-calibrated USRP as a measurement device
274###############################################################################
275class USRPPowerGenerator(SignalGeneratorBase):
276    """
277    The power generator is actually a USRP. This only works if the USRP that is
278    used for power/signal generation has been previously calbrated itself.
279    """
280    key = 'usrp'
281
282    def __init__(self, options):
283        super().__init__(options)
284        usrp_args = options.get('args')
285        if not usrp_args:
286            raise RuntimeError(
287                "Must specify args for USRP measurement device!")
288        self._usrp = uhd.usrp.MultiUSRP(usrp_args)
289        self._rate = float(options.get('rate', 5e6))
290        self._lo_offset = float(options.get('lo_offset', 0))
291        self._chan = int(options.get('chan', 0))
292        self._amplitude = float(options.get('ampl', 1/numpy.sqrt(2)))
293        self._pwr_dbfs = 20 * numpy.log10(self._amplitude)
294        self._tone_freq = 0
295        stream_args = uhd.usrp.StreamArgs('fc32', 'sc16')
296        stream_args.channels = [self._chan]
297        self._streamer = self._usrp.get_tx_stream(stream_args)
298        print("==== Creating USRP tone generator. Power offset:", self.power_offset)
299        self._tone_gen = ToneGenerator(self._rate, self._tone_freq, self._amplitude)
300        self._tone_gen.set_streamer(self._streamer)
301
302    def enable(self, enable=True):
303        """
304        Turn the tone generator on or off.
305        """
306        if enable:
307            print("[SigGen] Starting tone generator.")
308            self._tone_gen.start()
309        else:
310            print("[SigGen] Stopping tone generator.")
311            self._tone_gen.stop()
312        time.sleep(0.1) # Give it some time to spin down
313
314    def set_frequency(self, freq):
315        """
316        Set the center frequency of the generated signal.
317        """
318        print("[SigGen] Channel {}: Tuning signal to {:.3f} MHz."
319              .format(self._chan, freq/1e6))
320        tune_req = uhd.types.TuneRequest(freq, self._lo_offset)
321        self._usrp.set_tx_freq(tune_req, self._chan)
322
323    def _set_power(self, power_dbm):
324        """
325        Set the output power of the device in dBm.
326        """
327        self._usrp.set_tx_power_reference(power_dbm - self._pwr_dbfs, self._chan)
328        return self._get_power()
329
330    def _get_power(self):
331        """
332        Return the output power of the measurement device.
333        """
334        return self._usrp.get_tx_power_reference(self._chan) + self._pwr_dbfs
335
336###############################################################################
337# RFSG: NI signal generator family
338###############################################################################
339class RFSGPowerGenerator(SignalGeneratorBase):
340    """
341    Power Generator using NI-RFSG devices.
342    """
343    key = 'rfsg'
344
345    def __init__(self, options):
346        super().__init__(options)
347        self.device = RFSGDevice(options)
348
349    def enable(self, enable=True):
350        """
351        Turn tone generation on and off
352        """
353        self.device.enable(enable)
354
355    def set_frequency(self, freq):
356        """
357        Set the center frequency of the generated signal.
358        """
359        self.device.set_frequency(freq)
360
361    def _set_power(self, power_dbm):
362        """
363        Set the output power of the device in dBm.
364        """
365        return self.device.set_power(power_dbm)
366
367    def _get_power(self):
368        """
369        Get the output power of the device in dBm.
370        """
371        return self.device.get_power()
372
373###############################################################################
374# The dispatch function
375###############################################################################
376def get_meas_device(direction, dev_key, options):
377    """
378    Return the measurement device object
379    """
380    assert direction in ('tx', 'rx')
381    base_class = SignalGeneratorBase if direction == 'rx' else PowerMeterBase
382    opt_dict = {
383        k[0]: k[1] if len(k) > 1 else None for k in [x.split("=", 1) for x in options]
384    }
385    members = inspect.getmembers(sys.modules[__name__])
386    if 'import' in opt_dict:
387        try:
388            print("Loading external module: {}".format(opt_dict.get('import')))
389            external_module = importlib.import_module(opt_dict.get('import'))
390            members += inspect.getmembers(external_module)
391        except (ModuleNotFoundError, ImportError):
392            print("WARNING: Could not import module '{}'"
393                  .format(opt_dict.get('import')))
394    for _, obj in members:
395        try:
396            if issubclass(obj, base_class) and dev_key == getattr(obj, 'key', ''):
397                return obj(opt_dict)
398        except TypeError:
399            continue
400    raise RuntimeError("No {} found for key: {}".format(
401        "signal generator" if direction == "rx" else "power meter", dev_key))
402