"""Mock devices, buses and storage helpers used by the test suite."""

import os
from collections import deque, namedtuple
from datetime import timedelta
from enum import Enum, unique
from tempfile import mkdtemp

from liquidctl.driver.base import *
from liquidctl.keyval import RuntimeStorage, _FilesystemBackend

# A HID report: `number` is the report ID, `data` the payload after it.
Report = namedtuple('Report', ['number', 'data'])


def noop(*args, **kwargs):
    """Accept any arguments, do nothing and return None."""
    return None


class MockRuntimeStorage(RuntimeStorage):
    """RuntimeStorage that falls back to a fresh temporary directory."""

    def __init__(self, key_prefixes, backend=None):
        """Initialize the storage.

        If `backend` is falsy, a `_FilesystemBackend` rooted at a newly
        created temporary directory is used instead.
        """
        if not backend:
            # the directory name ends in 'run_dir'; it is not removed here
            run_dir = mkdtemp(suffix='run_dir')
            backend = _FilesystemBackend(key_prefixes, runtime_dirs=[run_dir])
        super().__init__(key_prefixes, backend)


class MockHidapiDevice:
    """In-memory stand-in for a hidapi-backed device.

    Reports to be read are queued with `preload_read`; everything written is
    recorded, in order, in `sent` as `Report` tuples.
    """

    def __init__(self, vendor_id=None, product_id=None, release_number=None,
                 serial_number=None, bus=None, address=None):
        self.vendor_id = vendor_id
        self.product_id = product_id
        self.release_number = release_number
        self.serial_number = serial_number
        self.bus = bus
        self.address = address
        self.port = None

        # lifecycle calls are accepted and ignored
        self.open = noop
        self.close = noop
        self.clear_enqueued_reports = noop

        self._read = deque()  # reports waiting to be read, oldest first
        self.sent = []        # reports written so far, oldest first

    def preload_read(self, report):
        """Queue `report` to be returned by a later read."""
        self._read.append(report)

    def read(self, length):
        """Pop the oldest queued report and return it as a list.

        The payload is truncated to `length` items; a truthy report number
        is prepended to it.  Returns None when no report is queued.
        """
        if not self._read:
            return None
        number, data = self._read.popleft()
        payload = list(data)[:length]
        # NOTE(review): with a report number the result can hold up to
        # `length + 1` items -- confirm this matches the real API if it matters
        if number:
            return [number] + payload
        return payload

    def write(self, data):
        """Record `data` as a sent report and return its length in bytes.

        The first byte is stored as the report number and the remaining
        bytes as the report data.
        """
        data = bytes(data)  # ensure data is convertible to bytes
        self.sent.append(Report(data[0], list(data[1:])))
        return len(data)

    def get_feature_report(self, report_id, length):
        """Remove and return the first queued report numbered `report_id`.

        The result is `[number] + data`.  Returns None when no queued report
        has that number.  Queued reports must expose a `number` attribute.
        """
        report = next((r for r in self._read if r.number == report_id), None)
        if report is None:
            return None
        number, data = report
        self._read.remove(report)
        # length dictates the size of the buffer, and if it's not large
        # enough "ioctl (GFEATURE): Value too large for defined data type"
        # may happen on Linux; see:
        # https://github.com/liquidctl/liquidctl/issues/151#issuecomment-665119675
        assert length >= len(data) + 1, 'buffer not large enough for received report'
        return [number] + list(data)[:length]

    def send_feature_report(self, data):
        """Record a feature report exactly like `write` does."""
        return self.write(data)


class MockPyusbDevice:
    """In-memory stand-in for a PyUSB-backed device.

    Outgoing transfers are appended to `_sent_xfers`; responses to be read
    are taken from `_responses`.
    """

    def __init__(self, vendor_id=None, product_id=None, release_number=None,
                 serial_number=None, bus=None, address=None, port=None):
        self.vendor_id = vendor_id
        self.product_id = product_id
        self.release_number = release_number
        # misspelled name used by earlier versions of this mock; kept as an
        # alias so existing users of it keep working
        self.release_numer = release_number
        self.serial_number = serial_number
        self.bus = bus
        self.address = address
        self.port = port

        # lifecycle calls are accepted and ignored
        self.open = noop
        self.claim = noop
        self.release = noop
        self.close = noop

        self._reset_sent()

    def read(self, endpoint, length, timeout=None):
        """Return the oldest queued response, or `length` zeros if none."""
        if self._responses:
            return self._responses.popleft()
        return [0] * length

    def write(self, endpoint, data, timeout=None):
        """Record a write transfer; `timeout` is ignored."""
        self._sent_xfers.append(('write', endpoint, data))

    def ctrl_transfer(self, bmRequestType, bRequest, wValue=0, wIndex=0,
                      data_or_wLength=None, timeout=None):
        """Record a control transfer; `timeout` is ignored."""
        self._sent_xfers.append(('ctrl_transfer', bmRequestType, bRequest,
                                 wValue, wIndex, data_or_wLength))

    def _reset_sent(self):
        """Discard all recorded transfers and all queued responses."""
        self._sent_xfers = deque()
        self._responses = deque()


VirtualEeprom = namedtuple('VirtualEeprom', ['name', 'data'])


class VirtualSmbus:
    """In-memory SMBus: a table of values indexed by address and register.

    Every read and write raises `OSError('closed')` unless the bus is open.
    Word and block accesses store and return values as given, in a single
    register slot.
    """

    def __init__(self, address_count=256, register_count=256, name='i2c-99',
                 description='Virtual', parent_vendor=0xff01, parent_device=0xff02,
                 parent_subsystem_vendor=0xff10, parent_subsystem_device=0xff20,
                 parent_driver='virtual'):

        self._open = False
        # one independent, zero-filled register list per address
        self._data = [[0] * register_count for _ in range(address_count)]

        self.name = name
        self.description = description
        self.parent_vendor = parent_vendor
        self.parent_device = parent_device
        self.parent_subsystem_vendor = parent_subsystem_vendor
        self.parent_subsystem_device = parent_subsystem_device
        self.parent_driver = parent_driver

    def _require_open(self):
        """Raise OSError('closed') unless the bus is currently open."""
        if not self._open:
            raise OSError('closed')

    def open(self):
        """Mark the bus as open."""
        self._open = True

    def read_byte(self, address):
        """Return the value stored in register 0 of `address`."""
        self._require_open()
        return self._data[address][0]

    def read_byte_data(self, address, register):
        """Return the value stored at `register` of `address`."""
        self._require_open()
        return self._data[address][register]

    def read_word_data(self, address, register):
        """Return the value stored at `register` of `address`."""
        self._require_open()
        return self._data[address][register]

    def read_block_data(self, address, register):
        """Return the value stored at `register` of `address`."""
        self._require_open()
        return self._data[address][register]

    def write_byte(self, address, value):
        """Store `value` in register 0 of `address`."""
        self._require_open()
        self._data[address][0] = value

    def write_byte_data(self, address, register, value):
        """Store `value` at `register` of `address`."""
        self._require_open()
        self._data[address][register] = value

    def write_word_data(self, address, register, value):
        """Store `value` at `register` of `address`."""
        self._require_open()
        self._data[address][register] = value

    def write_block_data(self, address, register, data):
        """Store `data`, as given, at `register` of `address`."""
        self._require_open()
        self._data[address][register] = data

    def close(self):
        """Mark the bus as closed."""
        self._open = False

    def emulate_eeprom_at(self, address, name, data):
        """Replace the register list at `address` with a `VirtualEeprom`.

        Works regardless of whether the bus is open.
        """
        self._data[address] = VirtualEeprom(name, data)  # hack

    def load_eeprom(self, address):
        """Return whatever is stored at `address`, without an open check."""
        return self._data[address]  # hack


@unique
class VirtualControlMode(Enum):
    """Control modes reported by `VirtualBusDevice.get_status`."""

    QUIET = 0x0
    BALANCED = 0x1
    EXTREME = 0x2


CallArgs = namedtuple('CallArgs', ['args', 'kwargs'])


class VirtualBusDevice(BaseDriver):
    """Driver double that records the arguments of the latest call per method.

    Recorded arguments are available in `call_args`, a dict mapping a key
    (usually the method name) to a `CallArgs` tuple.
    """

    def __init__(self, *args, **kwargs):
        self.call_args = {}
        self.call_args['__init__'] = CallArgs(args, kwargs)
        self.connected = False

    def connect(self, *args, **kwargs):
        """Record the call, mark the device connected and return self."""
        self.call_args['connect'] = CallArgs(args, kwargs)
        self.connected = True
        return self

    def disconnect(self, *args, **kwargs):
        """Record the call and mark the device disconnected."""
        self.call_args['disconnect'] = CallArgs(args, kwargs)
        self.connected = False

    def initialize(self, *args, **kwargs):
        """Record the call and return a fixed firmware version entry."""
        self.call_args['initialize'] = CallArgs(args, kwargs)
        return [
            ('Firmware version', '3.14.16', ''),
        ]

    def get_status(self, *args, **kwargs):
        """Record the call and return a fixed list of status entries."""
        # NOTE(review): recorded under 'status', not 'get_status' like the
        # other methods; left as is because existing users may rely on it
        self.call_args['status'] = CallArgs(args, kwargs)
        return [
            ('Temperature', 30.4, '°C'),
            ('Fan control mode', VirtualControlMode.QUIET, ''),
            ('Animation', None, ''),
            ('Uptime', timedelta(hours=18, minutes=23, seconds=12), ''),
            ('Hardware mode', True, ''),
        ]

    def set_fixed_speed(self, *args, **kwargs):
        """Record the call."""
        self.call_args['set_fixed_speed'] = CallArgs(args, kwargs)

    def set_speed_profile(self, *args, **kwargs):
        """Record the call."""
        self.call_args['set_speed_profile'] = CallArgs(args, kwargs)

    def set_color(self, *args, **kwargs):
        """Record the call."""
        self.call_args['set_color'] = CallArgs(args, kwargs)

    @property
    def description(self):
        return 'Virtual Bus Device (experimental)'

    @property
    def vendor_id(self):
        return 0x1234

    @property
    def product_id(self):
        return 0xabcd

    @property
    def release_number(self):
        return None

    @property
    def serial_number(self):
        # always fails; presumably to exercise error handling in callers
        raise OSError()

    @property
    def bus(self):
        return 'virtual'

    @property
    def address(self):
        return 'virtual_address'

    @property
    def port(self):
        return None


class VirtualBus(BaseBus):
    """Bus that yields a single new `VirtualBusDevice`."""

    def find_devices(self, **kwargs):
        """Yield one freshly created `VirtualBusDevice`; `kwargs` is ignored."""
        yield VirtualBusDevice()