1#!/usr/bin/env python3
2
3import sys, math, logging, collections, collections.abc, itertools
4
5import SoapySDR
6import numpy
7
# Version string of this module
__version__ = '1.5.1'
# Module-level logger, named after this module
logger = logging.getLogger(__name__)
10
11
def closest(num_list, num):
    """Pick the element of num_list with the smallest absolute distance to num

    When several elements are equally near, the one encountered first wins.
    An empty num_list makes min() raise ValueError.
    """
    def distance(candidate):
        return abs(candidate - num)

    return min(num_list, key=distance)
15
16
def detect_devices(soapy_args='', as_string=False):
    """Enumerate SoapySDR devices matching soapy_args

    Returns a list with one plain dict per enumerated device. When as_string
    is true, each device is instead rendered as a 'key=value, key=value'
    description string built from a fixed selection of its keys.
    """
    found = [dict(info) for info in SoapySDR.Device.enumerate(soapy_args)]
    if not as_string:
        return found

    # Keys that are only included when the device reports them, in output order
    optional_fields = (
        ('serial', 'serial={}'),
        ('device_id', 'device_id={}'),
        ('rtl', 'rtl={}'),
        ('label', 'label={}'),
    )

    descriptions = []
    for info in found:
        parts = ['driver={}'.format(info['driver'])]
        if info['driver'] == 'remote':
            # Remote devices additionally carry the remote driver and address
            parts += [
                'remote:driver={}'.format(info['remote:driver']),
                'remote={}'.format(info['remote']),
            ]
        for key, template in optional_fields:
            if key in info:
                parts.append(template.format(info[key]))
        descriptions.append(', '.join(parts))
    return descriptions
41
42
class Ranges(collections.abc.Sequence):
    """Read-only sequence of (minimum, maximum) tuples specifying ranges

    Both ends of every range are inclusive, so a single discrete value can be
    represented as the degenerate range (value, value).
    """
    def __init__(self, list_of_tuples):
        """Store a private list copy of any iterable of (minimum, maximum) pairs"""
        self._ranges = list(list_of_tuples)

    def closest(self, num):
        """Return number closest to supplied number from available ranges

        A number lying inside any range is returned unchanged; otherwise the
        nearest range edge is returned (the first edge found wins a tie).

        Raises ValueError if there are no ranges to choose from.
        """
        if num in self:
            return num
        if not self._ranges:
            raise ValueError('Cannot find closest number, no ranges are available!')

        # Flatten [(min, max), ...] into min, max, min, max, ... and pick the nearest edge
        edges = itertools.chain.from_iterable(self._ranges)
        return min(edges, key=lambda edge: abs(edge - num))

    def __contains__(self, num):
        """Return True if num lies within any range (bounds inclusive), else False"""
        return any(r[0] <= num <= r[1] for r in self._ranges)

    def __getitem__(self, key):
        """Return the range tuple (or slice of range tuples) stored at key"""
        return self._ranges[key]

    def __len__(self):
        """Return number of stored ranges"""
        return len(self._ranges)

    def __repr__(self):
        return 'Ranges({})'.format(repr(self._ranges))
69
70
class SoapyDevice:
    """Simple wrapper for SoapySDR

    Wraps a single RX channel of a SoapySDR device. Tuning parameters are
    exposed as properties whose setters validate the value against what the
    device reports as allowed, and samples are read as complex64 (CF32 stream
    format) through a reusable numpy buffer.
    """
    # Buffer size used when the device cannot report its stream MTU
    default_buffer_size = 8192

    # readStream() result handled as a buffer overflow (SOAPY_SDR_OVERFLOW)
    _overflow_code = -4

    def __init__(self, soapy_args='', sample_rate=0, bandwidth=0, corr=0, gain=None, auto_gain=False,
                 channel=0, antenna='', settings=None, force_sample_rate=False, force_bandwidth=False,
                 buffer_size=0, stream_args=None):
        """Open the device and apply the requested initial configuration

        sample_rate, bandwidth, corr, auto_gain, antenna and settings are only
        applied when truthy. gain is applied whenever it is not None and may be
        either a number (total gain) or a dict mapping amplification element
        names to their gains. settings is a dict of device setting names to values.
        """
        self.device = SoapySDR.Device(soapy_args)
        self.buffer = None
        self.buffer_size = buffer_size
        self.buffer_overflow_count = 0
        self.stream = None
        self.stream_args = stream_args
        self.stream_timeout = 0

        self._hardware = self.device.getHardwareKey()
        self._channel = None
        self._freq = None
        self._sample_rate = None
        self._bandwidth = None
        self._corr = None
        self._gain = None
        self._auto_gain = None
        self._antenna = None

        # Channel has to be selected first, every other setter operates on it
        self.channel = channel
        self.force_sample_rate = force_sample_rate
        self.force_bandwidth = force_bandwidth

        # Runs after the force_* flags are stored because it may override them
        self._fix_hardware_quirks()

        if sample_rate:
            self.sample_rate = sample_rate

        if bandwidth:
            self.bandwidth = bandwidth

        if corr:
            self.corr = corr

        # Any dict (including an empty one) means per-element gains; an empty
        # dict must not fall through to the total gain setter
        if isinstance(gain, dict):
            for amp_name, value in gain.items():
                self.set_gain(amp_name, value)
        elif gain is not None:
            self.gain = gain

        if auto_gain:
            self.auto_gain = auto_gain

        if antenna:
            self.antenna = antenna

        if settings:
            for setting_name, value in settings.items():
                self.set_setting(setting_name, value)

    def _fix_hardware_quirks(self):
        """Apply some settings to fix quirks of specific hardware"""
        if self.hardware == 'LimeSDR-USB':
            logger.debug('Applying fixes for LimeSDR-USB quirks...')
            # LimeSDR driver doesn't provide useful list of allowed sample rates
            self.force_sample_rate = True

    @property
    def hardware(self):
        """Type of SDR hardware (read-only)"""
        return self._hardware

    @property
    def is_streaming(self):
        """Has been start_stream() already called? (read-only)"""
        return bool(self.stream)

    @property
    def channel(self):
        """RX channel number"""
        return self._channel

    @channel.setter
    def channel(self, channel):
        """Set RX channel number (falls back to channel 0 if it is not available)"""
        if channel in self.list_channels():
            self._channel = channel
        else:
            logger.warning('Incorrect RX channel number, using channel 0 instead!')
            self._channel = 0

    @property
    def freq(self):
        """Center frequency [Hz]"""
        return self.device.getFrequency(SoapySDR.SOAPY_SDR_RX, self._channel)

    @freq.setter
    def freq(self, freq):
        """Set center frequency [Hz]

        Raises ValueError if freq is outside of range returned by get_frequency_range().
        """
        freq_range = self.get_frequency_range()
        if freq < freq_range[0] or freq > freq_range[1]:
            raise ValueError('Center frequency out of range ({}, {})!'.format(
                freq_range[0], freq_range[1]
            ))

        self._freq = freq
        self.device.setFrequency(SoapySDR.SOAPY_SDR_RX, self._channel, freq)

    @property
    def sample_rate(self):
        """Sample rate [Hz]"""
        return self.device.getSampleRate(SoapySDR.SOAPY_SDR_RX, self._channel)

    @sample_rate.setter
    def sample_rate(self, sample_rate):
        """Set sample rate [Hz]

        Unless force_sample_rate is set, the value is snapped to the closest
        rate allowed by list_sample_rates() and a warning is logged if it changed.
        """
        if self.force_sample_rate:
            real_sample_rate = sample_rate
        else:
            rate_ranges = self.list_sample_rates()
            real_sample_rate = rate_ranges.closest(sample_rate)
            if sample_rate != real_sample_rate:
                logger.warning('Sample rate %s Hz is not supported, setting it to %s Hz!',
                               sample_rate, real_sample_rate)

        self._sample_rate = real_sample_rate
        self.device.setSampleRate(SoapySDR.SOAPY_SDR_RX, self._channel, real_sample_rate)

    @property
    def bandwidth(self):
        """Filter bandwidth [Hz]"""
        return self.device.getBandwidth(SoapySDR.SOAPY_SDR_RX, self._channel)

    @bandwidth.setter
    def bandwidth(self, bandwidth):
        """Set filter bandwidth [Hz]

        Unless force_bandwidth is set, the value is snapped to the closest
        bandwidth allowed by list_bandwidths(); if the device lists none, only
        a warning is logged and nothing is set.
        """
        if self.force_bandwidth:
            real_bandwidth = bandwidth
        else:
            band_ranges = self.list_bandwidths()
            if not band_ranges:
                logger.warning('Device does not support setting filter bandwidth!')
                return

            real_bandwidth = band_ranges.closest(bandwidth)
            if bandwidth != real_bandwidth:
                logger.warning('Filter bandwidth %s Hz is not supported, setting it to %s Hz!',
                               bandwidth, real_bandwidth)

        self._bandwidth = real_bandwidth
        self.device.setBandwidth(SoapySDR.SOAPY_SDR_RX, self._channel, real_bandwidth)

    @property
    def gain(self):
        """Gain [dB]"""
        return self.device.getGain(SoapySDR.SOAPY_SDR_RX, self._channel)

    @gain.setter
    def gain(self, gain):
        """Set gain [dB]

        Raises ValueError if gain is outside of range returned by get_gain_range().
        """
        gain_range = self.get_gain_range()
        if gain < gain_range[0] or gain > gain_range[1]:
            raise ValueError('Gain out of range ({}, {})!'.format(
                gain_range[0], gain_range[1]
            ))

        self._gain = gain
        self.device.setGain(SoapySDR.SOAPY_SDR_RX, self._channel, gain)

    @property
    def auto_gain(self):
        """Automatic Gain Control"""
        return self.device.getGainMode(SoapySDR.SOAPY_SDR_RX, self._channel)

    @auto_gain.setter
    def auto_gain(self, auto_gain):
        """Set Automatic Gain Control (only logs a warning if device has no gain mode)"""
        if not self.device.hasGainMode(SoapySDR.SOAPY_SDR_RX, self._channel):
            logger.warning('Device does not support Automatic Gain Control!')
            return

        self._auto_gain = auto_gain
        self.device.setGainMode(SoapySDR.SOAPY_SDR_RX, self._channel, auto_gain)

    @property
    def antenna(self):
        """Selected antenna"""
        return self.device.getAntenna(SoapySDR.SOAPY_SDR_RX, self._channel)

    @antenna.setter
    def antenna(self, antenna):
        """Set the selected antenna (unknown or unsupported antenna only logs a warning)"""
        antennas = self.list_antennas()
        if not antennas:
            logger.warning('Device does not support setting selected antenna!')
            return

        if antenna not in antennas:
            logger.warning('Unknown antenna %s!', antenna)
            return

        self._antenna = antenna
        self.device.setAntenna(SoapySDR.SOAPY_SDR_RX, self._channel, antenna)

    @property
    def corr(self):
        """Frequency correction [ppm]"""
        try:
            return self.device.getFrequency(SoapySDR.SOAPY_SDR_RX, self._channel, 'CORR')
        except RuntimeError:
            # Reading the 'CORR' element failed, report no correction
            return 0

    @corr.setter
    def corr(self, corr):
        """Set frequency correction [ppm]

        Only logs a warning if the device has no 'CORR' tunable element.
        Raises ValueError if corr is outside of the allowed 'CORR' range.
        """
        if 'CORR' not in self.list_frequencies():
            logger.warning('Device does not support frequency correction!')
            return

        corr_range = self.get_frequency_range('CORR')
        if corr < corr_range[0] or corr > corr_range[1]:
            raise ValueError('Frequency correction out of range ({}, {})!'.format(
                corr_range[0], corr_range[1]
            ))

        self._corr = corr
        self.device.setFrequency(SoapySDR.SOAPY_SDR_RX, self._channel, 'CORR', corr)

    def list_channels(self):
        """List available RX channels"""
        return list(range(self.device.getNumChannels(SoapySDR.SOAPY_SDR_RX)))

    def list_sample_rates(self):
        """List allowed sample rates (as Ranges)

        Falls back to the discrete listSampleRates() values when the range
        query is unavailable (AttributeError) or reports nothing.
        """
        try:
            rate_ranges = Ranges(
                (f.minimum(), f.maximum())
                for f in self.device.getSampleRateRange(SoapySDR.SOAPY_SDR_RX, self._channel)
            )
        except AttributeError:
            rate_ranges = None

        if rate_ranges:
            return rate_ranges
        else:
            rate_list = self.device.listSampleRates(SoapySDR.SOAPY_SDR_RX, self._channel)
            return Ranges((f, f) for f in rate_list)

    def list_bandwidths(self):
        """List allowed bandwidths (as Ranges)

        Falls back to the discrete listBandwidths() values when the range
        query is unavailable (AttributeError) or reports nothing.
        """
        try:
            band_ranges = Ranges(
                (f.minimum(), f.maximum())
                for f in self.device.getBandwidthRange(SoapySDR.SOAPY_SDR_RX, self._channel)
            )
        except AttributeError:
            band_ranges = None

        if band_ranges:
            return band_ranges
        else:
            band_list = self.device.listBandwidths(SoapySDR.SOAPY_SDR_RX, self._channel)
            return Ranges((f, f) for f in band_list)

    def list_antennas(self):
        """List available antennas"""
        return self.device.listAntennas(SoapySDR.SOAPY_SDR_RX, self._channel)

    def list_gains(self):
        """List available amplification elements"""
        return self.device.listGains(SoapySDR.SOAPY_SDR_RX, self._channel)

    def list_frequencies(self):
        """List available tunable elements"""
        return self.device.listFrequencies(SoapySDR.SOAPY_SDR_RX, self._channel)

    def list_settings(self):
        """List available device settings, their default values and description"""
        settings = {
            s.key: {'value': s.value, 'name': s.name, 'description': s.description}
            for s in self.device.getSettingInfo()
        }
        return settings

    def list_stream_args(self):
        """List available stream arguments, their default values and description"""
        args = {
            a.key: {'value': a.value, 'name': a.name, 'description': a.description}
            for a in self.device.getStreamArgsInfo(SoapySDR.SOAPY_SDR_RX, self._channel)
        }
        return args

    def get_gain(self, amp_name):
        """Get gain of given amplification element (ValueError if it is unknown)"""
        if amp_name not in self.list_gains():
            raise ValueError('Unknown amplification element!')
        return self.device.getGain(SoapySDR.SOAPY_SDR_RX, self._channel, amp_name)

    def set_gain(self, amp_name, value):
        """Set gain of given amplification element (ValueError if it is unknown)"""
        if amp_name not in self.list_gains():
            raise ValueError('Unknown amplification element!')
        self.device.setGain(SoapySDR.SOAPY_SDR_RX, self._channel, amp_name, value)

    def get_gain_range(self, amp_name=None):
        """Get allowed range of total gain or gain of given amplification element

        Returns a (minimum, maximum) tuple. Raises ValueError for unknown amp_name.
        """
        if amp_name:
            if amp_name not in self.list_gains():
                raise ValueError('Unknown amplification element!')
            gain = self.device.getGainRange(SoapySDR.SOAPY_SDR_RX, self._channel, amp_name)
        else:
            gain = self.device.getGainRange(SoapySDR.SOAPY_SDR_RX, self._channel)
        return (gain.minimum(), gain.maximum())

    def get_frequency(self, tunable_name):
        """Get frequency of given tunable element (ValueError if it is unknown)"""
        if tunable_name not in self.list_frequencies():
            raise ValueError('Unknown tunable element!')
        return self.device.getFrequency(SoapySDR.SOAPY_SDR_RX, self._channel, tunable_name)

    def set_frequency(self, tunable_name, value):
        """Set frequency of given tunable element (ValueError if it is unknown)"""
        if tunable_name not in self.list_frequencies():
            raise ValueError('Unknown tunable element!')
        self.device.setFrequency(SoapySDR.SOAPY_SDR_RX, self._channel, tunable_name, value)

    def get_frequency_range(self, tunable_name=None):
        """Get allowed range of center frequency or frequency of given tunable element

        Returns a (minimum, maximum) tuple. Only the first range reported by
        the device is used. Raises ValueError for unknown tunable_name.
        """
        if tunable_name:
            if tunable_name not in self.list_frequencies():
                raise ValueError('Unknown tunable element!')
            freq = self.device.getFrequencyRange(SoapySDR.SOAPY_SDR_RX, self._channel, tunable_name)[0]
        else:
            freq = self.device.getFrequencyRange(SoapySDR.SOAPY_SDR_RX, self._channel)[0]
        return (freq.minimum(), freq.maximum())

    def get_setting(self, setting_name):
        """Get value of given device setting (ValueError if it is unknown)"""
        if setting_name not in self.list_settings():
            raise ValueError('Unknown device setting!')
        return self.device.readSetting(setting_name)

    def set_setting(self, setting_name, value):
        """Set value of given device setting (ValueError if it is unknown)"""
        if setting_name not in self.list_settings():
            raise ValueError('Unknown device setting!')
        self.device.writeSetting(setting_name, value)

    def start_stream(self, buffer_size=0, stream_args=None, stream_timeout=0):
        """Start streaming samples

        Arguments left falsy fall back to the values given to the constructor;
        the buffer size further falls back to the stream MTU (or
        default_buffer_size) and the read timeout to 0.1 s plus the time needed
        to fill one buffer at the current sample rate.

        Returns the newly allocated complex64 buffer that read_stream() fills.
        Raises RuntimeError if streaming has been already started.
        """
        if self.is_streaming:
            raise RuntimeError('Streaming has been already initialized!')

        # Explicit argument wins over the constructor value, empty dict is the last resort
        args = stream_args or self.stream_args or {}
        logger.debug('SoapySDR stream - args: %s', args)
        self.stream = self.device.setupStream(SoapySDR.SOAPY_SDR_RX, SoapySDR.SOAPY_SDR_CF32, [self._channel],
                                              args)
        self.device.activateStream(self.stream)

        buffer_size = buffer_size or self.buffer_size
        if not buffer_size:
            try:
                buffer_size = self.device.getStreamMTU(self.stream)
            except AttributeError:
                logger.warning('getStreamMTU not implemented! Using default value: %s',
                               self.default_buffer_size)
                buffer_size = self.default_buffer_size

        self.buffer = numpy.empty(buffer_size, numpy.complex64)
        self.buffer_overflow_count = 0
        self.stream_timeout = stream_timeout or 0.1 + (buffer_size / self.sample_rate)
        logger.debug('SoapySDR stream - buffer size: %s', buffer_size)
        logger.debug('SoapySDR stream - read timeout: %.6f', self.stream_timeout)

        return self.buffer

    def stop_stream(self):
        """Stop streaming samples

        Raises RuntimeError if streaming has not been started.
        """
        if not self.is_streaming:
            raise RuntimeError('Streaming is not initialized, you must run start_stream() first!')

        try:
            self.device.deactivateStream(self.stream)
        finally:
            # Release the stream and reset state even if deactivation failed,
            # otherwise the stream would leak and is_streaming would stay True
            self.device.closeStream(self.stream)
            self.stream = None
            self.buffer = None

    def read_stream(self, stream_timeout=0):
        """Read samples into buffer

        stream_timeout is in seconds; when falsy, the timeout computed by
        start_stream() is used. Returns the raw readStream() result, whose
        ret attribute is the number of samples read or a negative error code.
        Raises RuntimeError if streaming has not been started.
        """
        if not self.is_streaming:
            raise RuntimeError('Streaming is not initialized, you must run start_stream() first!')

        buffer_size = len(self.buffer)
        res = self.device.readStream(self.stream, [self.buffer], buffer_size,
                                     timeoutUs=math.ceil((stream_timeout or self.stream_timeout) * 1e6))
        if res.ret > 0 and res.ret < buffer_size:
            logger.warning('readStream returned only %s samples, but buffer size is %s!',
                           res.ret, buffer_size)
        return res

    def read_stream_into_buffer(self, output_buffer):
        """Read samples into supplied output_buffer (blocks until output_buffer is full)

        Buffer overflows are counted in buffer_overflow_count and reading
        continues. An empty output_buffer is already full, so nothing is read.

        Raises RuntimeError if streaming has not been started or if
        readStream() returns anything other than a positive sample count or
        the overflow code (this includes a result of 0).
        """
        if not self.is_streaming:
            raise RuntimeError('Streaming is not initialized, you must run start_stream() first!')

        output_buffer_size = len(output_buffer)
        ptr = 0
        while ptr < output_buffer_size:
            res = self.read_stream()
            if res.ret > 0:
                # Copy only what still fits, surplus samples of the last read are dropped
                count = min(res.ret, output_buffer_size - ptr)
                output_buffer[ptr:ptr + count] = self.buffer[:count]
                ptr += count
            elif res.ret == self._overflow_code:
                self.buffer_overflow_count += 1
                logger.debug('Buffer overflow error in readStream (%d)!', self.buffer_overflow_count)
                logger.debug('Value of ptr when overflow happened: %s', ptr)
            else:
                raise RuntimeError('Unhandled readStream() error: {} ({})'.format(
                    res.ret, SoapySDR.errToStr(res.ret)
                ))
489
490
if __name__ == '__main__':
    # Run as a script: log every detected SoapySDR device, or exit with
    # status 1 when none is found
    logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.DEBUG)

    devices = detect_devices(as_string=True)
    if devices:
        logger.info('Detected SoapySDR devices:')
        for description in devices:
            logger.info('  {}'.format(description))
    else:
        logger.error('No SoapySDR devices detected!')
        sys.exit(1)
505