1#!/bin/env python3
2# -*- coding: utf-8 -*-
3#
4# Copyright (c) 2017 Benjamin Tissoires <benjamin.tissoires@gmail.com>
5# Copyright (c) 2017 Red Hat, Inc.
6#
7# This program is free software: you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
11#
12# This program is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program.  If not, see <http://www.gnu.org/licenses/>.
19#
20
21import hidtools.hid
22import os
23import pyudev
24import select
25import struct
26import uuid
27
28import logging
# Module-wide logger; every debug message emitted by the default uhid
# callbacks in this file goes through it.
logger = logging.getLogger('hidtools.hid.uhid')
30
31
class UHIDIncompleteException(Exception):
    """
    Raised when a kernel device is requested from a UHIDDevice that has
    not been given all the information needed to create one.
    """
38
39
class UHIDDevice(object):
    """
    A uhid device. uhid is a kernel interface to create virtual HID devices
    based on a report descriptor.

    Instances can be used as context managers: leaving the ``with`` block
    calls :meth:`destroy` unless the device was already destroyed. The
    class itself keeps the shared poll object used by :meth:`dispatch`.

    .. attribute:: device_nodes

        A list of evdev nodes associated with this HID device. Populating
        this list requires udev events to be processed, ensure that
        :meth:`dispatch` is called and that you wait for some reasonable
        time after creating the device.

    .. attribute:: hidraw_nodes

        A list of hidraw nodes associated with this HID device. Populating
        this list requires udev events to be processed, ensure that
        :meth:`dispatch` is called and that you wait for some reasonable
        time after creating the device.

    .. attribute:: uniq

        A uniq string assigned to this device. This string is autogenerated
        and can be used to reliably identify the device.

    """
    # uhid event type identifiers (first 32-bit word of every event).
    __UHID_LEGACY_CREATE = 0
    _UHID_DESTROY = 1
    _UHID_START = 2
    _UHID_STOP = 3
    _UHID_OPEN = 4
    _UHID_CLOSE = 5
    _UHID_OUTPUT = 6
    __UHID_LEGACY_OUTPUT_EV = 7
    __UHID_LEGACY_INPUT = 8
    _UHID_GET_REPORT = 9
    _UHID_GET_REPORT_REPLY = 10
    _UHID_CREATE2 = 11
    _UHID_INPUT2 = 12
    _UHID_SET_REPORT = 13
    _UHID_SET_REPORT_REPLY = 14

    # Report types handed to the set/get/output report callbacks.
    UHID_FEATURE_REPORT = 0
    UHID_OUTPUT_REPORT = 1
    UHID_INPUT_REPORT = 2

    # State shared by all devices: fd -> callback map, the poll object
    # those fds are registered with, and the list of live devices.
    _polling_functions = {}
    _poll = select.poll()
    _devices = []

    # Lazily created by _init_pyudev() when the first device is built.
    _pyudev_context = None
    _pyudev_monitor = None

    @classmethod
    def dispatch(cls, timeout=None):
        """
        Process any events available on any internally registered file
        descriptor and deal with the events.

        The caller must call this function regularly to make sure things
        like udev events are processed correctly. There's no indicator of
        when to call :meth:`dispatch` yet, call it whenever you're idle.

        :param timeout: handed unchanged to ``select.poll.poll()``
            (milliseconds; ``None`` blocks until an event arrives)
        :returns: the number of file descriptors that reported events
        """
        ready = cls._poll.poll(timeout)
        for fd, mask in ready:
            if not mask & select.POLLIN:
                continue
            # A callback run earlier in this loop may have destroyed the
            # device owning this fd; its entry is gone then, so skip it
            # instead of reading from a closed descriptor.
            callback = cls._polling_functions.get(fd)
            if callback is not None:
                callback()
        return len(ready)

    @classmethod
    def _append_fd_to_poll(cls, fd, read_function, mask=select.POLLIN):
        """
        Register ``fd`` with the shared poll object and remember
        ``read_function`` as the callback :meth:`dispatch` runs for it.
        """
        cls._poll.register(fd, mask)
        cls._polling_functions[fd] = read_function

    @classmethod
    def _remove_fd_from_poll(cls, fd):
        """
        Unregister ``fd`` from the shared poll object and forget its
        callback.
        """
        cls._poll.unregister(fd)
        # Drop the callback too: keeping it would pin the (destroyed)
        # device in memory through the bound method.
        cls._polling_functions.pop(fd, None)

    @classmethod
    def _init_pyudev(cls):
        """
        Create the shared pyudev context and netlink monitor on first use
        and hook the monitor's fd into :meth:`dispatch`.
        """
        if cls._pyudev_context is not None:
            return

        cls._pyudev_context = pyudev.Context()
        cls._pyudev_monitor = pyudev.Monitor.from_netlink(cls._pyudev_context)
        cls._pyudev_monitor.start()

        cls._append_fd_to_poll(cls._pyudev_monitor.fileno(),
                               cls._cls_udev_event_callback)

    @classmethod
    def _cls_udev_event_callback(cls):
        """
        Read one event from the udev monitor and forward it to every
        device whose sys path is contained in the event's sys path.
        """
        event = cls._pyudev_monitor.poll()

        if event is None:
            return

        # Iterate over a snapshot: a udev_event() callback is allowed to
        # destroy devices, which mutates cls._devices.
        for d in list(cls._devices):
            udev = d.udev_device
            if udev is not None and udev.sys_path in event.sys_path:
                d._udev_event(event)

    def __init__(self):
        """
        Open ``/dev/uhid`` and register the new device with the shared
        poll object and the udev monitor. No kernel device exists until
        :meth:`create_kernel_device` is called.
        """
        self._name = None
        self._phys = ''
        self._rdesc = None
        self.parsed_rdesc = None
        self._info = None
        self._fd = os.open('/dev/uhid', os.O_RDWR)
        # Callbacks used by _process_one_event(), captured at
        # construction time.
        self._start = self.start
        self._stop = self.stop
        self._open = self.open
        self._close = self.close
        self._output_report = self.output_report
        self._udev_device = None
        self._ready = False
        self._is_destroyed = False
        self.device_nodes = []
        self.hidraw_nodes = []
        self.uniq = f'uhid_{uuid.uuid4()}'
        self._append_fd_to_poll(self._fd, self._process_one_event)
        self._init_pyudev()
        UHIDDevice._devices.append(self)

    def __enter__(self):
        return self

    def __exit__(self, *exc_details):
        if not self._is_destroyed:
            self.destroy()

    def udev_event(self, event):
        """
        Callback invoked on a udev event. The default does nothing.
        """
        pass

    def _udev_event(self, event):
        """
        Record newly added evdev/hidraw nodes, then forward the event to
        :meth:`udev_event`.
        """
        # we do not need to process the udev events if the device is being
        # removed
        if not self._ready:
            return

        if event.action == 'add':
            try:
                devname = event.properties['DEVNAME']
            except KeyError:
                # not every udev device has a device node
                devname = None

            if devname is not None:
                if devname.startswith('/dev/input/event'):
                    self.device_nodes.append(devname)
                elif devname.startswith('/dev/hidraw'):
                    self.hidraw_nodes.append(devname)

        self.udev_event(event)

    @property
    def fd(self):
        """
        The fd to the ``/dev/uhid`` device node
        """
        return self._fd

    @property
    def rdesc(self):
        """
        The device's report descriptor, as the bytes of the parsed
        descriptor.
        """
        return self._rdesc

    @rdesc.setter
    def rdesc(self, rdesc):
        """
        Accept a :class:`hidtools.hid.ReportDescriptor`, a string, or
        anything ``ReportDescriptor.from_bytes`` understands. The parsed
        descriptor is kept in ``parsed_rdesc``.
        """
        if isinstance(rdesc, hidtools.hid.ReportDescriptor):
            parsed = rdesc
        elif isinstance(rdesc, str):
            # NOTE(review): from_string() presumably expects a leading
            # tag token before the data; 'XXX' is a placeholder for it -
            # confirm against hidtools.
            parsed = hidtools.hid.ReportDescriptor.from_string(f'XXX {rdesc}')
        else:
            parsed = hidtools.hid.ReportDescriptor.from_bytes(rdesc)
        self.parsed_rdesc = parsed
        self._rdesc = parsed.bytes

    @property
    def phys(self):
        """
        The device's phys string
        """
        return self._phys

    @phys.setter
    def phys(self, phys):
        self._phys = phys

    @property
    def name(self):
        """
        The device's HID name
        """
        return self._name

    @name.setter
    def name(self, name):
        self._name = name

    @property
    def info(self):
        """
        The device's bus, vendor ID and product ID as tuple
        """
        return self._info

    @info.setter
    def info(self, info):
        self._info = info

    @property
    def bus(self):
        """
        The device's bus type (0x3 for USB, 0x5 for Bluetooth, etc.)
        """
        return self._info[0]

    @property
    def vid(self):
        """
        The device's 16-bit vendor ID
        """
        return self._info[1]

    @property
    def pid(self):
        """
        The device's 16-bit product ID
        """
        return self._info[2]

    def _call_set_report(self, req, err):
        """
        Write a SET_REPORT_REPLY for request ``req`` with status ``err``
        to the uhid fd.
        """
        buf = struct.pack('< L L H',
                          UHIDDevice._UHID_SET_REPORT_REPLY,
                          req,
                          err)
        os.write(self._fd, buf)

    def _call_get_report(self, req, data, err):
        """
        Write a GET_REPORT_REPLY for request ``req`` carrying ``data``
        and status ``err`` to the uhid fd.
        """
        data = bytes(data)
        buf = struct.pack('< L L H H 4096s',
                          UHIDDevice._UHID_GET_REPORT_REPLY,
                          req,
                          err,
                          len(data),
                          data)
        os.write(self._fd, buf)

    def call_input_event(self, data):
        """
        Send an input event from this device.

        :param list data: a list of 8-bit integers representing the HID
            report for this input event
        """
        data = bytes(data)
        buf = struct.pack('< L H 4096s',
                          UHIDDevice._UHID_INPUT2,
                          len(data),
                          data)
        os.write(self._fd, buf)

    @property
    def udev_device(self):
        """
        The device's udev device.

        The device may be None if udev hasn't processed the device yet.
        The lookup matches ``HID_UNIQ`` against :attr:`uniq` and caches
        the result once found.
        """
        if self._udev_device is None:
            for device in self._pyudev_context.list_devices(subsystem='hid'):
                try:
                    if self.uniq == device.properties['HID_UNIQ']:
                        self._udev_device = device
                        break
                except KeyError:
                    pass
        return self._udev_device

    @property
    def sys_path(self):
        """
        The device's /sys path
        """
        return self.udev_device.sys_path

    def create_kernel_device(self):
        """
        Create a kernel device from this device. Note that the device is not
        immediately ready to go after creation, you must wait for
        :meth:`start` and ideally for :meth:`open` to be called.

        :raises: :class:`UHIDIncompleteException` if the device does not
            have a name, report descriptor or the info bits set.
        """
        if (self._name is None or
           self._rdesc is None or
           self._info is None):
            raise UHIDIncompleteException("missing uhid initialization")

        buf = struct.pack('< L 128s 64s 64s H H L L L L 4096s',
                          UHIDDevice._UHID_CREATE2,
                          bytes(self._name, 'utf-8'),  # name
                          bytes(self._phys, 'utf-8'),  # phys
                          bytes(self.uniq, 'utf-8'),  # uniq
                          len(self._rdesc),  # rd_size
                          self.bus,  # bus
                          self.vid,  # vendor
                          self.pid,  # product
                          0,  # version
                          0,  # country
                          bytes(self._rdesc))  # rd_data[HID_MAX_DESCRIPTOR_SIZE]

        n = os.write(self._fd, buf)
        assert n == len(buf)
        self._ready = True

    def destroy(self):
        """
        Destroy the device. The kernel will trigger the appropriate
        messages in response before removing the device.

        This function is called automatically on __exit__(). Calling it
        again on an already destroyed device does nothing.
        """
        if self._is_destroyed:
            # Already torn down: the fd is closed and we are no longer
            # in the device list, so there is nothing left to undo.
            return

        if self._ready:
            buf = struct.pack('< L', UHIDDevice._UHID_DESTROY)
            os.write(self._fd, buf)
            self._ready = False
            # equivalent to dispatch() but just for our device.
            # this ensures that the callbacks are called correctly
            poll = select.poll()
            poll.register(self._fd, select.POLLIN)
            if poll.poll(100):
                fun = self._polling_functions[self._fd]
                fun()

        UHIDDevice._devices.remove(self)
        self._remove_fd_from_poll(self._fd)
        os.close(self._fd)
        self._is_destroyed = True

    def start(self, flags):
        """
        Called when the uhid device is ready to accept IO.

        This message is sent by the kernel, to receive this message you must
        call :meth:`dispatch`

        :param flags: the 64-bit flags value carried by the start event
        """
        logger.debug('start')

    def stop(self):
        """
        Called when the uhid device no longer accepts IO.

        This message is sent by the kernel, to receive this message you must
        call :meth:`dispatch`
        """
        logger.debug('stop')

    def open(self):
        """
        Called when a userspace client opens the created kernel device.

        This message is sent by the kernel, to receive this message you must
        call :meth:`dispatch`
        """
        # udev may not have produced the device yet; log None rather than
        # crash the dispatch loop on a missing sys_path.
        device = self.udev_device
        sys_path = device.sys_path if device is not None else None
        logger.debug('open %s', sys_path)

    def close(self):
        """
        Called when a userspace client closes the created kernel device.

        Sending events on a closed device will not result in anyone reading
        it.

        This message is sent by the kernel, to receive this message you must
        call :meth:`dispatch`
        """
        logger.debug('close')

    def set_report(self, req, rnum, rtype, data):
        """
        Callback invoked when a process calls SetReport on this UHID device.

        Return ``0`` on success or an errno on failure.

        The default method always returns ``EIO`` for a failure. Override
        this in your device if you want SetReport to succeed.

        :param req: the request identifier
        :param rnum: the report number carried by the kernel request
        :param rtype: one of :attr:`UHID_FEATURE_REPORT`, :attr:`UHID_INPUT_REPORT`, or :attr:`UHID_OUTPUT_REPORT`
        :param list data: the report payload as a list of integers
        """
        return 5  # EIO

    def _set_report(self, req, rnum, rtype, size, data):
        """
        Run :meth:`set_report` on the first ``size`` bytes of ``data``
        and send its return value back to the kernel.
        """
        payload = data[:size]
        logger.debug('set report %s %s %s %s %s ',
                     req, rnum, rtype, size, [f'{d:02x}' for d in payload])
        error = self.set_report(req, rnum, rtype, [int(x) for x in payload])
        self._call_set_report(req, error)

    def get_report(self, req, rnum, rtype):
        """
        Callback invoked when a process calls GetReport on this UHID device.

        Return ``(0, [data bytes])`` on success or ``(errno, [])`` on
        failure.

        The default method always returns ``(EIO, [])`` for a failure.
        Override this in your device if you want GetReport to succeed.

        :param req: the request identifier
        :param rnum: the report number carried by the kernel request
        :param rtype: one of :attr:`UHID_FEATURE_REPORT`, :attr:`UHID_INPUT_REPORT`, or :attr:`UHID_OUTPUT_REPORT`
        """
        return (5, [])  # EIO

    def _get_report(self, req, rnum, rtype):
        """
        Run :meth:`get_report` and send its result back to the kernel.
        """
        logger.debug('get report %s %s %s', req, rnum, rtype)
        error, data = self.get_report(req, rnum, rtype)
        self._call_get_report(req, data, error)

    def output_report(self, data, size, rtype):
        """
        Callback invoked when a process sends raw data to the device.

        :param data: the data sent by the kernel
        :param size: size of the data
        :param rtype: one of :attr:`UHID_FEATURE_REPORT`, :attr:`UHID_INPUT_REPORT`, or :attr:`UHID_OUTPUT_REPORT`
        """
        logger.debug('output %s %s %s',
                     rtype, size, [f'{d:02x}' for d in data[:size]])

    def _process_one_event(self):
        """
        Read one event from the uhid fd and run the matching callback.
        Event types without a handler here are ignored.
        """
        # NOTE(review): 4380 is presumably the size of one kernel uhid
        # event - confirm against the uhid UAPI header.
        buf = os.read(self._fd, 4380)
        assert len(buf) == 4380
        evtype = struct.unpack_from('< L', buf)[0]
        if evtype == UHIDDevice._UHID_START:
            ev, flags = struct.unpack_from('< L Q', buf)
            # go through the alias like every other event below
            self._start(flags)
        elif evtype == UHIDDevice._UHID_OPEN:
            self._open()
        elif evtype == UHIDDevice._UHID_STOP:
            self._stop()
        elif evtype == UHIDDevice._UHID_CLOSE:
            self._close()
        elif evtype == UHIDDevice._UHID_SET_REPORT:
            ev, req, rnum, rtype, size, data = struct.unpack_from('< L L B B H 4096s', buf)
            self._set_report(req, rnum, rtype, size, data)
        elif evtype == UHIDDevice._UHID_GET_REPORT:
            ev, req, rnum, rtype = struct.unpack_from('< L L B B', buf)
            self._get_report(req, rnum, rtype)
        elif evtype == UHIDDevice._UHID_OUTPUT:
            ev, data, size, rtype = struct.unpack_from('< L 4096s H B', buf)
            self._output_report(data, size, rtype)

    def create_report(self, data, global_data=None, reportID=None, application=None):
        """
        Convert the data object to an array of ints representing the report.
        Each property of the given data object is matched against the field
        usage name (think ``hasattr``) and filled in accordingly.::

            mouse = MouseData()
            mouse.b1 = int(l)
            mouse.b2 = int(r)
            mouse.b3 = int(m)
            mouse.x = x
            mouse.y = y

            data_bytes = uhid_device.create_report(mouse)

        The :class:`UHIDDevice` will create the report according to the
        device's report descriptor. All arguments are forwarded unchanged
        to ``parsed_rdesc.create_report()``.
        """
        return self.parsed_rdesc.create_report(data, global_data, reportID, application)
523