# -*- coding: utf-8 -*-
"""Implements Session to control USBTMC instruments

Loosely based on PyUSBTMC:python module to handle USB-TMC(Test and
Measurement class) devices. by Noboru Yamamot, Accl. Lab, KEK, JAPAN

This file is an offspring of the Lantz Project.

:copyright: 2014-2020 by PyVISA-py Authors, see AUTHORS for more details.
:license: MIT, see LICENSE for more details.

"""
import enum
import struct
import time
import warnings
from collections import namedtuple

import usb

from .usbutil import find_devices, find_endpoint, find_interfaces, usb_find_desc


class MsgID(enum.IntEnum):
    """From USB-TMC table2"""

    # NOTE: the "request" and "in" members deliberately share a value, so
    # within the IntEnum the second name of each pair is an alias of the first.
    dev_dep_msg_out = 1
    request_dev_dep_msg_in = 2
    dev_dep_msg_in = 2
    vendor_specific_out = 126
    request_vendor_specific_in = 127
    vendor_specific_in = 127

    # USB488
    trigger = 128


class Request(enum.IntEnum):
    """Request codes passed as bRequest in the control transfers below."""

    initiate_abort_bulk_out = 1
    check_abort_bulk_out_status = 2
    initiate_abort_bulk_in = 3
    check_abort_bulk_in_status = 4
    initiate_clear = 5
    check_clear_status = 6
    get_capabilities = 7
    indicator_pulse = 64

    # USB488
    read_status_byte = 128
    ren_control = 160
    go_to_local = 161
    local_lockout = 162


class UsbTmcStatus(enum.IntEnum):
    """Status codes compared against the first byte of control responses."""

    success = 1
    pending = 2
    failed = 0x80
    transfer_not_in_progress = 0x81
    split_not_in_progress = 0x82
    split_in_progress = 0x83


#: Boolean USB488 capability flags decoded by ``USBTMC._get_capabilities``.
UsbTmcCapabilities = namedtuple("UsbTmcCapabilities", "usb488 ren_control trigger")


def find_tmc_devices(
    vendor=None, product=None, serial_number=None, custom_match=None, **kwargs
):
    """Find connected USBTMC devices.

    See usbutil.find_devices for more info.

    :param vendor: forwarded unchanged to ``find_devices``
    :param product: forwarded unchanged to ``find_devices``
    :param serial_number: forwarded unchanged to ``find_devices``
    :param custom_match: optional callable taking a device; devices for which
        it returns a falsy value are rejected before the USBTMC check
    :param kwargs: forwarded unchanged to ``find_devices``
    :return: whatever ``find_devices`` returns for the combined filter
    """

    def is_usbtmc(dev):
        # Apply the caller's filter first, then require at least one interface
        # with class 0xFE / subclass 3 (the pair used to identify USBTMC here).
        if custom_match and not custom_match(dev):
            return False
        return bool(find_interfaces(dev, bInterfaceClass=0xFE, bInterfaceSubClass=3))

    return find_devices(vendor, product, serial_number, is_usbtmc, **kwargs)


class BulkOutMessage(object):
    """The Host uses the Bulk-OUT endpoint to send USBTMC command messages to
    the device.

    """

    @staticmethod
    def build_array(btag, eom, chunk):
        """Frame ``chunk`` as a DEV_DEP_MSG_OUT Bulk-OUT transfer.

        :param btag: transfer tag; its one's complement (masked to one byte)
            is stored right after it
        :param eom: truthy when this chunk ends the message
        :param chunk: payload bytes
        :type chunk: bytes
        :return: 12-byte header + payload + zero padding up to a multiple of 4
        :return type: bytes
        """
        size = len(chunk)
        return (
            struct.pack("BBBx", MsgID.dev_dep_msg_out, btag, ~btag & 0xFF)
            + struct.pack("<LBxxx", size, eom)
            + chunk
            # The header is already 12 bytes, so only the payload needs
            # padding to reach 4-byte alignment.
            + b"\0" * ((4 - size) % 4)
        )


class BulkInMessage(
    namedtuple(
        "BulkInMessage",
        "msgid btag btaginverse " "transfer_size transfer_attributes data",
    )
):
    """The Host uses the Bulk-IN endpoint to read USBTMC response messages from
    the device.

    The Host must first send a USBTMC command message that expects a response
    before attempting to read a USBTMC response message.

    """

    @classmethod
    def from_bytes(cls, data):
        """Parse a raw Bulk-IN transfer into a message.

        If the MsgID is not the expected DEV_DEP_MSG_IN a warning is issued
        and parsing is delegated to :meth:`from_quirky`.

        :param data: raw bytes read from the Bulk-IN endpoint
        :type data: bytes
        :return: parsed message; ``data`` holds the payload without header
            and padding
        """
        msgid, btag, btaginverse = struct.unpack_from("BBBx", data)
        if msgid != MsgID.dev_dep_msg_in:
            warnings.warn(
                "Unexpected MsgID format. Consider updating the device's firmware. "
                "See https://github.com/pyvisa/pyvisa-py/issues/20"
            )
            # Go through ``cls`` so subclasses get instances of their own type.
            return cls.from_quirky(data)

        transfer_size, transfer_attributes = struct.unpack_from("<LBxxx", data, 4)

        # Truncate data to the specified length (discard padding).
        data = data[12 : 12 + transfer_size]
        return cls(msgid, btag, btaginverse, transfer_size, transfer_attributes, data)

    @classmethod
    def from_quirky(cls, data):
        """Constructs a correct response for quirky devices.

        Trailing NUL bytes are dropped. If the remaining buffer contains a
        ``;`` the first 12 bytes are treated as a header and removed;
        otherwise the whole buffer is the payload and the message is marked
        as complete (``transfer_size=0``, ``transfer_attributes=1``).

        :param data: raw bytes read from the Bulk-IN endpoint
        :type data: bytes
        :return: parsed message
        """
        msgid, btag, btaginverse = struct.unpack_from("BBBx", data)
        stripped = data.rstrip(b"\x00")
        # check whether it contains a ';' and if so throw away the first 12 bytes
        if b";" in stripped:
            # Read the header from the untouched buffer: stripping trailing
            # zeros may have shortened the data below the 12 header bytes,
            # while the parsed values are identical whenever both are long
            # enough.
            transfer_size, transfer_attributes = struct.unpack_from("<LBxxx", data, 4)
            payload = stripped[12:]
        else:
            transfer_size = 0
            transfer_attributes = 1
            payload = stripped
        return cls(
            msgid, btag, btaginverse, transfer_size, transfer_attributes, payload
        )

    @staticmethod
    def build_array(btag, transfer_size, term_char=None):
        """Build the REQUEST_DEV_DEP_MSG_IN header sent before a read.

        :param btag: transfer tag; its one's complement (masked to one byte)
            is stored right after it
        :param transfer_size: maximum number of payload bytes requested
        :param term_char: optional termination character code; when given,
            the transfer attributes are set to 2 and the character is
            included in the header
        :return: 12-byte request header
        :return type: bytes
        """

        if term_char is None:
            transfer_attributes = 0
            term_char = 0
        else:
            transfer_attributes = 2

        return struct.pack(
            "BBBx", MsgID.request_dev_dep_msg_in, btag, ~btag & 0xFF
        ) + struct.pack("<LBBxx", transfer_size, transfer_attributes, term_char)


class USBRaw(object):
    """Base class for drivers that communicate with instruments
    via usb port using pyUSB
    """

    #: Configuration number to be used. If None, the default will be used.
    CONFIGURATION = None

    #: Interface index to be used
    INTERFACE = (0, 0)

    #: Receive and Send endpoints to be used. If None the first IN (or OUT)
    #: BULK endpoint will be used.
    ENDPOINTS = (None, None)

    #: Device lookup function; subclasses override it to narrow the search.
    find_devices = staticmethod(find_devices)

    def __init__(
        self,
        vendor=None,
        product=None,
        serial_number=None,
        device_filters=None,
        timeout=None,
        **kwargs
    ):
        """Locate exactly one matching device and open its endpoints.

        :param vendor: forwarded to ``find_devices``
        :param product: forwarded to ``find_devices``
        :param serial_number: forwarded to ``find_devices``
        :param device_filters: optional dict of extra keyword filters
            forwarded to ``find_devices``
        :param timeout: timeout in ms used for reads and control transfers
        :param kwargs: accepted for compatibility; not used here
        :raises ValueError: if no device or more than one device matches
        :raises Exception: if the USB configuration cannot be set
        """
        super(USBRaw, self).__init__()

        # Timeout expressed in ms as an integer and limited to 2**32-1
        # If left to None pyusb will use its default value
        self.timeout = timeout

        device_filters = device_filters or {}
        devices = list(
            self.find_devices(vendor, product, serial_number, None, **device_filters)
        )

        if not devices:
            raise ValueError("No device found.")
        elif len(devices) > 1:
            desc = "\n".join(str(dev) for dev in devices)
            raise ValueError(
                "{} devices found:\n{}\nPlease narrow the search"
                " criteria".format(len(devices), desc)
            )

        self.usb_dev = devices[0]

        # Best effort: detaching a kernel driver is not supported on every
        # platform/backend, so failures here are deliberately ignored.
        try:
            if self.usb_dev.is_kernel_driver_active(0):
                self.usb_dev.detach_kernel_driver(0)
        except (usb.core.USBError, NotImplementedError):
            pass

        try:
            self.usb_dev.set_configuration()
        except usb.core.USBError as e:
            raise Exception("failed to set configuration\n %s" % e) from e

        # Best effort as well: failing to select the alternate setting is
        # ignored.
        try:
            self.usb_dev.set_interface_altsetting()
        except usb.core.USBError:
            pass

        self.usb_intf = self._find_interface(self.usb_dev, self.INTERFACE)

        self.usb_recv_ep, self.usb_send_ep = self._find_endpoints(
            self.usb_intf, self.ENDPOINTS
        )

    def _find_interface(self, dev, setting):
        """Return the interface selected by ``setting`` on ``dev``.

        :param dev: device whose active configuration is inspected
        :param setting: key used to index the active configuration
            (``INTERFACE`` when called from ``__init__``)
        """
        return dev.get_active_configuration()[setting]

    def _find_endpoints(self, interface, setting):
        """Resolve the receive and send endpoints of ``interface``.

        :param interface: interface to search
        :param setting: ``(recv, send)`` pair of endpoint addresses; a None
            entry selects the first bulk endpoint of the matching direction
        :return: ``(recv_endpoint, send_endpoint)``
        """
        recv, send = setting
        if recv is None:
            recv = find_endpoint(interface, usb.ENDPOINT_IN, usb.ENDPOINT_TYPE_BULK)
        else:
            recv = usb_find_desc(interface, bEndpointAddress=recv)

        if send is None:
            send = find_endpoint(interface, usb.ENDPOINT_OUT, usb.ENDPOINT_TYPE_BULK)
        else:
            send = usb_find_desc(interface, bEndpointAddress=send)

        return recv, send

    def write(self, data):
        """Send raw bytes to the instrument.

        :param data: bytes to be sent to the instrument
        :type data: bytes
        :return: value returned by the endpoint's ``write``
        :raises ValueError: if the USB layer reports an error
        """

        try:
            return self.usb_send_ep.write(data)
        except usb.core.USBError as e:
            raise ValueError(str(e)) from e

    def read(self, size):
        """Receive raw bytes from the instrument.

        :param size: number of bytes to receive; values <= 0 are treated as 1
        :return: received bytes
        :return type: bytes
        """

        if size <= 0:
            size = 1

        data = self.usb_recv_ep.read(size, self.timeout).tobytes()

        return data

    def close(self):
        """Release the resources pyusb holds for the device."""
        return usb.util.dispose_resources(self.usb_dev)


class USBTMC(USBRaw):
    """USBTMC driver layered on top of the raw bulk endpoints of USBRaw."""

    # Maximum number of bytes per transfer (for sending and receiving).
    RECV_CHUNK = 1024 ** 2

    #: Restrict the device search to USBTMC devices.
    find_devices = staticmethod(find_tmc_devices)

    def __init__(self, vendor=None, product=None, serial_number=None, **kwargs):
        """Open the device, reset it and query its capabilities.

        :param vendor: forwarded to ``USBRaw.__init__``
        :param product: forwarded to ``USBRaw.__init__``
        :param serial_number: forwarded to ``USBRaw.__init__``
        :param kwargs: forwarded to ``USBRaw.__init__``
        :raises ValueError: if the interface lacks a Bulk-IN or Bulk-OUT
            endpoint
        """
        super(USBTMC, self).__init__(vendor, product, serial_number, **kwargs)
        self.usb_intr_in = find_endpoint(
            self.usb_intf, usb.ENDPOINT_IN, usb.ENDPOINT_TYPE_INTERRUPT
        )

        self.usb_dev.reset()
        self.usb_dev.set_configuration()

        # Short pause after the reset before talking to the device again.
        time.sleep(0.01)

        self._capabilities = self._get_capabilities()

        # Tag of the last transfer; incremented (1..255) before every use.
        self._btag = 0

        if not (self.usb_recv_ep and self.usb_send_ep):
            msg = "TMC device must have both Bulk-In and Bulk-out endpoints."
            raise ValueError(msg)

        self._enable_remote_control()

    def _enable_remote_control(self):
        """Send REN_CONTROL (wValue=1) if the device advertises support."""
        if not self._capabilities.ren_control:
            return

        self.usb_dev.ctrl_transfer(
            usb.util.build_request_type(
                usb.util.CTRL_IN,
                usb.util.CTRL_TYPE_CLASS,
                usb.util.CTRL_RECIPIENT_INTERFACE,
            ),
            Request.ren_control,
            1,
            self.usb_intf.index,
            1,
            timeout=self.timeout,
        )

    def _get_capabilities(self):
        """Issue GET_CAPABILITIES and decode the USB488 capability bits.

        :return: decoded flags
        :return type: UsbTmcCapabilities
        """
        c = self.usb_dev.ctrl_transfer(
            usb.util.build_request_type(
                usb.util.CTRL_IN,
                usb.util.CTRL_TYPE_CLASS,
                usb.util.CTRL_RECIPIENT_INTERFACE,
            ),
            Request.get_capabilities,
            0x0000,
            self.usb_intf.index,
            0x0018,
            timeout=self.timeout,
        )

        usb488_capabilities = c[0xE]

        # bit #2: The interface is a 488.2 USB488 interface.
        # bit #1: The interface accepts REN_CONTROL, GO_TO_LOCAL,
        #         and LOCAL_LOCKOUT requests.
        # bit #0: The interface accepts the MsgID = TRIGGER
        #         USBTMC command message and forwards
        #         TRIGGER requests to the Function Layer.
        return UsbTmcCapabilities(
            usb488=bool(usb488_capabilities & (1 << 2)),
            ren_control=bool(usb488_capabilities & (1 << 1)),
            trigger=bool(usb488_capabilities & (1 << 0)),
        )

    def _find_interface(self, dev, setting):
        """Return the first USBTMC interface (class 0xFE, subclass 3) of ``dev``.

        :param dev: device to search
        :param setting: unused; kept for signature compatibility with USBRaw
        :raises ValueError: if the device exposes no USBTMC interface
        """
        interfaces = find_interfaces(dev, bInterfaceClass=0xFE, bInterfaceSubClass=3)
        if not interfaces:
            raise ValueError("USB TMC interface not found.")

        # When several interfaces match, the first one is used.
        return interfaces[0]

    def _abort_bulk_in(self, btag):
        """Request that the device abort a pending Bulk-IN operation.

        :param btag: tag of the transfer to abort
        """

        abort_timeout_ms = 5000

        # Send INITIATE_ABORT_BULK_IN.
        # According to USBTMC 1.00 4.2.1.4:
        #   wValue = bTag value of transfer to be aborted
        #   wIndex = Bulk-IN endpoint
        #   wLength = 0x0002 (length of device response)
        data = self.usb_dev.ctrl_transfer(
            usb.util.build_request_type(
                usb.util.CTRL_IN,
                usb.util.CTRL_TYPE_CLASS,
                usb.util.CTRL_RECIPIENT_ENDPOINT,
            ),
            Request.initiate_abort_bulk_in,
            btag,
            self.usb_recv_ep.bEndpointAddress,
            0x0002,
            timeout=abort_timeout_ms,
        )

        if data[0] != UsbTmcStatus.success:
            # Abort Bulk-IN failed. Ignore it.
            return

        # Read remaining data from Bulk-IN endpoint.
        self.usb_recv_ep.read(self.RECV_CHUNK, abort_timeout_ms)

        # Send CHECK_ABORT_BULK_IN_STATUS until it completes.
        # According to USBTMC 1.00 4.2.1.5:
        #   wValue = 0x0000
        #   wIndex = Bulk-IN endpoint
        #   wLength = 0x0008 (length of device response)
        for retry in range(100):
            data = self.usb_dev.ctrl_transfer(
                usb.util.build_request_type(
                    usb.util.CTRL_IN,
                    usb.util.CTRL_TYPE_CLASS,
                    usb.util.CTRL_RECIPIENT_ENDPOINT,
                ),
                Request.check_abort_bulk_in_status,
                0x0000,
                self.usb_recv_ep.bEndpointAddress,
                0x0008,
                timeout=abort_timeout_ms,
            )
            if data[0] != UsbTmcStatus.pending:
                break
            time.sleep(0.05)

    def write(self, data):
        """Send raw bytes to the instrument.

        The payload is split into transfers of at most ``RECV_CHUNK`` bytes,
        each framed by ``BulkOutMessage.build_array``.

        :param data: bytes to be sent to the instrument
        :type data: bytes
        :return: number of payload bytes (``len(data)``)
        :raises ValueError: if the USB layer reports an error
        """

        size = len(data)
        chunk_size = self.RECV_CHUNK

        raw_write = super(USBTMC, self).write

        # Send all data via one or more Bulk-OUT transfers.
        # Set the EOM flag on the last transfer only.
        # Send at least one transfer (possibly empty).
        begin = 0
        while True:
            # Advance the window from the previous *end*, and keep ``data``
            # untouched so every slice comes from the caller's payload.
            end = begin + chunk_size

            self._btag = (self._btag % 255) + 1

            eom = end >= size
            raw_write(BulkOutMessage.build_array(self._btag, eom, data[begin:end]))

            if eom:
                break
            begin = end

        return size

    def read(self, size):
        """Read one complete response message from the instrument.

        Bulk-IN transfers are requested repeatedly until the device signals
        end of message.

        :param size: maximum number of payload bytes requested per transfer;
            values <= 0 or >= ``RECV_CHUNK`` fall back to ``RECV_CHUNK``
        :return: concatenated payload of all transfers
        :return type: bytes
        """

        recv_chunk = self.RECV_CHUNK
        if size > 0 and size < recv_chunk:
            recv_chunk = size

        header_size = 12
        max_padding = 511

        eom = False

        raw_read = super(USBTMC, self).read
        raw_write = super(USBTMC, self).write

        received = bytearray()

        while not eom:
            self._btag = (self._btag % 255) + 1

            req = BulkInMessage.build_array(self._btag, recv_chunk, None)

            raw_write(req)

            try:
                resp = raw_read(recv_chunk + header_size + max_padding)
                response = BulkInMessage.from_bytes(resp)
            except (usb.core.USBError, ValueError):
                # Abort failed Bulk-IN operation.
                self._abort_bulk_in(self._btag)
                raise

            received.extend(response.data)

            # Detect EOM only when device sends all expected bytes.
            if len(response.data) >= response.transfer_size:
                eom = response.transfer_attributes & 1

        return bytes(received)