#!/usr/bin/env python3
"""Helpers for detecting SoapySDR devices and reading RX sample streams from them."""

import sys
import math
import logging
import collections
import collections.abc
import itertools

import SoapySDR
import numpy

__version__ = '1.5.1'
logger = logging.getLogger(__name__)


def closest(num_list, num):
    """Return number closest to supplied number from list of numbers

    num_list may be any non-empty iterable of numbers; min() raises
    ValueError when it is empty.
    """
    return min(num_list, key=lambda x: abs(x - num))


def detect_devices(soapy_args='', as_string=False):
    """Detect connected SoapySDR devices

    Returns a list of dicts (one per enumerated device). When as_string is
    true, returns instead a list of 'key=value, key=value' strings built
    from the driver, remote, serial, device_id, rtl and label keys.
    """
    devices = [dict(d) for d in SoapySDR.Device.enumerate(soapy_args)]
    if not as_string:
        return devices

    devices_str = []
    for d in devices:
        d_str = ['driver={}'.format(d['driver'])]
        if d['driver'] == 'remote':
            d_str.append('remote:driver={}'.format(d['remote:driver']))
            d_str.append('remote={}'.format(d['remote']))
        # Optional keys are only included when the device reports them
        for key in ('serial', 'device_id', 'rtl', 'label'):
            if key in d:
                d_str.append('{}={}'.format(key, d[key]))
        devices_str.append(', '.join(d_str))
    return devices_str


class Ranges(collections.abc.Sequence):
    """List of (minimum, maximum) tuples specifying ranges"""
    def __init__(self, list_of_tuples):
        self._ranges = list(list_of_tuples)

    def closest(self, num):
        """Return number closest to supplied number from available ranges

        A number lying inside any range is returned unchanged, otherwise
        the nearest range edge is returned.
        """
        if num in self:
            return num
        edges = itertools.chain.from_iterable(self)
        return closest(edges, num)

    def __contains__(self, num):
        """Return True if num lies inside any of the ranges (edges included)"""
        # Always return a real bool (never fall through and return None)
        return any(r[0] <= num <= r[1] for r in self._ranges)

    def __getitem__(self, key):
        return self._ranges[key]

    def __len__(self):
        return len(self._ranges)

    def __repr__(self):
        return 'Ranges({})'.format(repr(self._ranges))


class SoapyDevice:
    """Simple wrapper for SoapySDR"""
    # Fallback buffer size used when the device cannot report its stream MTU
    default_buffer_size = 8192

    def __init__(self, soapy_args='', sample_rate=0, bandwidth=0, corr=0, gain=None, auto_gain=False,
                 channel=0, antenna='', settings=None, force_sample_rate=False, force_bandwidth=False,
                 buffer_size=0, stream_args=None):
        """Open SoapySDR device and apply the supplied RX configuration

        Falsy values of sample_rate, bandwidth, corr, auto_gain, antenna and
        settings leave the corresponding device setting untouched. gain may
        be a number (total gain) or a dict mapping amplification element
        names to gain values; None leaves gain untouched.
        """
        self.device = SoapySDR.Device(soapy_args)
        self.buffer = None
        self.buffer_size = buffer_size
        self.buffer_overflow_count = 0
        self.stream = None
        self.stream_args = stream_args
        self.stream_timeout = 0

        self._hardware = self.device.getHardwareKey()
        self._channel = None
        self._freq = None
        self._sample_rate = None
        self._bandwidth = None
        self._corr = None
        self._gain = None
        self._auto_gain = None
        self._antenna = None

        self.channel = channel
        self.force_sample_rate = force_sample_rate
        self.force_bandwidth = force_bandwidth

        self._fix_hardware_quirks()

        if sample_rate:
            self.sample_rate = sample_rate

        if bandwidth:
            self.bandwidth = bandwidth

        if corr:
            self.corr = corr

        # Any dict (even an empty one) means per-element gains; an empty dict
        # must not fall through to the scalar gain setter, which would fail
        # comparing a dict against the numeric gain range.
        if isinstance(gain, dict):
            for amp_name, value in gain.items():
                self.set_gain(amp_name, value)
        elif gain is not None:
            self.gain = gain

        if auto_gain:
            self.auto_gain = auto_gain

        if antenna:
            self.antenna = antenna

        if settings:
            for setting_name, value in settings.items():
                self.set_setting(setting_name, value)

    def _fix_hardware_quirks(self):
        """Apply some settings to fix quirks of specific hardware"""
        if self.hardware == 'LimeSDR-USB':
            logger.debug('Applying fixes for LimeSDR-USB quirks...')
            # LimeSDR driver doesn't provide useful list of allowed sample rates
            self.force_sample_rate = True

    @property
    def hardware(self):
        """Type of SDR hardware (read-only)"""
        return self._hardware

    @property
    def is_streaming(self):
        """Has been start_stream() already called? (read-only)"""
        return bool(self.stream)

    @property
    def channel(self):
        """RX channel number"""
        return self._channel

    @channel.setter
    def channel(self, channel):
        """Set RX channel number (falls back to channel 0 if not available)"""
        if channel in self.list_channels():
            self._channel = channel
        else:
            logger.warning('Incorrect RX channel number, using channel 0 instead!')
            self._channel = 0

    @property
    def freq(self):
        """Center frequency [Hz]"""
        return self.device.getFrequency(SoapySDR.SOAPY_SDR_RX, self._channel)

    @freq.setter
    def freq(self, freq):
        """Set center frequency [Hz]

        Raises ValueError if freq is outside of get_frequency_range().
        """
        freq_range = self.get_frequency_range()
        if freq < freq_range[0] or freq > freq_range[1]:
            raise ValueError('Center frequency out of range ({}, {})!'.format(
                freq_range[0], freq_range[1]
            ))

        self._freq = freq
        self.device.setFrequency(SoapySDR.SOAPY_SDR_RX, self._channel, freq)

    @property
    def sample_rate(self):
        """Sample rate [Hz]"""
        return self.device.getSampleRate(SoapySDR.SOAPY_SDR_RX, self._channel)

    @sample_rate.setter
    def sample_rate(self, sample_rate):
        """Set sample rate [Hz]

        Unless force_sample_rate is set, the requested rate is replaced by
        the closest rate allowed by list_sample_rates() (with a warning).
        """
        if self.force_sample_rate:
            real_sample_rate = sample_rate
        else:
            rate_ranges = self.list_sample_rates()
            real_sample_rate = rate_ranges.closest(sample_rate)
            if sample_rate != real_sample_rate:
                logger.warning('Sample rate {} Hz is not supported, setting it to {} Hz!'.format(
                    sample_rate, real_sample_rate
                ))

        self._sample_rate = real_sample_rate
        self.device.setSampleRate(SoapySDR.SOAPY_SDR_RX, self._channel, real_sample_rate)

    @property
    def bandwidth(self):
        """Filter bandwidth [Hz]"""
        return self.device.getBandwidth(SoapySDR.SOAPY_SDR_RX, self._channel)

    @bandwidth.setter
    def bandwidth(self, bandwidth):
        """Set filter bandwidth [Hz]

        Unless force_bandwidth is set, the requested bandwidth is replaced
        by the closest one allowed by list_bandwidths() (with a warning);
        nothing is set when the device reports no bandwidths at all.
        """
        if self.force_bandwidth:
            real_bandwidth = bandwidth
        else:
            band_ranges = self.list_bandwidths()
            if not band_ranges:
                logger.warning('Device does not support setting filter bandwidth!')
                return

            real_bandwidth = band_ranges.closest(bandwidth)
            if bandwidth != real_bandwidth:
                logger.warning('Filter bandwidth {} Hz is not supported, setting it to {} Hz!'.format(
                    bandwidth, real_bandwidth
                ))

        self._bandwidth = real_bandwidth
        self.device.setBandwidth(SoapySDR.SOAPY_SDR_RX, self._channel, real_bandwidth)

    @property
    def gain(self):
        """Gain [dB]"""
        return self.device.getGain(SoapySDR.SOAPY_SDR_RX, self._channel)

    @gain.setter
    def gain(self, gain):
        """Set gain [dB]

        Raises ValueError if gain is outside of get_gain_range().
        """
        gain_range = self.get_gain_range()
        if gain < gain_range[0] or gain > gain_range[1]:
            raise ValueError('Gain out of range ({}, {})!'.format(
                gain_range[0], gain_range[1]
            ))

        self._gain = gain
        self.device.setGain(SoapySDR.SOAPY_SDR_RX, self._channel, gain)

    @property
    def auto_gain(self):
        """Automatic Gain Control"""
        return self.device.getGainMode(SoapySDR.SOAPY_SDR_RX, self._channel)

    @auto_gain.setter
    def auto_gain(self, auto_gain):
        """Set Automatic Gain Control (ignored with a warning if unsupported)"""
        if not self.device.hasGainMode(SoapySDR.SOAPY_SDR_RX, self._channel):
            logger.warning('Device does not support Automatic Gain Control!')
            return

        self._auto_gain = auto_gain
        self.device.setGainMode(SoapySDR.SOAPY_SDR_RX, self._channel, auto_gain)

    @property
    def antenna(self):
        """Selected antenna"""
        return self.device.getAntenna(SoapySDR.SOAPY_SDR_RX, self._channel)

    @antenna.setter
    def antenna(self, antenna):
        """Set the selected antenna (ignored with a warning if not available)"""
        antennas = self.list_antennas()
        if not antennas:
            logger.warning('Device does not support setting selected antenna!')
            return

        if antenna not in antennas:
            logger.warning('Unknown antenna {}!'.format(antenna))
            return

        self._antenna = antenna
        self.device.setAntenna(SoapySDR.SOAPY_SDR_RX, self._channel, antenna)

    @property
    def corr(self):
        """Frequency correction [ppm]"""
        try:
            return self.device.getFrequency(SoapySDR.SOAPY_SDR_RX, self._channel, 'CORR')
        except RuntimeError:
            # Reading the 'CORR' element failed; report no correction
            return 0

    @corr.setter
    def corr(self, corr):
        """Set frequency correction [ppm]

        Ignored with a warning when the device has no 'CORR' tunable
        element; raises ValueError if corr is outside of its allowed range.
        """
        if 'CORR' not in self.list_frequencies():
            logger.warning('Device does not support frequency correction!')
            return

        corr_range = self.get_frequency_range('CORR')
        if corr < corr_range[0] or corr > corr_range[1]:
            raise ValueError('Frequency correction out of range ({}, {})!'.format(
                corr_range[0], corr_range[1]
            ))

        self._corr = corr
        self.device.setFrequency(SoapySDR.SOAPY_SDR_RX, self._channel, 'CORR', corr)

    def list_channels(self):
        """List available RX channels"""
        return list(range(self.device.getNumChannels(SoapySDR.SOAPY_SDR_RX)))

    def list_sample_rates(self):
        """List allowed sample rates (as Ranges)"""
        try:
            rate_ranges = Ranges(
                (f.minimum(), f.maximum())
                for f in self.device.getSampleRateRange(SoapySDR.SOAPY_SDR_RX, self._channel)
            )
        except AttributeError:
            # getSampleRateRange is not available; fall back to the plain list
            rate_ranges = None

        if rate_ranges:
            return rate_ranges

        rate_list = self.device.listSampleRates(SoapySDR.SOAPY_SDR_RX, self._channel)
        # Discrete rates are represented as zero-width ranges
        return Ranges((f, f) for f in rate_list)

    def list_bandwidths(self):
        """List allowed bandwidths (as Ranges)"""
        try:
            band_ranges = Ranges(
                (f.minimum(), f.maximum())
                for f in self.device.getBandwidthRange(SoapySDR.SOAPY_SDR_RX, self._channel)
            )
        except AttributeError:
            # getBandwidthRange is not available; fall back to the plain list
            band_ranges = None

        if band_ranges:
            return band_ranges

        band_list = self.device.listBandwidths(SoapySDR.SOAPY_SDR_RX, self._channel)
        # Discrete bandwidths are represented as zero-width ranges
        return Ranges((f, f) for f in band_list)

    def list_antennas(self):
        """List available antennas"""
        return self.device.listAntennas(SoapySDR.SOAPY_SDR_RX, self._channel)

    def list_gains(self):
        """List available amplification elements"""
        return self.device.listGains(SoapySDR.SOAPY_SDR_RX, self._channel)

    def list_frequencies(self):
        """List available tunable elements"""
        return self.device.listFrequencies(SoapySDR.SOAPY_SDR_RX, self._channel)

    def list_settings(self):
        """List available device settings, their default values and description"""
        settings = {
            s.key: {'value': s.value, 'name': s.name, 'description': s.description}
            for s in self.device.getSettingInfo()
        }
        return settings

    def list_stream_args(self):
        """List available stream arguments, their default values and description"""
        args = {
            a.key: {'value': a.value, 'name': a.name, 'description': a.description}
            for a in self.device.getStreamArgsInfo(SoapySDR.SOAPY_SDR_RX, self._channel)
        }
        return args

    def get_gain(self, amp_name):
        """Get gain of given amplification element"""
        if amp_name not in self.list_gains():
            raise ValueError('Unknown amplification element!')
        return self.device.getGain(SoapySDR.SOAPY_SDR_RX, self._channel, amp_name)

    def set_gain(self, amp_name, value):
        """Set gain of given amplification element"""
        if amp_name not in self.list_gains():
            raise ValueError('Unknown amplification element!')
        self.device.setGain(SoapySDR.SOAPY_SDR_RX, self._channel, amp_name, value)

    def get_gain_range(self, amp_name=None):
        """Get allowed range of total gain or gain of given amplification element

        Returns a (minimum, maximum) tuple.
        """
        if amp_name:
            if amp_name not in self.list_gains():
                raise ValueError('Unknown amplification element!')
            gain = self.device.getGainRange(SoapySDR.SOAPY_SDR_RX, self._channel, amp_name)
        else:
            gain = self.device.getGainRange(SoapySDR.SOAPY_SDR_RX, self._channel)
        return (gain.minimum(), gain.maximum())

    def get_frequency(self, tunable_name):
        """Get frequency of given tunable element"""
        if tunable_name not in self.list_frequencies():
            raise ValueError('Unknown tunable element!')
        return self.device.getFrequency(SoapySDR.SOAPY_SDR_RX, self._channel, tunable_name)

    def set_frequency(self, tunable_name, value):
        """Set frequency of given tunable element"""
        if tunable_name not in self.list_frequencies():
            raise ValueError('Unknown tunable element!')
        self.device.setFrequency(SoapySDR.SOAPY_SDR_RX, self._channel, tunable_name, value)

    def get_frequency_range(self, tunable_name=None):
        """Get allowed range of center frequency or frequency of given tunable element

        Returns a (minimum, maximum) tuple taken from the first range
        reported by the device.
        """
        if tunable_name:
            if tunable_name not in self.list_frequencies():
                raise ValueError('Unknown tunable element!')
            freq = self.device.getFrequencyRange(SoapySDR.SOAPY_SDR_RX, self._channel, tunable_name)[0]
        else:
            freq = self.device.getFrequencyRange(SoapySDR.SOAPY_SDR_RX, self._channel)[0]
        return (freq.minimum(), freq.maximum())

    def get_setting(self, setting_name):
        """Get value of given device setting"""
        if setting_name not in self.list_settings():
            raise ValueError('Unknown device setting!')
        return self.device.readSetting(setting_name)

    def set_setting(self, setting_name, value):
        """Set value of given device setting"""
        if setting_name not in self.list_settings():
            raise ValueError('Unknown device setting!')
        self.device.writeSetting(setting_name, value)

    def start_stream(self, buffer_size=0, stream_args=None, stream_timeout=0):
        """Start streaming samples

        Arguments that are falsy fall back to the values given to the
        constructor; buffer size then falls back to the stream MTU (or
        default_buffer_size) and timeout to 0.1 s plus the duration of one
        buffer at the current sample rate. Returns the internal complex64
        buffer that read_stream() fills.

        Raises RuntimeError if streaming has already been started.
        """
        if self.is_streaming:
            raise RuntimeError('Streaming has been already initialized!')

        args = stream_args or self.stream_args or {}
        logger.debug('SoapySDR stream - args: {}'.format(args))
        self.stream = self.device.setupStream(SoapySDR.SOAPY_SDR_RX, SoapySDR.SOAPY_SDR_CF32, [self._channel],
                                              args)
        self.device.activateStream(self.stream)

        buffer_size = buffer_size or self.buffer_size
        if not buffer_size:
            try:
                buffer_size = self.device.getStreamMTU(self.stream)
            except AttributeError:
                logger.warning('getStreamMTU not implemented! Using default value: {}'.format(
                    self.default_buffer_size
                ))
                buffer_size = self.default_buffer_size

        self.buffer = numpy.empty(buffer_size, numpy.complex64)
        self.buffer_overflow_count = 0
        # Explicit timeout wins; otherwise allow one buffer duration plus 0.1 s
        self.stream_timeout = stream_timeout or (0.1 + (buffer_size / self.sample_rate))
        logger.debug('SoapySDR stream - buffer size: {}'.format(buffer_size))
        logger.debug('SoapySDR stream - read timeout: {:.6f}'.format(self.stream_timeout))

        return self.buffer

    def stop_stream(self):
        """Stop streaming samples

        Raises RuntimeError if streaming has not been started.
        """
        if not self.is_streaming:
            raise RuntimeError('Streaming is not initialized, you must run start_stream() first!')

        self.device.deactivateStream(self.stream)
        self.device.closeStream(self.stream)
        self.stream = None
        self.buffer = None

    def read_stream(self, stream_timeout=0):
        """Read samples into buffer

        stream_timeout is in seconds (0 means use the timeout chosen in
        start_stream()). Returns the result object of readStream(); its
        ret attribute is the number of samples read or a negative error code.

        Raises RuntimeError if streaming has not been started.
        """
        if not self.is_streaming:
            raise RuntimeError('Streaming is not initialized, you must run start_stream() first!')

        buffer_size = len(self.buffer)
        res = self.device.readStream(self.stream, [self.buffer], buffer_size,
                                     timeoutUs=math.ceil((stream_timeout or self.stream_timeout) * 1e6))
        if 0 < res.ret < buffer_size:
            logger.warning('readStream returned only {} samples, but buffer size is {}!'.format(
                res.ret, buffer_size
            ))
        return res

    def read_stream_into_buffer(self, output_buffer):
        """Read samples into supplied output_buffer (blocks until output_buffer is full)

        Buffer overflows reported by readStream() are counted in
        buffer_overflow_count and reading continues; any other non-positive
        result raises RuntimeError.
        """
        output_buffer_size = len(output_buffer)
        ptr = 0
        while True:
            res = self.read_stream()
            if res.ret > 0:
                # Never copy past the end of output_buffer
                count = min(res.ret, output_buffer_size - ptr)
                output_buffer[ptr:ptr + count] = self.buffer[:count]
                ptr += count
            elif res.ret == -4:
                # -4 is the overflow error code (SOAPY_SDR_OVERFLOW)
                self.buffer_overflow_count += 1
                logger.debug('Buffer overflow error in readStream ({:d})!'.format(self.buffer_overflow_count))
                logger.debug('Value of ptr when overflow happened: {}'.format(ptr))
            else:
                raise RuntimeError('Unhandled readStream() error: {} ({})'.format(
                    res.ret, SoapySDR.errToStr(res.ret)
                ))

            if ptr >= output_buffer_size:
                return


if __name__ == '__main__':
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(levelname)s: %(message)s'
    )

    devices = detect_devices(as_string=True)
    if not devices:
        logger.error('No SoapySDR devices detected!')
        sys.exit(1)

    logger.info('Detected SoapySDR devices:')
    for d in devices:
        logger.info(' {}'.format(d))