1#!/bin/env python3 2# -*- coding: utf-8 -*- 3# 4# Copyright (c) 2017 Benjamin Tissoires <benjamin.tissoires@gmail.com> 5# Copyright (c) 2017 Red Hat, Inc. 6# 7# This program is free software: you can redistribute it and/or modify 8# it under the terms of the GNU General Public License as published by 9# the Free Software Foundation; either version 2 of the License, or 10# (at your option) any later version. 11# 12# This program is distributed in the hope that it will be useful, 13# but WITHOUT ANY WARRANTY; without even the implied warranty of 14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15# GNU General Public License for more details. 16# 17# You should have received a copy of the GNU General Public License 18# along with this program. If not, see <http://www.gnu.org/licenses/>. 19# 20 21import hidtools.hid 22import os 23import pyudev 24import select 25import struct 26import uuid 27 28import logging 29logger = logging.getLogger('hidtools.hid.uhid') 30 31 32class UHIDIncompleteException(Exception): 33 """ 34 An exception raised when a UHIDDevice does not have sufficient 35 information to create a kernel device. 36 """ 37 pass 38 39 40class UHIDDevice(object): 41 """ 42 A uhid device. uhid is a kernel interface to create virtual HID devices 43 based on a report descriptor. 44 45 This class also acts as context manager for any :class:`UHIDDevice` 46 objects. See :meth:`dispatch` for details. 47 48 .. attribute:: device_nodes 49 50 A list of evdev nodes associated with this HID device. Populating 51 this list requires udev events to be processed, ensure that 52 :meth:`dispatch` is called and that you wait for some reasonable 53 time after creating the device. 54 55 .. attribute:: hidraw_nodes 56 57 A list of hidraw nodes associated with this HID device. Populating 58 this list requires udev events to be processed, ensure that 59 :meth:`dispatch` is called and that you wait for some reasonable 60 time after creating the device. 61 62 .. attribute:: uniq 63 64 A uniq string assigned to this device. This string is autogenerated 65 and can be used to reliably identify the device. 66 67 """ 68 __UHID_LEGACY_CREATE = 0 69 _UHID_DESTROY = 1 70 _UHID_START = 2 71 _UHID_STOP = 3 72 _UHID_OPEN = 4 73 _UHID_CLOSE = 5 74 _UHID_OUTPUT = 6 75 __UHID_LEGACY_OUTPUT_EV = 7 76 __UHID_LEGACY_INPUT = 8 77 _UHID_GET_REPORT = 9 78 _UHID_GET_REPORT_REPLY = 10 79 _UHID_CREATE2 = 11 80 _UHID_INPUT2 = 12 81 _UHID_SET_REPORT = 13 82 _UHID_SET_REPORT_REPLY = 14 83 84 UHID_FEATURE_REPORT = 0 85 UHID_OUTPUT_REPORT = 1 86 UHID_INPUT_REPORT = 2 87 88 _polling_functions = {} 89 _poll = select.poll() 90 _devices = [] 91 92 _pyudev_context = None 93 _pyudev_monitor = None 94 95 @classmethod 96 def dispatch(cls, timeout=None): 97 """ 98 Process any events available on any internally registered file 99 descriptor and deal with the events. 100 101 The caller must call this function regularly to make sure things 102 like udev events are processed correctly. There's no indicator of 103 when to call :meth:`dispatch` yet, call it whenever you're idle. 104 105 :returns: the number of devices data was available on 106 """ 107 devices = cls._poll.poll(timeout) 108 for fd, mask in devices: 109 if mask & select.POLLIN: 110 fun = cls._polling_functions[fd] 111 fun() 112 return len(devices) 113 114 @classmethod 115 def _append_fd_to_poll(cls, fd, read_function, mask=select.POLLIN): 116 cls._poll.register(fd, mask) 117 cls._polling_functions[fd] = read_function 118 119 @classmethod 120 def _remove_fd_from_poll(cls, fd): 121 cls._poll.unregister(fd) 122 123 @classmethod 124 def _init_pyudev(cls): 125 if cls._pyudev_context is None: 126 cls._pyudev_context = pyudev.Context() 127 cls._pyudev_monitor = pyudev.Monitor.from_netlink(cls._pyudev_context) 128 cls._pyudev_monitor.start() 129 130 cls._append_fd_to_poll(cls._pyudev_monitor.fileno(), 131 cls._cls_udev_event_callback) 132 133 @classmethod 134 def _cls_udev_event_callback(cls): 135 event = cls._pyudev_monitor.poll() 136 137 if event is None: 138 return 139 140 for d in cls._devices: 141 if d.udev_device is not None and d.udev_device.sys_path in event.sys_path: 142 d._udev_event(event) 143 144 def __init__(self): 145 self._name = None 146 self._phys = '' 147 self._rdesc = None 148 self.parsed_rdesc = None 149 self._info = None 150 self._fd = os.open('/dev/uhid', os.O_RDWR) 151 self._start = self.start 152 self._stop = self.stop 153 self._open = self.open 154 self._close = self.close 155 self._output_report = self.output_report 156 self._udev_device = None 157 self._ready = False 158 self._is_destroyed = False 159 self.device_nodes = [] 160 self.hidraw_nodes = [] 161 self.uniq = f'uhid_{str(uuid.uuid4())}' 162 self._append_fd_to_poll(self._fd, self._process_one_event) 163 self._init_pyudev() 164 UHIDDevice._devices.append(self) 165 166 def __enter__(self): 167 return self 168 169 def __exit__(self, *exc_details): 170 if not self._is_destroyed: 171 self.destroy() 172 173 def udev_event(self, event): 174 """ 175 Callback invoked on a udev event. 176 """ 177 pass 178 179 def _udev_event(self, event): 180 # we do not need to process the udev events if the device is being 181 # removed 182 if not self._ready: 183 return 184 185 if event.action == 'add': 186 device = event 187 188 try: 189 devname = device.properties['DEVNAME'] 190 if devname.startswith('/dev/input/event'): 191 self.device_nodes.append(devname) 192 elif devname.startswith('/dev/hidraw'): 193 self.hidraw_nodes.append(devname) 194 except KeyError: 195 pass 196 197 self.udev_event(event) 198 199 @property 200 def fd(self): 201 """ 202 The fd to the ``/dev/uhid`` device node 203 """ 204 return self._fd 205 206 @property 207 def rdesc(self): 208 """ 209 The device's report descriptor 210 """ 211 return self._rdesc 212 213 @rdesc.setter 214 def rdesc(self, rdesc): 215 parsed_rdesc = rdesc 216 if not isinstance(rdesc, hidtools.hid.ReportDescriptor): 217 if isinstance(rdesc, str): 218 rdesc = f'XXX {rdesc}' 219 parsed_rdesc = hidtools.hid.ReportDescriptor.from_string(rdesc) 220 else: 221 parsed_rdesc = hidtools.hid.ReportDescriptor.from_bytes(rdesc) 222 self.parsed_rdesc = parsed_rdesc 223 self._rdesc = parsed_rdesc.bytes 224 225 @property 226 def phys(self): 227 """ 228 The device's phys string 229 """ 230 return self._phys 231 232 @phys.setter 233 def phys(self, phys): 234 self._phys = phys 235 236 @property 237 def name(self): 238 """ 239 The devices HID name 240 """ 241 return self._name 242 243 @name.setter 244 def name(self, name): 245 self._name = name 246 247 @property 248 def info(self): 249 """ 250 The devices's bus, vendor ID and product ID as tuple 251 """ 252 return self._info 253 254 @info.setter 255 def info(self, info): 256 self._info = info 257 258 @property 259 def bus(self): 260 """ 261 The device's bus type (0x3 for USB, 0x5 for Bluetooth, etc.) 262 """ 263 return self._info[0] 264 265 @property 266 def vid(self): 267 """ 268 The device's 16-bit vendor ID 269 """ 270 return self._info[1] 271 272 @property 273 def pid(self): 274 """ 275 The device's 16-bit product ID 276 """ 277 return self._info[2] 278 279 def _call_set_report(self, req, err): 280 buf = struct.pack('< L L H', 281 UHIDDevice._UHID_SET_REPORT_REPLY, 282 req, 283 err) 284 os.write(self._fd, buf) 285 286 def _call_get_report(self, req, data, err): 287 data = bytes(data) 288 buf = struct.pack('< L L H H 4096s', 289 UHIDDevice._UHID_GET_REPORT_REPLY, 290 req, 291 err, 292 len(data), 293 data) 294 os.write(self._fd, buf) 295 296 def call_input_event(self, data): 297 """ 298 Send an input event from this device. 299 300 :param list data: a list of 8-bit integers representing the HID 301 report for this input event 302 """ 303 data = bytes(data) 304 buf = struct.pack('< L H 4096s', 305 UHIDDevice._UHID_INPUT2, 306 len(data), 307 data) 308 os.write(self._fd, buf) 309 310 @property 311 def udev_device(self): 312 """ 313 The devices' udev device. 314 315 The device may be None if udev hasn't processed the device yet. 316 """ 317 if self._udev_device is None: 318 for device in self._pyudev_context.list_devices(subsystem='hid'): 319 try: 320 if self.uniq == device.properties['HID_UNIQ']: 321 self._udev_device = device 322 break 323 except KeyError: 324 pass 325 return self._udev_device 326 327 @property 328 def sys_path(self): 329 """ 330 The device's /sys path 331 """ 332 return self.udev_device.sys_path 333 334 def create_kernel_device(self): 335 """ 336 Create a kernel device from this device. Note that the device is not 337 immediately ready to go after creation, you must wait for 338 :meth:`start` and ideally for :meth:`open` to be called. 339 340 :raises: :class:`UHIDIncompleteException` if the device does not 341 have a name, report descriptor or the info bits set. 342 """ 343 if (self._name is None or 344 self._rdesc is None or 345 self._info is None): 346 raise UHIDIncompleteException("missing uhid initialization") 347 348 buf = struct.pack('< L 128s 64s 64s H H L L L L 4096s', 349 UHIDDevice._UHID_CREATE2, 350 bytes(self._name, 'utf-8'), # name 351 bytes(self._phys, 'utf-8'), # phys 352 bytes(self.uniq, 'utf-8'), # uniq 353 len(self._rdesc), # rd_size 354 self.bus, # bus 355 self.vid, # vendor 356 self.pid, # product 357 0, # version 358 0, # country 359 bytes(self._rdesc)) # rd_data[HID_MAX_DESCRIPTOR_SIZE] 360 361 n = os.write(self._fd, buf) 362 assert n == len(buf) 363 self._ready = True 364 365 def destroy(self): 366 """ 367 Destroy the device. The kernel will trigger the appropriate 368 messages in response before removing the device. 369 370 This function is called automatically on __exit__() 371 """ 372 373 if self._ready: 374 buf = struct.pack('< L', UHIDDevice._UHID_DESTROY) 375 os.write(self._fd, buf) 376 self._ready = False 377 # equivalent to dispatch() but just for our device. 378 # this ensures that the callbacks are called correctly 379 poll = select.poll() 380 poll.register(self._fd, select.POLLIN) 381 if poll.poll(100): 382 fun = self._polling_functions[self._fd] 383 fun() 384 385 UHIDDevice._devices.remove(self) 386 self._remove_fd_from_poll(self._fd) 387 os.close(self._fd) 388 self._is_destroyed = True 389 390 def start(self, flags): 391 """ 392 Called when the uhid device is ready to accept IO. 393 394 This message is sent by the kernel, to receive this message you must 395 call :meth:`dispatch` 396 """ 397 logger.debug('start') 398 399 def stop(self): 400 """ 401 Called when the uhid device no longer accepts IO. 402 403 This message is sent by the kernel, to receive this message you must 404 call :meth:`dispatch` 405 """ 406 logger.debug('stop') 407 408 def open(self): 409 """ 410 Called when a userspace client opens the created kernel device. 411 412 This message is sent by the kernel, to receive this message you must 413 call :meth:`dispatch` 414 """ 415 logger.debug('open {}'.format(self.sys_path)) 416 417 def close(self): 418 """ 419 Called when a userspace client closes the created kernel device. 420 421 Sending events on a closed device will not result in anyone reading 422 it. 423 424 This message is sent by the kernel, to receive this message you must 425 call :meth:`dispatch` 426 """ 427 logger.debug('close') 428 429 def set_report(self, req, rnum, rtype, data): 430 """ 431 Callback invoked when a process calls SetReport on this UHID device. 432 433 Return ``0`` on success or an errno on failure. 434 435 The default method always returns ``EIO`` for a failure. Override 436 this in your device if you want SetReport to succeed. 437 438 :param req: the request identifier 439 :param rnum: ??? 440 :param rtype: one of :attr:`UHID_FEATURE_REPORT`, :attr:`UHID_INPUT_REPORT`, or :attr:`UHID_OUTPUT_REPORT` 441 :param list data: a byte string with the data 442 """ 443 return 5 # EIO 444 445 def _set_report(self, req, rnum, rtype, size, data): 446 logger.debug('set report {} {} {} {} {} '.format(req, rnum, rtype, size, [f'{d:02x}' for d in data[:size]])) 447 error = self.set_report(req, rnum, rtype, [int(x) for x in data[:size]]) 448 self._call_set_report(req, error) 449 450 def get_report(self, req, rnum, rtype): 451 """ 452 Callback invoked when a process calls SetReport on this UHID device. 453 454 Return ``(0, [data bytes])`` on success or ``(errno, [])`` on 455 failure. 456 457 The default method always returns ``(EIO, [])`` for a failure. 458 Override this in your device if you want GetReport to succeed. 459 460 :param req: the request identifier 461 :param rnum: ??? 462 :param rtype: one of :attr:`UHID_FEATURE_REPORT`, :attr:`UHID_INPUT_REPORT`, or :attr:`UHID_OUTPUT_REPORT` 463 """ 464 return (5, []) # EIO 465 466 def _get_report(self, req, rnum, rtype): 467 logger.debug('get report {} {} {}'.format(req, rnum, rtype)) 468 error, data = self.get_report(req, rnum, rtype) 469 self._call_get_report(req, data, error) 470 471 def output_report(self, data, size, rtype): 472 """ 473 Callback invoked when a process sends raw data to the device. 474 475 :param data: the data sent by the kernel 476 :param size: size of the data 477 :param rtype: one of :attr:`UHID_FEATURE_REPORT`, :attr:`UHID_INPUT_REPORT`, or :attr:`UHID_OUTPUT_REPORT` 478 """ 479 logger.debug('output {} {} {}'.format(rtype, size, [f'{d:02x}' for d in data[:size]])) 480 481 def _process_one_event(self): 482 buf = os.read(self._fd, 4380) 483 assert len(buf) == 4380 484 evtype = struct.unpack_from('< L', buf)[0] 485 if evtype == UHIDDevice._UHID_START: 486 ev, flags = struct.unpack_from('< L Q', buf) 487 self.start(flags) 488 elif evtype == UHIDDevice._UHID_OPEN: 489 self._open() 490 elif evtype == UHIDDevice._UHID_STOP: 491 self._stop() 492 elif evtype == UHIDDevice._UHID_CLOSE: 493 self._close() 494 elif evtype == UHIDDevice._UHID_SET_REPORT: 495 ev, req, rnum, rtype, size, data = struct.unpack_from('< L L B B H 4096s', buf) 496 self._set_report(req, rnum, rtype, size, data) 497 elif evtype == UHIDDevice._UHID_GET_REPORT: 498 ev, req, rnum, rtype = struct.unpack_from('< L L B B', buf) 499 self._get_report(req, rnum, rtype) 500 elif evtype == UHIDDevice._UHID_OUTPUT: 501 ev, data, size, rtype = struct.unpack_from('< L 4096s H B', buf) 502 self._output_report(data, size, rtype) 503 504 def create_report(self, data, global_data=None, reportID=None, application=None): 505 """ 506 Convert the data object to an array of ints representing the report. 507 Each property of the given data object is matched against the field 508 usage name (think ``hasattr``) and filled in accordingly.:: 509 510 mouse = MouseData() 511 mouse.b1 = int(l) 512 mouse.b2 = int(r) 513 mouse.b3 = int(m) 514 mouse.x = x 515 mouse.y = y 516 517 data_bytes = uhid_device.create_report(mouse) 518 519 The :class:`UHIDDevice` will create the report according to the 520 device's report descriptor. 521 """ 522 return self.parsed_rdesc.create_report(data, global_data, reportID, application) 523