1# -*- coding: utf-8 -*-
2"""Implements Session to control USBTMC instruments
3
Loosely based on PyUSBTMC: a Python module to handle USB-TMC (Test and
Measurement Class) devices, by Noboru Yamamoto, Accl. Lab, KEK, JAPAN
6
7This file is an offspring of the Lantz Project.
8
9:copyright: 2014-2020 by PyVISA-py Authors, see AUTHORS for more details.
10:license: MIT, see LICENSE for more details.
11
12"""
13import enum
14import struct
15import time
16import warnings
17from collections import namedtuple
18
19import usb
20
21from .usbutil import find_devices, find_endpoint, find_interfaces, usb_find_desc
22
23
class MsgID(enum.IntEnum):
    """Message identifiers placed in byte 0 of a bulk header (USB-TMC table 2).

    Several names share one numeric value; in an ``IntEnum`` the later
    names are aliases of the first member defined with that value.
    """

    dev_dep_msg_out = 0x01
    request_dev_dep_msg_in = 0x02
    dev_dep_msg_in = 0x02
    vendor_specific_out = 0x7E
    request_vendor_specific_in = 0x7F
    vendor_specific_in = 0x7F

    # USB488 subclass
    trigger = 0x80
36
37
class Request(enum.IntEnum):
    """Request codes passed as the request argument of control transfers."""

    initiate_abort_bulk_out = 0x01
    check_abort_bulk_out_status = 0x02
    initiate_abort_bulk_in = 0x03
    check_abort_bulk_in_status = 0x04
    initiate_clear = 0x05
    check_clear_status = 0x06
    get_capabilities = 0x07
    indicator_pulse = 0x40

    # USB488 subclass
    read_status_byte = 0x80
    ren_control = 0xA0
    go_to_local = 0xA1
    local_lockout = 0xA2
53
54
class UsbTmcStatus(enum.IntEnum):
    """Status codes compared against byte 0 of control-request responses."""

    success = 0x01
    pending = 0x02
    failed = 0x80
    transfer_not_in_progress = 0x81
    split_not_in_progress = 0x82
    split_in_progress = 0x83
62
63
# Boolean capability flags of a device, in the order
# (usb488, ren_control, trigger).
UsbTmcCapabilities = namedtuple(
    "UsbTmcCapabilities", ("usb488", "ren_control", "trigger")
)
65
66
def find_tmc_devices(
    vendor=None, product=None, serial_number=None, custom_match=None, **kwargs
):
    """Find connected USBTMC devices. See usbutil.find_devices for more info.

    A device qualifies when ``custom_match`` (if supplied) accepts it and it
    exposes at least one interface with class 0xFE and subclass 3.
    """

    def is_usbtmc(candidate):
        # Let the caller-supplied predicate veto the device first.
        if custom_match:
            if not custom_match(candidate):
                return False
        tmc_interfaces = find_interfaces(
            candidate, bInterfaceClass=0xFE, bInterfaceSubClass=3
        )
        return bool(tmc_interfaces)

    return find_devices(vendor, product, serial_number, is_usbtmc, **kwargs)
78
79
class BulkOutMessage(object):
    """The Host uses the Bulk-OUT endpoint to send USBTMC command messages to
    the device.

    """

    @staticmethod
    def build_array(btag, eom, chunk):
        """Frame ``chunk`` as a device-dependent Bulk-OUT message.

        :param btag: transfer tag stored in the header (with its inverse)
        :param eom: end-of-message flag stored in the attributes byte
        :param chunk: payload bytes
        :return: 12-byte header, payload, then zero padding up to a
                 multiple of 4 bytes
        """
        payload_len = len(chunk)

        # Bytes 0-3: MsgID, bTag, inverted bTag, reserved.
        # Bytes 4-11: little-endian payload length, attributes, reserved.
        header = struct.pack(
            "<BBBxLBxxx",
            MsgID.dev_dep_msg_out,
            btag,
            ~btag & 0xFF,
            payload_len,
            eom,
        )

        alignment_padding = b"\0" * (-payload_len % 4)
        return header + chunk + alignment_padding
95
96
class BulkInMessage(
    namedtuple(
        "BulkInMessage",
        [
            "msgid",
            "btag",
            "btaginverse",
            "transfer_size",
            "transfer_attributes",
            "data",
        ],
    )
):
    """The Host uses the Bulk-IN endpoint to read USBTMC response messages from
    the device.

    The Host must first send a USBTMC command message that expects a response
    before attempting to read a USBTMC response message.

    """

    @classmethod
    def from_bytes(cls, data):
        """Parse a raw Bulk-IN transfer into a message.

        When the first byte is not the expected MsgID a warning is issued
        and parsing is delegated to :meth:`from_quirky`.

        :param data: raw bytes read from the Bulk-IN endpoint
        :return: the parsed message
        """
        msgid, btag, btaginverse = struct.unpack_from("BBBx", data)

        if msgid == MsgID.dev_dep_msg_in:
            transfer_size, transfer_attributes = struct.unpack_from(
                "<LBxxx", data, 4
            )
            # Keep only the announced number of payload bytes
            # (anything after them is padding).
            payload = data[12 : 12 + transfer_size]
            return cls(
                msgid, btag, btaginverse, transfer_size, transfer_attributes, payload
            )

        warnings.warn(
            "Unexpected MsgID format. Consider updating the device's firmware. See https://github.com/pyvisa/pyvisa-py/issues/20"
        )
        return BulkInMessage.from_quirky(data)

    @classmethod
    def from_quirky(cls, data):
        """Constructs a correct response for quirky devices.

        Trailing NUL bytes are stripped. If the remainder contains a ``;``
        its first 12 bytes are parsed as a header and dropped; otherwise
        the whole remainder is kept as data with ``transfer_size`` 0 and
        ``transfer_attributes`` 1.
        """
        msgid, btag, btaginverse = struct.unpack_from("BBBx", data)
        stripped = data.rstrip(b"\x00")

        if b";" not in stripped:
            return cls(msgid, btag, btaginverse, 0, 1, stripped)

        transfer_size, transfer_attributes = struct.unpack_from(
            "<LBxxx", stripped, 4
        )
        return cls(
            msgid,
            btag,
            btaginverse,
            transfer_size,
            transfer_attributes,
            stripped[12:],
        )

    @staticmethod
    def build_array(btag, transfer_size, term_char=None):
        """Build the 12-byte request asking the device for a Bulk-IN message.

        :param btag: transfer tag stored in the header (with its inverse)
        :param transfer_size: value packed into the little-endian size field
        :param term_char: integer packed into the term-char byte; when None
                          both the attributes byte and the term-char byte
                          are 0, otherwise the attributes byte is 2
        :return: the packed request header
        """
        if term_char is None:
            transfer_attributes, term_char = 0, 0
        else:
            transfer_attributes = 2

        return struct.pack(
            "<BBBxLBBxx",
            MsgID.request_dev_dep_msg_in,
            btag,
            ~btag & 0xFF,
            transfer_size,
            transfer_attributes,
            term_char,
        )
159
160
class USBRaw(object):
    """Base class for drivers that communicate with instruments
    via usb port using pyUSB
    """

    #: Configuration number to be used. If None, the default will be used.
    # NOTE(review): this attribute is not read anywhere in this class;
    # set_configuration() is called without arguments in __init__.
    CONFIGURATION = None

    #: Interface index to be used
    INTERFACE = (0, 0)

    #: Receive and Send endpoints to be used. If None the first IN (or OUT)
    #: BULK endpoint will be used.
    ENDPOINTS = (None, None)

    find_devices = staticmethod(find_devices)

    def __init__(
        self,
        vendor=None,
        product=None,
        serial_number=None,
        device_filters=None,
        timeout=None,
        **kwargs
    ):
        """Locate exactly one matching device and prepare its endpoints.

        :param vendor: forwarded to ``find_devices``
        :param product: forwarded to ``find_devices``
        :param serial_number: forwarded to ``find_devices``
        :param device_filters: optional dict of extra keyword arguments
                               forwarded to ``find_devices``
        :param timeout: timeout in ms passed to endpoint reads
        :raises ValueError: if no device or more than one device matches
        :raises Exception: if the configuration cannot be set
        """
        super(USBRaw, self).__init__()

        # Timeout expressed in ms as an integer and limited to 2**32-1
        # If left to None pyusb will use its default value
        self.timeout = timeout

        device_filters = device_filters or {}
        devices = list(
            self.find_devices(vendor, product, serial_number, None, **device_filters)
        )

        if not devices:
            raise ValueError("No device found.")
        elif len(devices) > 1:
            desc = "\n".join(str(dev) for dev in devices)
            raise ValueError(
                "{} devices found:\n{}\nPlease narrow the search"
                " criteria".format(len(devices), desc)
            )

        self.usb_dev = devices[0]

        # Best effort: errors while detaching a kernel driver are ignored.
        try:
            if self.usb_dev.is_kernel_driver_active(0):
                self.usb_dev.detach_kernel_driver(0)
        except (usb.core.USBError, NotImplementedError):
            pass

        try:
            self.usb_dev.set_configuration()
        except usb.core.USBError as e:
            raise Exception("failed to set configuration\n %s" % e)

        # Best effort: a failure to select the alternate setting is ignored.
        try:
            self.usb_dev.set_interface_altsetting()
        except usb.core.USBError:
            pass

        self.usb_intf = self._find_interface(self.usb_dev, self.INTERFACE)

        self.usb_recv_ep, self.usb_send_ep = self._find_endpoints(
            self.usb_intf, self.ENDPOINTS
        )

    def _find_interface(self, dev, setting):
        """Return the interface of ``dev``'s active configuration.

        :param dev: the USB device to inspect
        :param setting: key used to index the active configuration
                        (``INTERFACE`` when called from ``__init__``)
        """
        # Honor the arguments instead of silently falling back to
        # self.usb_dev / self.INTERFACE.
        return dev.get_active_configuration()[setting]

    def _find_endpoints(self, interface, setting):
        """Resolve the (receive, send) endpoints of ``interface``.

        :param interface: interface whose endpoints are searched
        :param setting: pair ``(recv, send)`` of endpoint addresses; a None
                        entry selects a bulk endpoint of the matching
                        direction via ``find_endpoint``
        :return: tuple ``(recv_endpoint, send_endpoint)``
        """
        recv, send = setting
        if recv is None:
            recv = find_endpoint(interface, usb.ENDPOINT_IN, usb.ENDPOINT_TYPE_BULK)
        else:
            recv = usb_find_desc(interface, bEndpointAddress=recv)

        if send is None:
            send = find_endpoint(interface, usb.ENDPOINT_OUT, usb.ENDPOINT_TYPE_BULK)
        else:
            send = usb_find_desc(interface, bEndpointAddress=send)

        return recv, send

    def write(self, data):
        """Send raw bytes to the instrument.

        :param data: bytes to be sent to the instrument
        :type data: bytes
        :return: whatever the send endpoint's ``write`` returns
        :raises ValueError: if the underlying USB write fails
        """

        try:
            return self.usb_send_ep.write(data)
        except usb.core.USBError as e:
            raise ValueError(str(e))

    def read(self, size):
        """Receive raw bytes from the instrument.

        :param size: number of bytes to receive; values <= 0 are treated as 1
        :return: received bytes
        :return type: bytes
        """

        if size <= 0:
            size = 1

        data = self.usb_recv_ep.read(size, self.timeout).tobytes()

        return data

    def close(self):
        """Dispose of the resources held for the USB device."""
        return usb.util.dispose_resources(self.usb_dev)
277
278
class USBTMC(USBRaw):
    """USBRaw specialization that frames traffic as USBTMC bulk messages."""

    # Maximum number of bytes per transfer (for sending and receiving).
    RECV_CHUNK = 1024 ** 2

    find_devices = staticmethod(find_tmc_devices)

    def __init__(self, vendor=None, product=None, serial_number=None, **kwargs):
        """Open the device, query its capabilities and enable remote control.

        :param vendor: forwarded to ``USBRaw.__init__``
        :param product: forwarded to ``USBRaw.__init__``
        :param serial_number: forwarded to ``USBRaw.__init__``
        :raises ValueError: if the interface lacks a Bulk-IN or Bulk-OUT
                            endpoint
        """
        super(USBTMC, self).__init__(vendor, product, serial_number, **kwargs)
        self.usb_intr_in = find_endpoint(
            self.usb_intf, usb.ENDPOINT_IN, usb.ENDPOINT_TYPE_INTERRUPT
        )

        self.usb_dev.reset()
        self.usb_dev.set_configuration()

        time.sleep(0.01)

        self._capabilities = self._get_capabilities()

        # Tag of the most recent bulk transfer; bumped before every transfer.
        self._btag = 0

        if not (self.usb_recv_ep and self.usb_send_ep):
            msg = "TMC device must have both Bulk-In and Bulk-out endpoints."
            raise ValueError(msg)

        self._enable_remote_control()

    def _enable_remote_control(self):
        """Send REN_CONTROL with wValue 1 if the device reports support."""
        if not self._capabilities.ren_control:
            return

        self.usb_dev.ctrl_transfer(
            usb.util.build_request_type(
                usb.util.CTRL_IN,
                usb.util.CTRL_TYPE_CLASS,
                usb.util.CTRL_RECIPIENT_INTERFACE,
            ),
            Request.ren_control,
            1,
            self.usb_intf.index,
            1,
            timeout=self.timeout,
        )

    def _get_capabilities(self):
        """Issue GET_CAPABILITIES and decode the flags found at offset 0xE.

        :return: a ``UsbTmcCapabilities`` tuple of booleans
        """
        c = self.usb_dev.ctrl_transfer(
            usb.util.build_request_type(
                usb.util.CTRL_IN,
                usb.util.CTRL_TYPE_CLASS,
                usb.util.CTRL_RECIPIENT_INTERFACE,
            ),
            Request.get_capabilities,
            0x0000,
            self.usb_intf.index,
            0x0018,
            timeout=self.timeout,
        )

        usb488_capabilities = c[0xE]

        # bit #2: The interface is a 488.2 USB488 interface.
        # bit #1: The interface accepts REN_CONTROL, GO_TO_LOCAL,
        #         and LOCAL_LOCKOUT requests.
        # bit #0: The interface accepts the MsgID = TRIGGER
        #         USBTMC command message and forwards
        #         TRIGGER requests to the Function Layer.
        return UsbTmcCapabilities(
            usb488=bool(usb488_capabilities & (1 << 2)),
            ren_control=bool(usb488_capabilities & (1 << 1)),
            trigger=bool(usb488_capabilities & (1 << 0)),
        )

    def _find_interface(self, dev, setting):
        """Return the first interface of ``dev`` with class 0xFE, subclass 3.

        :param dev: the USB device to inspect
        :param setting: unused by this override
        :raises ValueError: if no such interface exists
        """
        interfaces = find_interfaces(dev, bInterfaceClass=0xFE, bInterfaceSubClass=3)
        if not interfaces:
            raise ValueError("USB TMC interface not found.")

        # When several interfaces match, the first one is used.
        return interfaces[0]

    def _abort_bulk_in(self, btag):
        """Request that the device abort a pending Bulk-IN operation."""

        abort_timeout_ms = 5000

        # Send INITIATE_ABORT_BULK_IN.
        # According to USBTMC 1.00 4.2.1.4:
        #   wValue = bTag value of transfer to be aborted
        #   wIndex = Bulk-IN endpoint
        #   wLength = 0x0002 (length of device response)
        data = self.usb_dev.ctrl_transfer(
            usb.util.build_request_type(
                usb.util.CTRL_IN,
                usb.util.CTRL_TYPE_CLASS,
                usb.util.CTRL_RECIPIENT_ENDPOINT,
            ),
            Request.initiate_abort_bulk_in,
            btag,
            self.usb_recv_ep.bEndpointAddress,
            0x0002,
            timeout=abort_timeout_ms,
        )

        if data[0] != UsbTmcStatus.success:
            # Abort Bulk-IN failed. Ignore it.
            return

        # Read remaining data from Bulk-IN endpoint.
        self.usb_recv_ep.read(self.RECV_CHUNK, abort_timeout_ms)

        # Send CHECK_ABORT_BULK_IN_STATUS until it completes.
        # According to USBTMC 1.00 4.2.1.5:
        #   wValue = 0x0000
        #   wIndex = Bulk-IN endpoint
        #   wLength = 0x0008 (length of device response)
        for _ in range(100):
            data = self.usb_dev.ctrl_transfer(
                usb.util.build_request_type(
                    usb.util.CTRL_IN,
                    usb.util.CTRL_TYPE_CLASS,
                    usb.util.CTRL_RECIPIENT_ENDPOINT,
                ),
                Request.check_abort_bulk_in_status,
                0x0000,
                self.usb_recv_ep.bEndpointAddress,
                0x0008,
                timeout=abort_timeout_ms,
            )
            if data[0] != UsbTmcStatus.pending:
                break
            time.sleep(0.05)

    def write(self, data):
        """Send raw bytes to the instrument.

        The payload is split into consecutive slices of at most
        ``RECV_CHUNK`` bytes, each framed as its own Bulk-OUT message.

        :param data: bytes to be sent to the instrument
        :type data: bytes
        :return: number of payload bytes, i.e. ``len(data)``
        :raises ValueError: if the underlying USB write fails
        """

        size = len(data)
        raw_write = super(USBTMC, self).write

        # Send all data via one or more Bulk-OUT transfers.
        # Set the EOM flag on the last transfer only.
        # Send at least one transfer (possibly empty).
        begin = 0
        while True:
            end = begin + self.RECV_CHUNK

            self._btag = (self._btag % 255) + 1

            eom = end >= size
            # Always slice the caller's payload; the framed message is kept
            # in a separate name so later chunks are cut from the right data.
            message = BulkOutMessage.build_array(self._btag, eom, data[begin:end])
            raw_write(message)

            if eom:
                break
            begin = end

        return size

    def read(self, size):
        """Read one complete response message from the instrument.

        Bulk-IN requests are issued until a response arrives whose data
        length reaches its announced transfer size and whose EOM bit is set.

        :param size: per-request byte limit; values <= 0 or >= RECV_CHUNK
                     fall back to RECV_CHUNK
        :return: the concatenated payload of all responses
        :return type: bytes
        """

        recv_chunk = self.RECV_CHUNK
        if size > 0 and size < recv_chunk:
            recv_chunk = size

        header_size = 12
        max_padding = 511

        eom = False

        raw_read = super(USBTMC, self).read
        raw_write = super(USBTMC, self).write

        received = bytearray()

        while not eom:
            self._btag = (self._btag % 255) + 1

            req = BulkInMessage.build_array(self._btag, recv_chunk, None)

            raw_write(req)

            try:
                resp = raw_read(recv_chunk + header_size + max_padding)
                response = BulkInMessage.from_bytes(resp)
            except (usb.core.USBError, ValueError):
                # Abort failed Bulk-IN operation.
                self._abort_bulk_in(self._btag)
                raise

            received.extend(response.data)

            # Detect EOM only when device sends all expected bytes.
            if len(response.data) >= response.transfer_size:
                eom = response.transfer_attributes & 1

        return bytes(received)
478