1"""Base USB bus, driver and device APIs.
2
3This modules provides abstractions over several platform and implementation
4differences.  As such, there is a lot of boilerplate here, but callers should
5be able to disregard almost everything and simply work on the UsbDriver/
6UsbHidDriver level.
7
8BaseUsbDriver
9└── device: PyUsbDevice
10    ├── uses PyUSB
11    └── backed by (in order of priority)
12        ├── libusb-1.0
13        ├── libusb-0.1
14        └── OpenUSB
15
16UsbHidDriver
17├── extends: BaseUsbDriver
18└── device: HidapiDevice
19    ├── uses hidapi
20    └── backed by
21        ├── hid.dll on Windows
22        ├── hidraw on Linux if it was enabled during the build of hidapi
23        ├── IOHidManager on MacOS
24        └── libusb-1.0 on all other cases
25
26UsbDriver
27├── extends: BaseUsbDriver
28└── allows to differentiate between UsbHidDriver and (non HID) UsbDriver
29
30UsbDriver and UsbHidDriver are meant to be used as base classes to the actual
31device drivers.  The users of those drivers generally do not care about read,
32write or other low level operations; thus, these low level operations are
33placed in <driver>.device.
34
35However, there still are legitimate reasons as to why someone would want to
36directly access the lower layers (device wrapper level, device implementation
37level, or lower).  We do not hide or mark those references as private, but good
38judgement should be exercised when calling anything within <driver>.device.
39
40The USB drivers are organized into two buses.  The recommended way to
41initialize and bind drivers is through their respective buses, though
42<driver>.find_supported_devices can also be useful in certain scenarios.
43
44HidapiBus
45└── drivers: all (recursive) subclasses of UsbHidDriver
46
47PyUsbBus
48└── drivers: all (recursive) subclasses of UsbDriver
49
50The subclass constructor can generally be kept unaware of the implementation
51details of the device parameter, and find_supported_devices already accepts
52keyword arguments and forwards them to the driver constructor.
53
54Copyright (C) 2019–2021  Jonas Malaco and contributors
55SPDX-License-Identifier: GPL-3.0-or-later
56"""
57
58import logging
59import sys
60
61import usb
62try:
63    # The hidapi package, depending on how it's compiled, exposes one or two
64    # top level modules: hid and, optionally, hidraw.  When both are available,
65    # hid will be a libusb-based fallback implementation, and we prefer hidraw.
66    import hidraw as hid
67except ModuleNotFoundError:
68    import hid
69
70from liquidctl.driver.base import BaseDriver, BaseBus, find_all_subclasses
71from liquidctl.util import LazyHexRepr
72
73_LOGGER = logging.getLogger(__name__)
74
75
76class BaseUsbDriver(BaseDriver):
77    """Base driver class for generic USB devices.
78
79    Each driver should provide its own list of SUPPORTED_DEVICES, as well as
80    implementations for all methods applicable to the devices is supports.
81
82    SUPPORTED_DEVICES should consist of a list of (vendor id, product
83    id, None (reserved), description, and extra kwargs) tuples.
84
85    find_supported_devices will pass these extra kwargs, as well as any it
86    receives, to the constructor.
87    """
88
89    SUPPORTED_DEVICES = []
90
91    @classmethod
92    def probe(cls, handle, vendor=None, product=None, release=None,
93              serial=None, match=None, **kwargs):
94        """Probe `handle` and yield corresponding driver instances."""
95        for vid, pid, _, description, devargs in cls.SUPPORTED_DEVICES:
96            if (vendor and vendor != vid) or handle.vendor_id != vid:
97                continue
98            if (product and product != pid) or handle.product_id != pid:
99                continue
100            if release and handle.release_number != release:
101                continue
102            if serial and handle.serial_number != serial:
103                continue
104            if match and match.lower() not in description.lower():
105                continue
106            consargs = devargs.copy()
107            consargs.update(kwargs)
108            dev = cls(handle, description, **consargs)
109            _LOGGER.debug('instanced driver for %s', description)
110            yield dev
111
112    def __init__(self, device, description, **kwargs):
113        self.device = device
114        self._description = description
115
116    def connect(self, **kwargs):
117        """Connect to the device."""
118        self.device.open()
119        return self
120
121    def disconnect(self, **kwargs):
122        """Disconnect from the device."""
123        self.device.close()
124
125    @property
126    def description(self):
127        """Human readable description of the corresponding device."""
128        return self._description
129
130    @property
131    def vendor_id(self):
132        """16-bit numeric vendor identifier."""
133        return self.device.vendor_id
134
135    @property
136    def product_id(self):
137        """16-bit umeric product identifier."""
138        return self.device.product_id
139
140    @property
141    def release_number(self):
142        """16-bit BCD device versioning number."""
143        return self.device.release_number
144
145    @property
146    def serial_number(self):
147        """Serial number reported by the device, or None if N/A."""
148        return self.device.serial_number
149
150    @property
151    def bus(self):
152        """Bus the device is connected to, or None if N/A."""
153        return self.device.bus
154
155    @property
156    def address(self):
157        """Address of the device on the corresponding bus, or None if N/A.
158
159        Dependendent on bus enumeration order.
160        """
161        return self.device.address
162
163    @property
164    def port(self):
165        """Physical location of the device, or None if N/A.
166
167        Tuple of USB port numbers, from the root hub to this device.  Not
168        dependendent on bus enumeration order.
169        """
170        return self.device.port
171
172
173class UsbHidDriver(BaseUsbDriver):
174    """Base driver class for USB Human Interface Devices (HIDs)."""
175
176    @classmethod
177    def find_supported_devices(cls, **kwargs):
178        """Find devices specifically compatible with this driver."""
179        devs = []
180        for vid, pid, _, _, _ in cls.SUPPORTED_DEVICES:
181            for dev in HidapiBus().find_devices(vendor=vid, product=pid, **kwargs):
182                if type(dev) == cls:
183                    devs.append(dev)
184        return devs
185
186    def __init__(self, device, description, **kwargs):
187        # compatibility with v1.1.0 drivers, which could be directly
188        # instantiated with a usb.core.Device
189        if isinstance(device, usb.core.Device):
190            clname = self.__class__.__name__
191            _LOGGER.warning('constructing a %s instance from a usb.core.Device has been deprecated, '
192                            'use %s.find_supported_devices() or pass a HidapiDevice handle', clname, clname)
193            usbdev = device
194            hidinfo = next(info for info in hid.enumerate(usbdev.idVendor, usbdev.idProduct)
195                           if info['serial_number'] == usbdev.serial_number)
196            assert hidinfo, 'Could not find device in HID bus'
197            device = HidapiDevice(hid, hidinfo)
198        super().__init__(device, description, **kwargs)
199
200
201class UsbDriver(BaseUsbDriver):
202    """Base driver class for regular USB devices.
203
204    Specifically, regular USB devices are *not* Human Interface Devices (HIDs).
205    """
206
207    @classmethod
208    def find_supported_devices(cls, **kwargs):
209        """Find devices specifically compatible with this driver."""
210        devs = []
211        for vid, pid, _, _, _ in cls.SUPPORTED_DEVICES:
212            for dev in PyUsbBus().find_devices(vendor=vid, product=pid, **kwargs):
213                if type(dev) == cls:
214                    devs.append(dev)
215        return devs
216
217
218class PyUsbDevice:
219    """"A PyUSB backed device.
220
221    PyUSB will automatically pick the first available backend (at runtime).
222    The supported backends are:
223
224     - libusb-1.0
225     - libusb-0.1
226     - OpenUSB
227    """
228
229    def __init__(self, usbdev, bInterfaceNumber=None):
230        self.api = usb
231        self.usbdev = usbdev
232        self.bInterfaceNumber = bInterfaceNumber
233        self._attached = False
234
235    def _select_interface(self, cfg):
236        return self.bInterfaceNumber or 0
237
238    def open(self, bInterfaceNumber=0):
239        """Connect to the device.
240
241        Ensure the device is configured and replace the kernel kernel on the
242        selected interface, if necessary.
243        """
244
245        # we assume the device is already configured, there is only one
246        # configuration, or the first one is desired
247
248        try:
249            cfg = self.usbdev.get_active_configuration()
250        except usb.core.USBError as err:
251            if err.args[0] == 'Configuration not set':
252                _LOGGER.debug('setting the (first) configuration')
253                self.usbdev.set_configuration()
254                # FIXME device or handle might not be ready for use yet
255                cfg = self.usbdev.get_active_configuration()
256            else:
257                raise
258
259        self.bInterfaceNumber = self._select_interface(cfg)
260        _LOGGER.debug('selected interface: %d', self.bInterfaceNumber)
261
262        if (sys.platform.startswith('linux') and
263                self.usbdev.is_kernel_driver_active(self.bInterfaceNumber)):
264            _LOGGER.debug('replacing stock kernel driver with libusb')
265            self.usbdev.detach_kernel_driver(self.bInterfaceNumber)
266            self._attached = True
267
268    def claim(self):
269        """Explicitly claim the device from other programs."""
270        _LOGGER.debug('explicitly claim interface')
271        usb.util.claim_interface(self.usbdev, self.bInterfaceNumber)
272
273    def release(self):
274        """Release the device to other programs."""
275        if sys.platform == 'win32':
276            # on Windows we need to release the entire device for other
277            # programs to be able to access it
278            _LOGGER.debug('explicitly release device')
279            usb.util.dispose_resources(self.usbdev)
280        else:
281            # on Linux, and possibly on Mac and BSDs, releasing the specific
282            # interface is enough
283            _LOGGER.debug('explicitly release interface')
284            usb.util.release_interface(self.usbdev, self.bInterfaceNumber)
285
286    def close(self):
287        """Disconnect from the device.
288
289        Clean up and (Linux only) reattach the kernel driver.
290        """
291        self.release()
292        if self._attached:
293            _LOGGER.debug('restoring stock kernel driver')
294            self.usbdev.attach_kernel_driver(self.bInterfaceNumber)
295            self._attached = False
296
297    def read(self, endpoint, length, timeout=None):
298        """Read from endpoint."""
299        data = self.usbdev.read(endpoint, length, timeout=timeout)
300        _LOGGER.debug('read %d bytes: %r', len(data), LazyHexRepr(data))
301        return data
302
303    def write(self, endpoint, data, timeout=None):
304        """Write to endpoint."""
305        _LOGGER.debug('writting %d bytes: %r', len(data), LazyHexRepr(data))
306        return self.usbdev.write(endpoint, data, timeout=timeout)
307
308    def ctrl_transfer(self, *args, **kwargs):
309        """Submit a contrl transfer."""
310        _LOGGER.debug('sending control transfer with %r, %r', args, kwargs)
311        return self.usbdev.ctrl_transfer(*args, **kwargs)
312
313    @classmethod
314    def enumerate(cls, vid=None, pid=None):
315        args = {}
316        if vid:
317            args['idVendor'] = vid
318        if pid:
319            args['idProduct'] = pid
320        for handle in usb.core.find(find_all=True, **args):
321            yield cls(handle)
322
323    @property
324    def vendor_id(self):
325        return self.usbdev.idVendor
326
327    @property
328    def product_id(self):
329        return self.usbdev.idProduct
330
331    @property
332    def release_number(self):
333        return self.usbdev.bcdDevice
334
335    @property
336    def serial_number(self):
337        return self.usbdev.serial_number
338
339    @property
340    def bus(self):
341        return f'usb{self.usbdev.bus}'  # follow Linux model
342
343    @property
344    def address(self):
345        return self.usbdev.address
346
347    @property
348    def port(self):
349        return self.usbdev.port_numbers
350
351    def __eq__(self, other):
352        return type(self) == type(other) and self.bus == other.bus and self.address == other.address
353
354
355class HidapiDevice:
356    """A hidapi backed device.
357
358    Depending on the platform, the selected `hidapi` and how it was built, this
359    might use any of the following backends:
360
361     - hid.dll on Windows
362     - hidraw on Linux, if it was enabled during the build of hidapi
363     - IOHidManager on MacOS
364     - libusb-1.0 on all other cases
365
366    The default hidapi API is the module 'hid'.  On standard Linux builds of
367    the hidapi package, this might default to a libusb-1.0 backed
368    implementation; at the same time an alternate 'hidraw' module may also be
369    provided.  The latter is prefered, when available.
370
371    Note: if a libusb-backed 'hid' is used on Linux (assuming default build
372    options) it will detach the kernel driver, making hidraw and hwmon
373    unavailable for that device.  To fix, rebind the device to usbhid with:
374
375        echo '<bus>-<port>:1.0' | sudo tee /sys/bus/usb/drivers/usbhid/bind
376    """
377    def __init__(self, hidapi, hidapi_dev_info):
378        self.api = hidapi
379        self.hidinfo = hidapi_dev_info
380        self.hiddev = self.api.device()
381
382    def open(self):
383        """Connect to the device."""
384        self.hiddev.open_path(self.hidinfo['path'])
385
386    def close(self):
387        """NOOP."""
388        self.hiddev.close()
389
390    def clear_enqueued_reports(self):
391        """Clear already enqueued incoming reports.
392
393        The OS generally enqueues incomming reports for open HIDs, and hidapi
394        emulates this when running on top of libusb.  On Linux, up to 64
395        reports can be enqueued.
396
397        This method quickly reads and discards any already enqueued reports,
398        and is useful when later reads are not expected to return stale data.
399        """
400        if self.hiddev.set_nonblocking(True) == 0:
401            timeout_ms = 0  # use hid_read; wont block because call succeeded
402        else:
403            timeout_ms = 1  # smallest timeout forwarded to hid_read_timeout
404        discarded = 0
405        while self.hiddev.read(max_length=1, timeout_ms=timeout_ms):
406            discarded += 1
407        _LOGGER.debug('discarded %d previously enqueued reports', discarded)
408
409    def read(self, length):
410        """Read raw report from HID.
411
412        The returned data follows the semantics of the Linux HIDRAW API.
413
414        > On a device which uses numbered reports, the first byte of the
415        > returned data will be the report number; the report data follows,
416        > beginning in the second byte. For devices which do not use numbered
417        > reports, the report data will begin at the first byte.
418        """
419        self.hiddev.set_nonblocking(False)
420        data = self.hiddev.read(length)
421        _LOGGER.debug('read %d bytes: %r', len(data), LazyHexRepr(data))
422        return data
423
424    def write(self, data):
425        """Write raw report to HID.
426
427        The buffer should follow the semantics of the Linux HIDRAW API.
428
429        > The first byte of the buffer passed to write() should be set to the
430        > report number.  If the device does not use numbered reports, the
431        > first byte should be set to 0. The report data itself should begin
432        > at the second byte.
433        """
434        _LOGGER.debug('writting report 0x%02x with %d bytes: %r', data[0],
435                      len(data) - 1, LazyHexRepr(data, start=1))
436        res = self.hiddev.write(data)
437        if res < 0:
438            raise OSError('Could not write to device')
439        if res != len(data):
440            _LOGGER.debug('wrote %d total bytes, expected %d', res, len(data))
441        return res
442
443    def get_feature_report(self, report_id, length):
444        """Get feature report that matches `report_id` from HID.
445
446        If the device does not use numbered reports, set `report_id` to 0.
447
448        Unlike `read`, the returned data follows semantics similar to `write`
449        and `send_feature_report`: the first byte will always contain the
450        report ID (or 0), and the report data itself will being at the second
451        byte.
452        """
453        data = self.hiddev.get_feature_report(report_id, length)
454        _LOGGER.debug('got feature report 0x%02x with %d bytes: %r', data[0],
455                      len(data) - 1, LazyHexRepr(data, start=1))
456        return data
457
458    def send_feature_report(self, data):
459        """Send feature report to HID.
460
461        The buffer should follow the semantics of `write`.
462
463        > The first byte of the buffer passed to write() should be set to the
464        > report number.  If the device does not use numbered reports, the
465        > first byte should be set to 0. The report data itself should begin
466        > at the second byte.
467        """
468        _LOGGER.debug('sending feature report 0x%02x with %d bytes: %r',
469                      data[0], len(data) - 1, LazyHexRepr(data, start=1))
470        res = self.hiddev.send_feature_report(data)
471        if res < 0:
472            raise OSError('Could not send feature report to device')
473        if res != len(data):
474            _LOGGER.debug('sent %d total bytes, expected %d', res, len(data))
475        return res
476
477    @classmethod
478    def enumerate(cls, api, vid=None, pid=None):
479        infos = api.enumerate(vid or 0, pid or 0)
480        if sys.platform == 'darwin':
481            infos = sorted(infos, key=lambda info: info['path'])
482        for info in infos:
483            yield cls(api, info)
484
485    @property
486    def vendor_id(self):
487        return self.hidinfo['vendor_id']
488
489    @property
490    def product_id(self):
491        return self.hidinfo['product_id']
492
493    @property
494    def release_number(self):
495        return self.hidinfo['release_number']
496
497    @property
498    def serial_number(self):
499        return self.hidinfo['serial_number']
500
501    @property
502    def bus(self):
503        return 'hid'  # follow Linux model
504
505    @property
506    def address(self):
507        return self.hidinfo['path'].decode(errors='replace')
508
509    @property
510    def port(self):
511        return None
512
513    def __eq__(self, other):
514        return type(self) == type(other) and self.bus == other.bus and self.address == other.address
515
516
517class HidapiBus(BaseBus):
518    def find_devices(self, vendor=None, product=None, bus=None, address=None,
519                     usb_port=None, **kwargs):
520        """Find compatible HID devices."""
521        handles = HidapiDevice.enumerate(hid, vendor, product)
522        drivers = sorted(find_all_subclasses(UsbHidDriver),
523                         key=lambda x: (x.__module__, x.__name__))
524        _LOGGER.debug('searching %s (%s)', self.__class__.__name__,
525                      ', '.join(map(lambda x: x.__name__, drivers)))
526        for handle in handles:
527            if bus and handle.bus != bus:
528                continue
529            if address and handle.address != address:
530                continue
531            if usb_port and handle.port != usb_port:
532                continue
533            _LOGGER.debug('found HID device %04x:%04x', handle.vendor_id,
534                          handle.product_id)
535            for drv in drivers:
536                yield from drv.probe(handle, vendor=vendor, product=product, **kwargs)
537
538
539class PyUsbBus(BaseBus):
540    def find_devices(self, vendor=None, product=None, bus=None, address=None,
541                     usb_port=None, **kwargs):
542        """ Find compatible regular USB devices."""
543        drivers = sorted(find_all_subclasses(UsbDriver),
544                         key=lambda x: (x.__module__, x.__name__))
545        _LOGGER.debug('searching %s (%s)', self.__class__.__name__,
546                      ', '.join(map(lambda x: x.__name__, drivers)))
547        for handle in PyUsbDevice.enumerate(vendor, product):
548            if bus and handle.bus != bus:
549                continue
550            if address and handle.address != address:
551                continue
552            if usb_port and handle.port != usb_port:
553                continue
554            _LOGGER.debug('found USB device %04x:%04x', handle.vendor_id,
555                          handle.product_id)
556            for drv in drivers:
557                yield from drv.probe(handle, vendor=vendor, product=product, **kwargs)
558