1import os
2from collections import deque, namedtuple
3from datetime import timedelta
4from enum import Enum, unique
5from tempfile import mkdtemp
6
7from liquidctl.driver.base import *
8from liquidctl.keyval import RuntimeStorage, _FilesystemBackend
9
10Report = namedtuple('Report', ['number', 'data'])
11
12
13def noop(*args, **kwargs):
14    return None
15
16
17class MockRuntimeStorage(RuntimeStorage):
18    def __init__(self, key_prefixes, backend=None):
19        if not backend:
20            run_dir = mkdtemp('run_dir')
21            backend = _FilesystemBackend(key_prefixes, runtime_dirs=[run_dir])
22        super().__init__(key_prefixes, backend)
23
24
25class MockHidapiDevice:
26    def __init__(self, vendor_id=None, product_id=None, release_number=None,
27                 serial_number=None, bus=None, address=None):
28        self.vendor_id = vendor_id
29        self.product_id = product_id
30        self.release_number = release_number
31        self.serial_number = serial_number
32        self.bus = bus
33        self.address = address
34        self.port = None
35
36        self.open = noop
37        self.close = noop
38        self.clear_enqueued_reports = noop
39
40        self._read = deque()
41        self.sent = list()
42
43    def preload_read(self, report):
44        self._read.append(report)
45
46    def read(self, length):
47        if self._read:
48            number, data = self._read.popleft()
49            if number:
50                return [number] + list(data)[:length]
51            else:
52                return list(data)[:length]
53        return None
54
55    def write(self, data):
56        data = bytes(data)  # ensure data is convertible to bytes
57        self.sent.append(Report(data[0], list(data[1:])))
58        return len(data)
59
60    def get_feature_report(self, report_id, length):
61        if self._read:
62            try:
63                report = next(filter(lambda x: x.number == report_id, self._read))
64                number, data = report
65                self._read.remove(report)
66            except StopIteration:
67                return None
68            # length dictates the size of the buffer, and if it's not large
69            # enough "ioctl (GFEATURE): Value too large for defined data type"
70            # may happen on Linux; see:
71            # https://github.com/liquidctl/liquidctl/issues/151#issuecomment-665119675
72            assert length >= len(data) + 1, 'buffer not large enough for received report'
73            return [number] + list(data)[:length]
74        return None
75
76    def send_feature_report(self, data):
77        return self.write(data)
78
79
80class MockPyusbDevice():
81    def __init__(self, vendor_id=None, product_id=None, release_number=None,
82                 serial_number=None, bus=None, address=None, port=None):
83        self.vendor_id = vendor_id
84        self.product_id = product_id
85        self.release_numer = release_number
86        self.serial_number = serial_number
87        self.bus = bus
88        self.address = address
89        self.port = port
90
91        self.open = noop
92        self.claim = noop
93        self.release = noop
94        self.close = noop
95
96        self._reset_sent()
97
98    def read(self, endpoint, length, timeout=None):
99        if len(self._responses):
100            return self._responses.popleft()
101        return [0] * length
102
103    def write(self, endpoint, data, timeout=None):
104        self._sent_xfers.append(('write', endpoint, data))
105
106    def ctrl_transfer(self, bmRequestType, bRequest, wValue=0, wIndex=0,
107                      data_or_wLength=None, timeout=None):
108        self._sent_xfers.append(('ctrl_transfer', bmRequestType, bRequest,
109                                 wValue, wIndex, data_or_wLength))
110
111    def _reset_sent(self):
112        self._sent_xfers = deque()
113        self._responses = deque()
114
115
116VirtualEeprom = namedtuple('VirtualEeprom', ['name', 'data'])
117
118
119class VirtualSmbus:
120    def __init__(self, address_count=256, register_count=256, name='i2c-99',
121                 description='Virtual', parent_vendor=0xff01, parent_device=0xff02,
122                 parent_subsystem_vendor=0xff10, parent_subsystem_device=0xff20,
123                 parent_driver='virtual'):
124
125        self._open = False
126        self._data = [[0] * register_count for _ in range(address_count)]
127
128        self.name = name
129        self.description = description
130        self.parent_vendor = parent_vendor
131        self.parent_device = parent_device
132        self.parent_subsystem_vendor = parent_subsystem_vendor
133        self.parent_subsystem_device = parent_subsystem_device
134        self.parent_driver = parent_driver
135
136    def open(self):
137        self._open = True
138
139    def read_byte(self, address):
140        if not self._open:
141            raise OSError('closed')
142        return self._data[address][0]
143
144    def read_byte_data(self, address, register):
145        if not self._open:
146            raise OSError('closed')
147        return self._data[address][register]
148
149    def read_word_data(self, address, register):
150        if not self._open:
151            raise OSError('closed')
152        return self._data[address][register]
153
154    def read_block_data(self, address, register):
155        if not self._open:
156            raise OSError('closed')
157        return self._data[address][register]
158
159    def write_byte(self, address, value):
160        if not self._open:
161            raise OSError('closed')
162        self._data[address][0] = value
163
164    def write_byte_data(self, address, register, value):
165        if not self._open:
166            raise OSError('closed')
167        self._data[address][register] = value
168
169    def write_word_data(self, address, register, value):
170        if not self._open:
171            raise OSError('closed')
172        self._data[address][register] = value
173
174    def write_block_data(self, address, register, data):
175        if not self._open:
176            raise OSError('closed')
177        self._data[address][register] = data
178
179    def close(self):
180        self._open = False
181
182    def emulate_eeprom_at(self, address, name, data):
183        self._data[address] = VirtualEeprom(name, data)  # hack
184
185    def load_eeprom(self, address):
186        return self._data[address]  # hack
187
188
189@unique
190class VirtualControlMode(Enum):
191    QUIET = 0x0
192    BALANCED = 0x1
193    EXTREME = 0x2
194
195
196CallArgs = namedtuple('CallArgs', ['args', 'kwargs'])
197
198
199class VirtualBusDevice(BaseDriver):
200    def __init__(self, *args, **kwargs):
201        self.call_args = dict()
202        self.call_args['__init__'] = CallArgs(args, kwargs)
203        self.connected = False
204
205    def connect(self, *args, **kwargs):
206        self.call_args['connect'] = CallArgs(args, kwargs)
207        self.connected = True
208        return self
209
210    def disconnect(self, *args, **kwargs):
211        self.call_args['disconnect'] = CallArgs(args, kwargs)
212        self.connected = False
213
214    def initialize(self, *args, **kwargs):
215        self.call_args['initialize'] = CallArgs(args, kwargs)
216        return [
217            ('Firmware version', '3.14.16', ''),
218        ]
219
220    def get_status(self, *args, **kwargs):
221        self.call_args['status'] = CallArgs(args, kwargs)
222        return [
223            ('Temperature', 30.4, '°C'),
224            ('Fan control mode', VirtualControlMode.QUIET, ''),
225            ('Animation', None, ''),
226            ('Uptime', timedelta(hours=18, minutes=23, seconds=12), ''),
227            ('Hardware mode', True, ''),
228        ]
229
230    def set_fixed_speed(self, *args, **kwargs):
231        self.call_args['set_fixed_speed'] = CallArgs(args, kwargs)
232
233    def set_speed_profile(self, *args, **kwargs):
234        self.call_args['set_speed_profile'] = CallArgs(args, kwargs)
235
236    def set_color(self, *args, **kwargs):
237        self.call_args['set_color'] = CallArgs(args, kwargs)
238
239    @property
240    def description(self):
241        return 'Virtual Bus Device (experimental)'
242
243    @property
244    def vendor_id(self):
245        return 0x1234
246
247    @property
248    def product_id(self):
249        return 0xabcd
250
251    @property
252    def release_number(self):
253        None
254
255    @property
256    def serial_number(self):
257        raise OSError()
258
259    @property
260    def bus(self):
261        return 'virtual'
262
263    @property
264    def address(self):
265        return 'virtual_address'
266
267    @property
268    def port(self):
269        return None
270
271
272class VirtualBus(BaseBus):
273    def find_devices(self, **kwargs):
274        yield from [VirtualBusDevice()]
275