1"""Base USB bus, driver and device APIs. 2 3This modules provides abstractions over several platform and implementation 4differences. As such, there is a lot of boilerplate here, but callers should 5be able to disregard almost everything and simply work on the UsbDriver/ 6UsbHidDriver level. 7 8BaseUsbDriver 9└── device: PyUsbDevice 10 ├── uses PyUSB 11 └── backed by (in order of priority) 12 ├── libusb-1.0 13 ├── libusb-0.1 14 └── OpenUSB 15 16UsbHidDriver 17├── extends: BaseUsbDriver 18└── device: HidapiDevice 19 ├── uses hidapi 20 └── backed by 21 ├── hid.dll on Windows 22 ├── hidraw on Linux if it was enabled during the build of hidapi 23 ├── IOHidManager on MacOS 24 └── libusb-1.0 on all other cases 25 26UsbDriver 27├── extends: BaseUsbDriver 28└── allows to differentiate between UsbHidDriver and (non HID) UsbDriver 29 30UsbDriver and UsbHidDriver are meant to be used as base classes to the actual 31device drivers. The users of those drivers generally do not care about read, 32write or other low level operations; thus, these low level operations are 33placed in <driver>.device. 34 35However, there still are legitimate reasons as to why someone would want to 36directly access the lower layers (device wrapper level, device implementation 37level, or lower). We do not hide or mark those references as private, but good 38judgement should be exercised when calling anything within <driver>.device. 39 40The USB drivers are organized into two buses. The recommended way to 41initialize and bind drivers is through their respective buses, though 42<driver>.find_supported_devices can also be useful in certain scenarios. 43 44HidapiBus 45└── drivers: all (recursive) subclasses of UsbHidDriver 46 47PyUsbBus 48└── drivers: all (recursive) subclasses of UsbDriver 49 50The subclass constructor can generally be kept unaware of the implementation 51details of the device parameter, and find_supported_devices already accepts 52keyword arguments and forwards them to the driver constructor. 53 54Copyright (C) 2019–2021 Jonas Malaco and contributors 55SPDX-License-Identifier: GPL-3.0-or-later 56""" 57 58import logging 59import sys 60 61import usb 62try: 63 # The hidapi package, depending on how it's compiled, exposes one or two 64 # top level modules: hid and, optionally, hidraw. When both are available, 65 # hid will be a libusb-based fallback implementation, and we prefer hidraw. 66 import hidraw as hid 67except ModuleNotFoundError: 68 import hid 69 70from liquidctl.driver.base import BaseDriver, BaseBus, find_all_subclasses 71from liquidctl.util import LazyHexRepr 72 73_LOGGER = logging.getLogger(__name__) 74 75 76class BaseUsbDriver(BaseDriver): 77 """Base driver class for generic USB devices. 78 79 Each driver should provide its own list of SUPPORTED_DEVICES, as well as 80 implementations for all methods applicable to the devices is supports. 81 82 SUPPORTED_DEVICES should consist of a list of (vendor id, product 83 id, None (reserved), description, and extra kwargs) tuples. 84 85 find_supported_devices will pass these extra kwargs, as well as any it 86 receives, to the constructor. 87 """ 88 89 SUPPORTED_DEVICES = [] 90 91 @classmethod 92 def probe(cls, handle, vendor=None, product=None, release=None, 93 serial=None, match=None, **kwargs): 94 """Probe `handle` and yield corresponding driver instances.""" 95 for vid, pid, _, description, devargs in cls.SUPPORTED_DEVICES: 96 if (vendor and vendor != vid) or handle.vendor_id != vid: 97 continue 98 if (product and product != pid) or handle.product_id != pid: 99 continue 100 if release and handle.release_number != release: 101 continue 102 if serial and handle.serial_number != serial: 103 continue 104 if match and match.lower() not in description.lower(): 105 continue 106 consargs = devargs.copy() 107 consargs.update(kwargs) 108 dev = cls(handle, description, **consargs) 109 _LOGGER.debug('instanced driver for %s', description) 110 yield dev 111 112 def __init__(self, device, description, **kwargs): 113 self.device = device 114 self._description = description 115 116 def connect(self, **kwargs): 117 """Connect to the device.""" 118 self.device.open() 119 return self 120 121 def disconnect(self, **kwargs): 122 """Disconnect from the device.""" 123 self.device.close() 124 125 @property 126 def description(self): 127 """Human readable description of the corresponding device.""" 128 return self._description 129 130 @property 131 def vendor_id(self): 132 """16-bit numeric vendor identifier.""" 133 return self.device.vendor_id 134 135 @property 136 def product_id(self): 137 """16-bit umeric product identifier.""" 138 return self.device.product_id 139 140 @property 141 def release_number(self): 142 """16-bit BCD device versioning number.""" 143 return self.device.release_number 144 145 @property 146 def serial_number(self): 147 """Serial number reported by the device, or None if N/A.""" 148 return self.device.serial_number 149 150 @property 151 def bus(self): 152 """Bus the device is connected to, or None if N/A.""" 153 return self.device.bus 154 155 @property 156 def address(self): 157 """Address of the device on the corresponding bus, or None if N/A. 158 159 Dependendent on bus enumeration order. 160 """ 161 return self.device.address 162 163 @property 164 def port(self): 165 """Physical location of the device, or None if N/A. 166 167 Tuple of USB port numbers, from the root hub to this device. Not 168 dependendent on bus enumeration order. 169 """ 170 return self.device.port 171 172 173class UsbHidDriver(BaseUsbDriver): 174 """Base driver class for USB Human Interface Devices (HIDs).""" 175 176 @classmethod 177 def find_supported_devices(cls, **kwargs): 178 """Find devices specifically compatible with this driver.""" 179 devs = [] 180 for vid, pid, _, _, _ in cls.SUPPORTED_DEVICES: 181 for dev in HidapiBus().find_devices(vendor=vid, product=pid, **kwargs): 182 if type(dev) == cls: 183 devs.append(dev) 184 return devs 185 186 def __init__(self, device, description, **kwargs): 187 # compatibility with v1.1.0 drivers, which could be directly 188 # instantiated with a usb.core.Device 189 if isinstance(device, usb.core.Device): 190 clname = self.__class__.__name__ 191 _LOGGER.warning('constructing a %s instance from a usb.core.Device has been deprecated, ' 192 'use %s.find_supported_devices() or pass a HidapiDevice handle', clname, clname) 193 usbdev = device 194 hidinfo = next(info for info in hid.enumerate(usbdev.idVendor, usbdev.idProduct) 195 if info['serial_number'] == usbdev.serial_number) 196 assert hidinfo, 'Could not find device in HID bus' 197 device = HidapiDevice(hid, hidinfo) 198 super().__init__(device, description, **kwargs) 199 200 201class UsbDriver(BaseUsbDriver): 202 """Base driver class for regular USB devices. 203 204 Specifically, regular USB devices are *not* Human Interface Devices (HIDs). 205 """ 206 207 @classmethod 208 def find_supported_devices(cls, **kwargs): 209 """Find devices specifically compatible with this driver.""" 210 devs = [] 211 for vid, pid, _, _, _ in cls.SUPPORTED_DEVICES: 212 for dev in PyUsbBus().find_devices(vendor=vid, product=pid, **kwargs): 213 if type(dev) == cls: 214 devs.append(dev) 215 return devs 216 217 218class PyUsbDevice: 219 """"A PyUSB backed device. 220 221 PyUSB will automatically pick the first available backend (at runtime). 222 The supported backends are: 223 224 - libusb-1.0 225 - libusb-0.1 226 - OpenUSB 227 """ 228 229 def __init__(self, usbdev, bInterfaceNumber=None): 230 self.api = usb 231 self.usbdev = usbdev 232 self.bInterfaceNumber = bInterfaceNumber 233 self._attached = False 234 235 def _select_interface(self, cfg): 236 return self.bInterfaceNumber or 0 237 238 def open(self, bInterfaceNumber=0): 239 """Connect to the device. 240 241 Ensure the device is configured and replace the kernel kernel on the 242 selected interface, if necessary. 243 """ 244 245 # we assume the device is already configured, there is only one 246 # configuration, or the first one is desired 247 248 try: 249 cfg = self.usbdev.get_active_configuration() 250 except usb.core.USBError as err: 251 if err.args[0] == 'Configuration not set': 252 _LOGGER.debug('setting the (first) configuration') 253 self.usbdev.set_configuration() 254 # FIXME device or handle might not be ready for use yet 255 cfg = self.usbdev.get_active_configuration() 256 else: 257 raise 258 259 self.bInterfaceNumber = self._select_interface(cfg) 260 _LOGGER.debug('selected interface: %d', self.bInterfaceNumber) 261 262 if (sys.platform.startswith('linux') and 263 self.usbdev.is_kernel_driver_active(self.bInterfaceNumber)): 264 _LOGGER.debug('replacing stock kernel driver with libusb') 265 self.usbdev.detach_kernel_driver(self.bInterfaceNumber) 266 self._attached = True 267 268 def claim(self): 269 """Explicitly claim the device from other programs.""" 270 _LOGGER.debug('explicitly claim interface') 271 usb.util.claim_interface(self.usbdev, self.bInterfaceNumber) 272 273 def release(self): 274 """Release the device to other programs.""" 275 if sys.platform == 'win32': 276 # on Windows we need to release the entire device for other 277 # programs to be able to access it 278 _LOGGER.debug('explicitly release device') 279 usb.util.dispose_resources(self.usbdev) 280 else: 281 # on Linux, and possibly on Mac and BSDs, releasing the specific 282 # interface is enough 283 _LOGGER.debug('explicitly release interface') 284 usb.util.release_interface(self.usbdev, self.bInterfaceNumber) 285 286 def close(self): 287 """Disconnect from the device. 288 289 Clean up and (Linux only) reattach the kernel driver. 290 """ 291 self.release() 292 if self._attached: 293 _LOGGER.debug('restoring stock kernel driver') 294 self.usbdev.attach_kernel_driver(self.bInterfaceNumber) 295 self._attached = False 296 297 def read(self, endpoint, length, timeout=None): 298 """Read from endpoint.""" 299 data = self.usbdev.read(endpoint, length, timeout=timeout) 300 _LOGGER.debug('read %d bytes: %r', len(data), LazyHexRepr(data)) 301 return data 302 303 def write(self, endpoint, data, timeout=None): 304 """Write to endpoint.""" 305 _LOGGER.debug('writting %d bytes: %r', len(data), LazyHexRepr(data)) 306 return self.usbdev.write(endpoint, data, timeout=timeout) 307 308 def ctrl_transfer(self, *args, **kwargs): 309 """Submit a contrl transfer.""" 310 _LOGGER.debug('sending control transfer with %r, %r', args, kwargs) 311 return self.usbdev.ctrl_transfer(*args, **kwargs) 312 313 @classmethod 314 def enumerate(cls, vid=None, pid=None): 315 args = {} 316 if vid: 317 args['idVendor'] = vid 318 if pid: 319 args['idProduct'] = pid 320 for handle in usb.core.find(find_all=True, **args): 321 yield cls(handle) 322 323 @property 324 def vendor_id(self): 325 return self.usbdev.idVendor 326 327 @property 328 def product_id(self): 329 return self.usbdev.idProduct 330 331 @property 332 def release_number(self): 333 return self.usbdev.bcdDevice 334 335 @property 336 def serial_number(self): 337 return self.usbdev.serial_number 338 339 @property 340 def bus(self): 341 return f'usb{self.usbdev.bus}' # follow Linux model 342 343 @property 344 def address(self): 345 return self.usbdev.address 346 347 @property 348 def port(self): 349 return self.usbdev.port_numbers 350 351 def __eq__(self, other): 352 return type(self) == type(other) and self.bus == other.bus and self.address == other.address 353 354 355class HidapiDevice: 356 """A hidapi backed device. 357 358 Depending on the platform, the selected `hidapi` and how it was built, this 359 might use any of the following backends: 360 361 - hid.dll on Windows 362 - hidraw on Linux, if it was enabled during the build of hidapi 363 - IOHidManager on MacOS 364 - libusb-1.0 on all other cases 365 366 The default hidapi API is the module 'hid'. On standard Linux builds of 367 the hidapi package, this might default to a libusb-1.0 backed 368 implementation; at the same time an alternate 'hidraw' module may also be 369 provided. The latter is prefered, when available. 370 371 Note: if a libusb-backed 'hid' is used on Linux (assuming default build 372 options) it will detach the kernel driver, making hidraw and hwmon 373 unavailable for that device. To fix, rebind the device to usbhid with: 374 375 echo '<bus>-<port>:1.0' | sudo tee /sys/bus/usb/drivers/usbhid/bind 376 """ 377 def __init__(self, hidapi, hidapi_dev_info): 378 self.api = hidapi 379 self.hidinfo = hidapi_dev_info 380 self.hiddev = self.api.device() 381 382 def open(self): 383 """Connect to the device.""" 384 self.hiddev.open_path(self.hidinfo['path']) 385 386 def close(self): 387 """NOOP.""" 388 self.hiddev.close() 389 390 def clear_enqueued_reports(self): 391 """Clear already enqueued incoming reports. 392 393 The OS generally enqueues incomming reports for open HIDs, and hidapi 394 emulates this when running on top of libusb. On Linux, up to 64 395 reports can be enqueued. 396 397 This method quickly reads and discards any already enqueued reports, 398 and is useful when later reads are not expected to return stale data. 399 """ 400 if self.hiddev.set_nonblocking(True) == 0: 401 timeout_ms = 0 # use hid_read; wont block because call succeeded 402 else: 403 timeout_ms = 1 # smallest timeout forwarded to hid_read_timeout 404 discarded = 0 405 while self.hiddev.read(max_length=1, timeout_ms=timeout_ms): 406 discarded += 1 407 _LOGGER.debug('discarded %d previously enqueued reports', discarded) 408 409 def read(self, length): 410 """Read raw report from HID. 411 412 The returned data follows the semantics of the Linux HIDRAW API. 413 414 > On a device which uses numbered reports, the first byte of the 415 > returned data will be the report number; the report data follows, 416 > beginning in the second byte. For devices which do not use numbered 417 > reports, the report data will begin at the first byte. 418 """ 419 self.hiddev.set_nonblocking(False) 420 data = self.hiddev.read(length) 421 _LOGGER.debug('read %d bytes: %r', len(data), LazyHexRepr(data)) 422 return data 423 424 def write(self, data): 425 """Write raw report to HID. 426 427 The buffer should follow the semantics of the Linux HIDRAW API. 428 429 > The first byte of the buffer passed to write() should be set to the 430 > report number. If the device does not use numbered reports, the 431 > first byte should be set to 0. The report data itself should begin 432 > at the second byte. 433 """ 434 _LOGGER.debug('writting report 0x%02x with %d bytes: %r', data[0], 435 len(data) - 1, LazyHexRepr(data, start=1)) 436 res = self.hiddev.write(data) 437 if res < 0: 438 raise OSError('Could not write to device') 439 if res != len(data): 440 _LOGGER.debug('wrote %d total bytes, expected %d', res, len(data)) 441 return res 442 443 def get_feature_report(self, report_id, length): 444 """Get feature report that matches `report_id` from HID. 445 446 If the device does not use numbered reports, set `report_id` to 0. 447 448 Unlike `read`, the returned data follows semantics similar to `write` 449 and `send_feature_report`: the first byte will always contain the 450 report ID (or 0), and the report data itself will being at the second 451 byte. 452 """ 453 data = self.hiddev.get_feature_report(report_id, length) 454 _LOGGER.debug('got feature report 0x%02x with %d bytes: %r', data[0], 455 len(data) - 1, LazyHexRepr(data, start=1)) 456 return data 457 458 def send_feature_report(self, data): 459 """Send feature report to HID. 460 461 The buffer should follow the semantics of `write`. 462 463 > The first byte of the buffer passed to write() should be set to the 464 > report number. If the device does not use numbered reports, the 465 > first byte should be set to 0. The report data itself should begin 466 > at the second byte. 467 """ 468 _LOGGER.debug('sending feature report 0x%02x with %d bytes: %r', 469 data[0], len(data) - 1, LazyHexRepr(data, start=1)) 470 res = self.hiddev.send_feature_report(data) 471 if res < 0: 472 raise OSError('Could not send feature report to device') 473 if res != len(data): 474 _LOGGER.debug('sent %d total bytes, expected %d', res, len(data)) 475 return res 476 477 @classmethod 478 def enumerate(cls, api, vid=None, pid=None): 479 infos = api.enumerate(vid or 0, pid or 0) 480 if sys.platform == 'darwin': 481 infos = sorted(infos, key=lambda info: info['path']) 482 for info in infos: 483 yield cls(api, info) 484 485 @property 486 def vendor_id(self): 487 return self.hidinfo['vendor_id'] 488 489 @property 490 def product_id(self): 491 return self.hidinfo['product_id'] 492 493 @property 494 def release_number(self): 495 return self.hidinfo['release_number'] 496 497 @property 498 def serial_number(self): 499 return self.hidinfo['serial_number'] 500 501 @property 502 def bus(self): 503 return 'hid' # follow Linux model 504 505 @property 506 def address(self): 507 return self.hidinfo['path'].decode(errors='replace') 508 509 @property 510 def port(self): 511 return None 512 513 def __eq__(self, other): 514 return type(self) == type(other) and self.bus == other.bus and self.address == other.address 515 516 517class HidapiBus(BaseBus): 518 def find_devices(self, vendor=None, product=None, bus=None, address=None, 519 usb_port=None, **kwargs): 520 """Find compatible HID devices.""" 521 handles = HidapiDevice.enumerate(hid, vendor, product) 522 drivers = sorted(find_all_subclasses(UsbHidDriver), 523 key=lambda x: (x.__module__, x.__name__)) 524 _LOGGER.debug('searching %s (%s)', self.__class__.__name__, 525 ', '.join(map(lambda x: x.__name__, drivers))) 526 for handle in handles: 527 if bus and handle.bus != bus: 528 continue 529 if address and handle.address != address: 530 continue 531 if usb_port and handle.port != usb_port: 532 continue 533 _LOGGER.debug('found HID device %04x:%04x', handle.vendor_id, 534 handle.product_id) 535 for drv in drivers: 536 yield from drv.probe(handle, vendor=vendor, product=product, **kwargs) 537 538 539class PyUsbBus(BaseBus): 540 def find_devices(self, vendor=None, product=None, bus=None, address=None, 541 usb_port=None, **kwargs): 542 """ Find compatible regular USB devices.""" 543 drivers = sorted(find_all_subclasses(UsbDriver), 544 key=lambda x: (x.__module__, x.__name__)) 545 _LOGGER.debug('searching %s (%s)', self.__class__.__name__, 546 ', '.join(map(lambda x: x.__name__, drivers))) 547 for handle in PyUsbDevice.enumerate(vendor, product): 548 if bus and handle.bus != bus: 549 continue 550 if address and handle.address != address: 551 continue 552 if usb_port and handle.port != usb_port: 553 continue 554 _LOGGER.debug('found USB device %04x:%04x', handle.vendor_id, 555 handle.product_id) 556 for drv in drivers: 557 yield from drv.probe(handle, vendor=vendor, product=product, **kwargs) 558