1#!/bin/env python3
2# -*- coding: utf-8 -*-
3#
4# Copyright (c) 2017 Benjamin Tissoires <benjamin.tissoires@gmail.com>
5# Copyright (c) 2017 Red Hat, Inc.
6#
7# This program is free software: you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
11#
12# This program is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program.  If not, see <http://www.gnu.org/licenses/>.
19#
20
21import argparse
22import fcntl
23import libevdev
24import os
25import resource
26import sys
27import unittest
28
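# FIXME: hack - the parent directory is pushed to the front of sys.path so
# that the hidtools imports below resolve to the copy found there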
30sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)) + '/..')  # noqa
31
32import logging
33
34import hidtools.hid as hid # noqa
35from hidtools.util import twos_comp, to_twos_comp # noqa
36from hidtools.uhid import UHIDDevice  # noqa
37
38logger = logging.getLogger('hidtools.test.base')
39
40
class UHIDTestDevice(UHIDDevice):
    """UHID device that keeps track of the evdev nodes created for it.

    The device name is always prefixed with ``'uhid test '``.  Every
    ``/dev/input/event*`` node announced through :meth:`udev_event` is
    opened, switched to non-blocking mode and stored in ``input_nodes``
    under the HID application guessed from its udev properties.  The node
    matching ``application`` is available as :attr:`evdev`.
    """

    # udev properties and the HID application each one stands for.  Order
    # matters: the first property found on the device wins.
    _UDEV_PROPERTY_TO_APPLICATION = (
        ('ID_INPUT_TOUCHSCREEN', 'Touch Screen'),
        ('ID_INPUT_TOUCHPAD', 'Touch Pad'),
        ('ID_INPUT_TABLET', 'Pen'),
        ('ID_INPUT_MOUSE', 'Mouse'),
        ('ID_INPUT_KEY', 'Key'),
    )

    def __init__(self, name, application, rdesc_str=None, rdesc=None, info=None):
        """Set up the test device.

        :param name: device name, or None to derive one from the class
            name; 'uhid test ' is prepended when missing
        :param application: HID application whose input node is exposed
            through :attr:`evdev`
        :param rdesc_str: human readable report descriptor, parsed with
            ``hid.ReportDescriptor.from_human_descr`` when ``rdesc`` is None
        :param rdesc: report descriptor, used as is when provided
        :param info: stored as ``self.info``, defaults to ``(3, 1, 2)``
            (presumably bus/vendor/product - TODO confirm)
        :raises Exception: when neither ``rdesc`` nor ``rdesc_str`` is given
        """
        if rdesc_str is None and rdesc is None:
            raise Exception('Please provide at least a rdesc or rdesc_str')
        super().__init__()
        if name is None:
            name = f'uhid test {self.__class__.__name__}'
        if info is None:
            info = (3, 1, 2)
        self.name = name
        self.info = info
        self.default_reportID = None
        if not name.startswith('uhid test '):
            self.name = 'uhid test ' + self.name
        self.opened = False
        self.application = application
        self.input_nodes = {}
        self._opened_files = []
        if rdesc is None:
            self.rdesc = hid.ReportDescriptor.from_human_descr(rdesc_str)
        else:
            self.rdesc = rdesc

    def udev_event(self, event):
        """Open the event node announced by a udev 'add' event.

        Events that are not an 'add', that carry no DEVNAME, or whose
        DEVNAME is not a ``/dev/input/event*`` node are ignored.
        """
        if event.action != 'add':
            return

        properties = event.properties
        if 'DEVNAME' not in properties:
            return

        devname = properties['DEVNAME']
        if not devname.startswith('/dev/input/event'):
            return

        # associate the Input type to the matching HID application
        # we reuse the guess work from udev
        application = next((app
                            for prop, app in self._UDEV_PROPERTY_TO_APPLICATION
                            if prop in properties),
                           None)
        if application is None:
            # abort, the device has not been processed by udev
            print('abort', devname, list(properties.items()))
            return

        event_node = open(devname, 'rb')
        # remembered so that __del__ can close it
        self._opened_files.append(event_node)
        evdev = libevdev.Device(event_node)

        # O_NONBLOCK is a file *status* flag: it has to be read with F_GETFL
        # (not F_GETFD, which returns the descriptor flags) before being
        # written back with F_SETFL, or the existing status flags are lost.
        fd = evdev.fd.fileno()
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

        self.input_nodes[application] = evdev

    def open(self):
        """Mark the device as opened."""
        self.opened = True

    def __del__(self):
        # __init__ may have raised before _opened_files was assigned
        for event_node in getattr(self, '_opened_files', ()):
            event_node.close()

    def close(self):
        """Mark the device as closed."""
        self.opened = False

    def start(self, flags):
        """Do nothing; ``flags`` is ignored."""
        pass

    def stop(self):
        """Close the file of every known input node and forget the nodes."""
        for evdev in self.input_nodes.values():
            evdev.fd.close()
        self.input_nodes.clear()

    def next_sync_events(self):
        """Return the events currently available on :attr:`evdev` as a list."""
        return list(self.evdev.events())

    @property
    def evdev(self):
        """The input node matching ``application``, or None if not seen yet."""
        return self.input_nodes.get(self.application)
135
136
def skipIfUHDev(condition, reason):
    """Decorator factory tagging a test with a device-dependent skip rule.

    The decorated function is returned unchanged apart from two new
    attributes: ``skip_test_if_uhdev`` (``condition``) and
    ``skip_test_if_uhdev_reason`` (``reason``).  These are read back by
    ``TestUhid._skip_conditions``, which calls ``condition`` with the uhid
    device and skips the test with ``reason`` when the result is true.
    """
    def mark(test_func):
        setattr(test_func, 'skip_test_if_uhdev', condition)
        setattr(test_func, 'skip_test_if_uhdev_reason', reason)
        return test_func

    return mark
143
144
class BaseTestCase:
    """Namespace holding the base classes of the uhid tests.

    The test classes are nested in here, presumably so that the unittest
    loader does not collect the base classes themselves - TODO confirm.
    """

    class ContextTest(unittest.TestCase):
        """A unit test where setUp/tearDown are amalgamated into a
        single generator

        see https://stackoverflow.com/questions/13874859/python-with-statement-and-its-use-in-a-class"""
        def context(self):
            """Put both setUp and tearDown code in this generator method
            with a single `yield` between"""
            yield

        def setUp(self):
            """Record the kernel taint value, then run context() up to its yield."""
            with open('/proc/sys/kernel/tainted') as f:
                self.__taint = int(f.readline())
            self.__context = self.context()
            next(self.__context)

        def tearDown(self):
            """Run the remainder of context() and check the kernel taint value.

            :raises RuntimeError: when context() yields a second time
            """
            # resuming the generator runs the teardown half of context();
            # the loop body is only entered if it yields again
            for _ in self.__context:
                raise RuntimeError("context method should only yield once")
            # the taint value must be the one recorded in setUp()
            with open('/proc/sys/kernel/tainted') as f:
                taint = int(f.readline())
                self.assertEqual(self.__taint, taint)

    class TestUhid(ContextTest):
        """Base test for a uhid device, available as ``self.uhdev``.

        Subclasses must implement create_device().
        """
        # pre-built input events; the last four carry no code and/or value
        # (presumably used as wildcards when comparing events - TODO confirm)
        syn_event = libevdev.InputEvent(libevdev.EV_SYN.SYN_REPORT, 0)
        key_event = libevdev.InputEvent(libevdev.EV_KEY)
        abs_event = libevdev.InputEvent(libevdev.EV_ABS)
        rel_event = libevdev.InputEvent(libevdev.EV_REL)
        msc_event = libevdev.InputEvent(libevdev.EV_MSC.MSC_SCAN)

        def assertInputEventsIn(self, expected_events, effective_events):
            """Assert that every expected event is part of effective_events.

            Each expected event consumes one matching entry.  The caller's
            list is left untouched, the work is done on a copy.

            :returns: the effective events left once the expected ones
                have been removed
            """
            effective_events = effective_events.copy()
            for ev in expected_events:
                self.assertIn(ev, effective_events)
                effective_events.remove(ev)
            return effective_events

        def assertInputEvents(self, expected_events, effective_events):
            """Assert that effective_events holds the expected events and
            nothing else (order is not checked)."""
            remaining = self.assertInputEventsIn(expected_events, effective_events)
            self.assertEqual(remaining, [])

        @classmethod
        def debug_reports(cls, reports, uhdev=None, events=None):
            """Print the given reports as hexadecimal bytes.

            :param reports: the reports, each one an iterable of ints
            :param uhdev: when given, each report is also printed in the
                human readable form produced by
                ``uhdev.parsed_rdesc.format_report``
            :param events: when given, printed after the reports
            """
            data = [' '.join([f'{v:02x}' for v in r]) for r in reports]

            if uhdev is not None:
                human_data = [uhdev.parsed_rdesc.format_report(r, split_lines=True) for r in reports]
                try:
                    # indent the continuation lines up to the first '/'
                    human_data = [f'\n\t       {" " * h.index("/")}'.join(h.split('\n')) for h in human_data]
                except ValueError:
                    # '/' not found: not a numbered report
                    human_data = ['\n\t      '.join(h.split('\n')) for h in human_data]
                data = [f'{d}\n\t ====> {h}' for d, h in zip(data, human_data)]

            reports = data

            if len(reports) == 1:
                print('sending 1 report:')
            else:
                print(f'sending {len(reports)} reports:')
            for report in reports:
                print('\t', report)

            if events is not None:
                print('events received:', events)

        def create_device(self):
            """Return the device under test; must be overridden.

            The result is used as a context manager by context().
            """
            raise Exception('please reimplement me in subclasses')

        def assertName(self, uhdev):
            """Assert that the evdev node carries the name of the uhid device."""
            self.assertEqual(uhdev.evdev.name, uhdev.name)

        def _skip_conditions(self, udev):
            """Skip the running test if it was decorated with a skip condition
            (``skip_test_if_uhdev`` attribute) that holds for ``udev``."""
            method = getattr(self, self._testMethodName)
            try:
                skip_test_if_uhdev = method.skip_test_if_uhdev
            except AttributeError:
                # the test method carries no skip condition
                return

            if skip_test_if_uhdev(udev):
                self.skipTest(method.skip_test_if_uhdev_reason)

        def context(self):
            """Create the device, wait for its input node, then run the test."""
            with self.create_device() as self.uhdev:
                self._skip_conditions(self.uhdev)
                self.uhdev.create_kernel_device()
                # process events until the node of our application shows up
                while self.uhdev.application not in self.uhdev.input_nodes:
                    self.uhdev.dispatch(10)
                self.assertIsNotNone(self.uhdev.evdev)
                yield

        def test_creation(self):
            """Make sure the device gets processed by the kernel and creates
            the expected application input node.

            If this fails, there is something wrong in the device report
            descriptors."""
            uhdev = self.uhdev
            self.assertName(uhdev)
            self.assertEqual(len(uhdev.next_sync_events()), 0)
            uhdev.destroy()
            # keep dispatching until the device is closed or nothing is left
            while uhdev.opened:
                if uhdev.dispatch(100) == 0:
                    break
            # reading from the node of a destroyed device must fail
            with self.assertRaises(OSError):
                uhdev.evdev.fd.read()
252
253
def reload_udev_rules():
    """Run ``udevadm control --reload-rules`` then ``udevadm hwdb --update``.

    The exit status of the commands is not checked.
    """
    from subprocess import run

    commands = (
        ['udevadm', 'control', '--reload-rules'],
        ['udevadm', 'hwdb', '--update'],
    )
    for command in commands:
        run(command)
258
259
def create_udev_rule(uuid):
    """Write the udev rules file of the test devices and reload the rules.

    The file goes in /run/udev/rules.d (created when missing) and its name
    contains ``uuid``.  It holds two rules:

    - devices named "uhid test *" get LIBINPUT_IGNORE_DEVICE set to "1";
    - devices named "uhid test * System Multi Axis" get ID_INPUT_TOUCHSCREEN
      emptied and ID_INPUT_SYSTEM_MULTIAXIS set to "1".
    """
    rules_dir = '/run/udev/rules.d'
    os.makedirs(rules_dir, exist_ok=True)

    rules = (
        'KERNELS=="*input*", ATTRS{name}=="uhid test *", ENV{LIBINPUT_IGNORE_DEVICE}="1"\n',
        'KERNELS=="*input*", ATTRS{name}=="uhid test * System Multi Axis", ENV{ID_INPUT_TOUCHSCREEN}="", ENV{ID_INPUT_SYSTEM_MULTIAXIS}="1"\n',
    )
    with open(f'{rules_dir}/91-uhid-test-device-REMOVEME-{uuid}.rules', 'w') as rules_file:
        rules_file.writelines(rules)

    reload_udev_rules()
266
267
def teardown_udev_rule(uuid):
    """Delete the rules file written by create_udev_rule() for ``uuid``
    and reload the udev rules.

    os.remove() raises if the file does not exist.
    """
    rules_path = f'/run/udev/rules.d/91-uhid-test-device-REMOVEME-{uuid}.rules'
    os.remove(rules_path)
    reload_udev_rules()
271
272
273def setUpModule():
274    # create a udev rule to make libinput ignore the test devices
275    if 'PYTEST_RUNNING' not in os.environ:
276        create_udev_rule('XXXXX')
277
278
def tearDownModule():
    """Module-level fixture: remove the udev rule added by setUpModule().

    Nothing is done when PYTEST_RUNNING is in the environment.
    """
    if 'PYTEST_RUNNING' in os.environ:
        return

    # clean up after ourselves
    teardown_udev_rule('XXXXX')
283
284
def parse(input_string):
    """Parse the command line of the test suite.

    The parser declares no option of its own, so everything it does not
    recognise is handed back to the caller.

    :param input_string: list of command line arguments
    :returns: the list of arguments the parser did not consume
    """
    # The first positional parameter of ArgumentParser is `prog`, not the
    # description: pass the text by keyword so that the help output shows
    # the real program name.
    parser_test = argparse.ArgumentParser(description="Testsuite for hid devices")
    _, rest = parser_test.parse_known_args(input_string)
    return rest
289
290
def main(argv):
    """Entry point of the test suite; must be run as root.

    :param argv: command line arguments, without the program name.  The
        ones left over by parse() are handed to unittest.main().
    """
    if os.geteuid() != 0:
        sys.exit('Script must be run as root')

    # set both the soft and hard core file size limits to 0
    no_core_limits = (0, 0)
    resource.setrlimit(resource.RLIMIT_CORE, no_core_limits)

    unittest_argv = [sys.argv[0]]
    unittest_argv.extend(parse(argv))
    unittest.main(argv=unittest_argv)
300
301
# When run as a script: pull the test cases of the sibling test modules into
# this namespace (presumably so that the unittest.main() call made by main()
# finds them - TODO confirm), then run the suite with the command line
# arguments.
if __name__ == '__main__':
    from test_mouse import *  # noqa
    from test_multitouch import *  # noqa
    main(sys.argv[1:])
306