"""liquidctl drivers for Corsair Commander Pro devices.

Supported devices:

- Corsair Commander Pro
- Corsair Lighting Node Pro (experimental)
- Corsair Lighting Node Core (experimental)
- Corsair Obsidian 1000D (experimental)


NOTE:
    This device currently only has hardware control implemented but it also supports a software control mode.
    Software control will be enabled at a future time.


Copyright (C) 2020–2021 Marshall Asch and contributors
SPDX-License-Identifier: GPL-3.0-or-later
"""

import itertools
import logging
import re
from enum import Enum, unique

from liquidctl.driver.usb import UsbHidDriver
from liquidctl.error import NotSupportedByDevice
from liquidctl.keyval import RuntimeStorage
from liquidctl.pmbus import compute_pec
from liquidctl.util import clamp, fraction_of_byte, u16be_from, u16le_from, \
                           normalize_profile, check_unsafe, map_direction

_LOGGER = logging.getLogger(__name__)

_REPORT_LENGTH = 64
_RESPONSE_LENGTH = 16


_CMD_GET_FIRMWARE = 0x02
_CMD_GET_BOOTLOADER = 0x06
_CMD_GET_TEMP_CONFIG = 0x10
_CMD_GET_TEMP = 0x11
_CMD_GET_VOLTS = 0x12
_CMD_GET_FAN_MODES = 0x20
_CMD_GET_FAN_RPM = 0x21
_CMD_SET_FAN_DUTY = 0x23
_CMD_SET_FAN_PROFILE = 0x25

_CMD_RESET_LED_CHANNEL = 0x37
_CMD_BEGIN_LED_EFFECT = 0x34
_CMD_SET_LED_CHANNEL_STATE = 0x38
_CMD_LED_EFFECT = 0x35
_CMD_LED_COMMIT = 0x33

_LED_PORT_STATE_HARDWARE = 0x01
_LED_PORT_STATE_SOFTWARE = 0x02
_LED_SPEED_FAST = 0x00
_LED_SPEED_MEDIUM = 0x01
_LED_SPEED_SLOW = 0x02

_LED_DIRECTION_FORWARD = 0x01
_LED_DIRECTION_BACKWARD = 0x00

_FAN_MODE_DISCONNECTED = 0x00
_FAN_MODE_DC = 0x01
_FAN_MODE_PWM = 0x02


_PROFILE_LENGTH = 6
_CRITICAL_TEMPERATURE = 60
_CRITICAL_TEMPERATURE_HIGH = 100
_MAX_FAN_RPM = 5000             # I have no idea if this is a good value or not
_MAX_LEDS = 204

_MODES = {
    'off': 0x04,            # this is a special case of fixed
    'rainbow': 0x00,
    'color_shift': 0x01,
    'color_pulse': 0x02,
    'color_wave': 0x03,
    'fixed': 0x04,
    # 'temperature': 0x05,    # ignore this
    'visor': 0x06,
    'marquee': 0x07,
    'blink': 0x08,
    'sequential': 0x09,
    'rainbow2': 0x0a,
}


def _prepare_profile(original, critcalTempature):
    """Return a profile of exactly `_PROFILE_LENGTH` (temperature, rpm) points.

    The speed of every point in `original` is clamped to `0.._MAX_FAN_RPM`
    and the result is passed through `normalize_profile()` together with the
    critical temperature and `_MAX_FAN_RPM`.  If fewer than
    `_PROFILE_LENGTH` points remain, the profile is padded with
    `(critcalTempature, _MAX_FAN_RPM)` points.

    Raises `ValueError` if the normalized profile has more than
    `_PROFILE_LENGTH` points.
    """
    clamped = ((temp, clamp(duty, 0, _MAX_FAN_RPM)) for temp, duty in original)
    normal = normalize_profile(clamped, critcalTempature, _MAX_FAN_RPM)
    missing = _PROFILE_LENGTH - len(normal)
    if missing < 0:
        raise ValueError(f'too many points in profile (remove {-missing})')
    if missing > 0:
        normal += missing * [(critcalTempature, _MAX_FAN_RPM)]
    return normal


def _quoted(*names):
    """Return the `repr` of each name, joined by a comma and a space."""
    return ', '.join(map(repr, names))


def _resolve_channels(channel, names):
    """Translate a user-facing channel name into a list of hardware indices.

    `'sync'` selects every index in `names`; any other value must be one of
    `names` and selects only its own index.

    Raises `ValueError` for any other channel, regardless of how many names
    are available, so callers never receive `None`.
    """
    if channel == 'sync':
        return list(range(len(names)))
    if channel in names:
        return [names.index(channel)]
    raise ValueError(f'unknown channel, should be one of: {_quoted("sync", *names)}')


def _fan_mode_desc(mode):
    """This will convert the fan mode value to a descriptive name.

    Returns `'DC'` or `'PWM'` for the corresponding modes and `None`
    otherwise; a warning is logged if the mode is neither of those nor
    `_FAN_MODE_DISCONNECTED`.
    """

    if mode == _FAN_MODE_DC:
        return 'DC'
    elif mode == _FAN_MODE_PWM:
        return 'PWM'
    else:
        if mode != _FAN_MODE_DISCONNECTED:
            # lazy %-formatting: the previous literal lacked the f-prefix and
            # logged the placeholder text instead of the mode value
            _LOGGER.warning('unknown fan mode: %#04x', mode)
        return None


class CommanderPro(UsbHidDriver):
    """Corsair Commander Pro LED and fan hub"""

    SUPPORTED_DEVICES = [
        (0x1b1c, 0x0c10, None, 'Corsair Commander Pro',
            {'fan_count': 6, 'temp_probs': 4, 'led_channels': 2}),
        (0x1b1c, 0x0c0b, None, 'Corsair Lighting Node Pro (experimental)',
            {'fan_count': 0, 'temp_probs': 0, 'led_channels': 2}),
        (0x1b1c, 0x0c1a, None, 'Corsair Lighting Node Core (experimental)',
            {'fan_count': 0, 'temp_probs': 0, 'led_channels': 1}),
        (0x1b1c, 0x1d00, None, 'Corsair Obsidian 1000D (experimental)',
            {'fan_count': 6, 'temp_probs': 4, 'led_channels': 2}),
    ]

    def __init__(self, device, description, fan_count, temp_probs, led_channels, **kwargs):
        """Set up the channel names and capability counts for this device.

        `fan_count`, `temp_probs` and `led_channels` come from the matching
        `SUPPORTED_DEVICES` entry.
        """
        super().__init__(device, description, **kwargs)

        # the runtime storage is only initialized in connect()
        self._data = None

        self._fan_names = [f'fan{i+1}' for i in range(fan_count)]
        if led_channels == 1:
            # a single LED channel is exposed without a numeric suffix
            self._led_names = ['led']
        else:
            self._led_names = [f'led{i+1}' for i in range(led_channels)]
        self._temp_probs = temp_probs
        self._fan_count = fan_count

    def connect(self, runtime_storage=None, **kwargs):
        """Connect to the device.

        Uses `runtime_storage` if one is given; otherwise creates a
        `RuntimeStorage` keyed by the vendor/product IDs and the device
        address.
        """
        ret = super().connect(**kwargs)
        if runtime_storage:
            self._data = runtime_storage
        else:
            ids = f'vid{self.vendor_id:04x}_pid{self.product_id:04x}'
            # must use the HID path because there is no serial number; however,
            # these can be quite long on Windows and macOS, so only take the
            # numbers, since they are likely the only parts that vary between two
            # devices of the same model
            loc = 'loc' + '_'.join(re.findall(r'\d+', self.address))
            self._data = RuntimeStorage(key_prefixes=[ids, loc])
        return ret

    def initialize(self, **kwargs):
        """Initialize the device and get the fan modes.

        The device should be initialized every time it is powered on, including when
        the system resumes from suspending to memory.

        The temperature probe connection flags and the fan modes read from
        the device are saved to the runtime storage, where the other methods
        of this driver look them up.

        Returns a list of `(property, value, unit)` tuples.
        """

        res = self._send_command(_CMD_GET_FIRMWARE)
        fw_version = (res[1], res[2], res[3])

        res = self._send_command(_CMD_GET_BOOTLOADER)
        bootloader_version = (res[1], res[2])               # is it possible for there to be a third value?

        status = [
            ('Firmware version', '{}.{}.{}'.format(*fw_version), ''),
            ('Bootloader version', '{}.{}'.format(*bootloader_version), ''),
        ]

        if self._temp_probs > 0:
            res = self._send_command(_CMD_GET_TEMP_CONFIG)
            temp_connected = res[1:self._temp_probs + 1]
            self._data.store('temp_sensors_connected', temp_connected)
            status += [
                (f'Temperature probe {i + 1}', bool(connected), '')
                for i, connected in enumerate(temp_connected)
            ]

        if self._fan_count > 0:
            # get the information about how the fans are connected, probably want to save this for later
            res = self._send_command(_CMD_GET_FAN_MODES)
            fan_modes = res[1:self._fan_count + 1]
            self._data.store('fan_modes', fan_modes)
            status += [
                (f'Fan {i + 1} control mode', _fan_mode_desc(fan_mode), '')
                for i, fan_mode in enumerate(fan_modes)
            ]

        return status

    def get_status(self, **kwargs):
        """Get a status report.

        Only probes and fans recorded as connected in the runtime storage
        (see `initialize()`) are queried.  Devices without fans or without
        temperature probes report nothing.

        Returns a list of `(property, value, unit)` tuples.
        """

        if self._fan_count == 0 or self._temp_probs == 0:
            _LOGGER.debug('only Commander Pro and Obsidian 1000D report status')
            return []

        temp_probes = self._data.load('temp_sensors_connected', default=[0]*self._temp_probs)
        fan_modes = self._data.load('fan_modes', default=[0]*self._fan_count)

        status = []

        # get the temperature sensor values
        for i, probe_enabled in enumerate(temp_probes):
            if probe_enabled:
                temp = self._get_temp(i)
                status.append((f'Temperature {i + 1}', temp, '°C'))

        # get fan RPMs of connected fans
        for i, fan_mode in enumerate(fan_modes):
            if fan_mode == _FAN_MODE_DC or fan_mode == _FAN_MODE_PWM:
                speed = self._get_fan_rpm(i)
                status.append((f'Fan {i + 1} speed', speed, 'rpm'))

        # get the real power supply voltages
        for i, rail in enumerate(["+12V", "+5V", "+3.3V"]):
            raw = self._send_command(_CMD_GET_VOLTS, [i])
            voltage = u16be_from(raw, offset=1) / 1000
            status.append((f'{rail} rail', voltage, 'V'))

        return status

    def _get_temp(self, sensor_num):
        """This will get the temperature in degrees celsius for the specified temp sensor.

        sensor number MUST be a zero-based index below the number of
        temperature probes of the device (0-3 on the devices that have any).

        Raises `ValueError` if the device has no temperature probes or the
        sensor number is out of range.
        """

        if self._temp_probs == 0:
            raise ValueError('this device does not have a temperature sensor')

        last_sensor = self._temp_probs - 1
        if sensor_num < 0 or sensor_num > last_sensor:
            raise ValueError(f'sensor_num {sensor_num} invalid, must be between 0 and {last_sensor}')

        res = self._send_command(_CMD_GET_TEMP, [sensor_num])
        # the value is read as a big-endian 16-bit number and divided by 100
        temp = u16be_from(res, offset=1) / 100

        return temp

    def _get_fan_rpm(self, fan_num):
        """This will get the rpm value of the fan.

        fan number MUST be a zero-based index below the number of fans of the
        device (0-5 on the devices that have any).

        Raises `ValueError` if the device has no fans or the fan number is
        out of range.
        """

        if self._fan_count == 0:
            raise ValueError('this device does not have any fans')

        last_fan = self._fan_count - 1
        if fan_num < 0 or fan_num > last_fan:
            raise ValueError(f'fan_num {fan_num} invalid, must be between 0 and {last_fan}')

        res = self._send_command(_CMD_GET_FAN_RPM, [fan_num])
        speed = u16be_from(res, offset=1)

        return speed

    def _get_hw_fan_channels(self, channel):
        """This will get a list of all the fan channels that the command should be sent to
        It will look up the name of the fan channel given and return a list of the real fan number

        Raises `ValueError` if the channel is neither 'sync' nor a known fan name.
        """
        return _resolve_channels(channel, self._fan_names)

    def _get_hw_led_channels(self, channel):
        """This will get a list of all the led channels that the command should be sent to
        It will look up the name of the led channel given and return a list of the real led device number

        Raises `ValueError` if the channel is neither 'sync' nor a known led name.
        """
        return _resolve_channels(channel, self._led_names)

    def set_fixed_speed(self, channel, duty, **kwargs):
        """Set fan or fans to a fixed speed duty.

        Valid channel values are 'fanN', where N >= 1 is the fan number, and
        'sync', to simultaneously configure all fans.  Unconfigured fan channels
        may default to 100% duty.

        `duty` is clamped to the 0-100 range.  Only fans recorded as DC or
        PWM in the runtime storage (see `initialize()`) are sent the command.

        Different commands for sending fixed percent (0x23) and fixed rpm (0x24)
        Probably want to use fixed percent for this until the rpm flag is enabled.
        Can only send one fan command at a time, if fan mode is unset will need to send 6?
        messages (or 1 per enabled fan)

        Raises `NotSupportedByDevice` if the device has no fans, and
        `ValueError` if the channel is unknown.
        """

        if self._fan_count == 0:
            raise NotSupportedByDevice()

        duty = clamp(duty, 0, 100)
        fan_channels = self._get_hw_fan_channels(channel)
        fan_modes = self._data.load('fan_modes', default=[0]*self._fan_count)

        for fan in fan_channels:
            mode = fan_modes[fan]
            if mode == _FAN_MODE_DC or mode == _FAN_MODE_PWM:
                self._send_command(_CMD_SET_FAN_DUTY, [fan, duty])

    def set_speed_profile(self, channel, profile, temperature_sensor=1, **kwargs):
        """Set fan or fans to follow a speed profile.

        Valid channel values are 'fanN', where N >= 1 is the fan number, and
        'sync', to simultaneously configure all fans.  Unconfigured fan channels
        may default to 100% duty.

        Up to six (temperature, rpm) pairs can be supplied in `profile`,
        with temperatures in Celsius and speeds in rpm (clamped to the
        0-5000 range).  If fewer than six points are supplied the profile is
        padded so that the fan maxes out at 60°C, or at 100°C when the
        `high_temperature` unsafe feature is enabled.

        `temperature_sensor` is the one-based number of the temperature probe
        to follow; it is clamped to the available probes and must be recorded
        as connected in the runtime storage (see `initialize()`).

        Raises `NotSupportedByDevice` if the device has no fans, and
        `ValueError` if the profile has too many points, the temperature
        sensor is not connected, or the channel is unknown.
        """

        # send fan num, temp sensor, check to make sure it is actually enabled, and do not let the user send external sensor
        # 6 2-byte big endian temps (celsius * 100), then 6 2-byte big endian rpms
        # need to figure out how to find out what the max rpm is for the given fan

        if self._fan_count == 0:
            raise NotSupportedByDevice()

        profile = list(profile)

        criticalTemp = _CRITICAL_TEMPERATURE_HIGH if check_unsafe('high_temperature', **kwargs) else _CRITICAL_TEMPERATURE
        profile = _prepare_profile(profile, criticalTemp)

        # fan_type = kwargs['fan_type'] # need to make sure this is set
        temp_sensor = clamp(temperature_sensor, 1, self._temp_probs)

        sensors = self._data.load('temp_sensors_connected', default=[0]*self._temp_probs)

        if sensors[temp_sensor-1] != 1:
            raise ValueError('the specified temperature sensor is not connected')

        buf = bytearray(26)
        buf[1] = temp_sensor-1  # zero-based index of the probe to follow

        for i, entry in enumerate(profile):
            # coerce to int so that fractional temperatures/speeds can be
            # encoded; floats have no to_bytes()
            temp = int(round(entry[0] * 100))
            rpm = int(entry[1])

            # convert both values to 2 byte big endian values
            buf[2 + i*2:4 + i*2] = temp.to_bytes(2, byteorder='big')
            buf[14 + i*2:16 + i*2] = rpm.to_bytes(2, byteorder='big')

        fan_channels = self._get_hw_fan_channels(channel)
        fan_modes = self._data.load('fan_modes', default=[0]*self._fan_count)

        for fan in fan_channels:
            mode = fan_modes[fan]
            if mode == _FAN_MODE_DC or mode == _FAN_MODE_PWM:
                buf[0] = fan
                self._send_command(_CMD_SET_FAN_PROFILE, buf)

    def set_color(self, channel, mode, colors, direction='forward',
                  speed='medium', start_led=1, maximum_leds=_MAX_LEDS, **kwargs):
        """Set the color of each LED.

        The table below summarizes the available channels, modes, and their
        associated maximum number of colors for each device family.

        | Channel  | Mode        | Num colors |
        | -------- | ----------- | ---------- |
        | led      | off         |          0 |
        | led      | fixed       |          1 |
        | led      | color_shift |          2 |
        | led      | color_pulse |          2 |
        | led      | color_wave  |          2 |
        | led      | visor       |          2 |
        | led      | blink       |          2 |
        | led      | marquee     |          1 |
        | led      | sequential  |          1 |
        | led      | rainbow     |          0 |
        | led      | rainbow2    |          0 |

        The additional mode 'clear' discards the effects saved in the runtime
        storage without sending anything to the device.

        At most the first three `colors` are used.  `speed` is 'slow',
        'medium' or 'fast' (any other value is treated as 'medium').
        `start_led` is one-based and, like `maximum_leds`, is clamped to the
        supported LED range.

        Each call appends one effect per selected channel to the effects
        saved in the runtime storage and then re-sends all of them; if that
        would exceed eight effects a warning is logged and nothing further is
        sent.

        Raises `ValueError` if the mode or the channel is unknown.
        """

        # a special mode to clear the current led settings.
        # this is useful if the user wants to use a led mode for multiple devices
        if mode == 'clear':
            self._data.store('saved_effects', None)
            return

        # flatten at most three (r, g, b) triples into a single list of bytes
        expanded = list(colors)[:3]
        colors = list(itertools.chain.from_iterable((r, g, b) for r, g, b in expanded))

        direction = map_direction(direction, _LED_DIRECTION_FORWARD, _LED_DIRECTION_BACKWARD)
        speed = _LED_SPEED_SLOW if speed == 'slow' else _LED_SPEED_FAST if speed == 'fast' else _LED_SPEED_MEDIUM
        start_led = clamp(start_led, 1, _MAX_LEDS) - 1
        num_leds = clamp(maximum_leds, 1, _MAX_LEDS - start_led)
        # random colors are requested only when no colors were given (and not for 'off')
        random_colors = 0x00 if mode == 'off' or colors else 0x01
        mode_val = _MODES.get(mode, -1)

        if mode_val == -1:
            raise ValueError(f'mode "{mode}" is not valid')

        # FIXME clears on 'off', while the docs only mention this behavior for 'clear'
        saved_effects = [] if mode == 'off' else self._data.load('saved_effects', default=[])

        for led_channel in self._get_hw_led_channels(channel):

            lighting_effect = {
                'channel': led_channel,
                'start_led': start_led,
                'num_leds': num_leds,
                'mode': mode_val,
                'speed': speed,
                'direction': direction,
                'random_colors': random_colors,
                'colors': colors
            }

            saved_effects += [lighting_effect]

            # check to make sure that too many LED effects are not being sent.
            # the max seems to be 8 as found here https://github.com/liquidctl/liquidctl/issues/154#issuecomment-762372583
            if len(saved_effects) > 8:
                _LOGGER.warning(f'too many lighting effects. Run `liquidctl set {channel} color clear` to reset the effect')
                return

            # start sending the led commands
            self._send_command(_CMD_RESET_LED_CHANNEL, [led_channel])
            self._send_command(_CMD_BEGIN_LED_EFFECT, [led_channel])
            self._send_command(_CMD_SET_LED_CHANNEL_STATE, [led_channel, 0x01])

        # FIXME clears on 'off', while the docs only mention this behavior for 'clear'
        self._data.store('saved_effects', None if mode == 'off' else saved_effects)

        for effect in saved_effects:
            config = [
                effect['channel'],
                effect['start_led'],
                effect['num_leds'],
                effect['mode'],
                effect['speed'],
                effect['direction'],
                effect['random_colors'],
                0xff,
            ] + effect['colors']
            self._send_command(_CMD_LED_EFFECT, config)

        self._send_command(_CMD_LED_COMMIT, [0xff])

    def _send_command(self, command, data=None):
        """Send `command` with optional payload `data` and return the response.

        The payload is truncated to what fits in the report after the
        command byte.  Reports already enqueued on the device handle are
        discarded before writing.

        Returns the `_RESPONSE_LENGTH` bytes read back, as `bytes`.
        """
        # self.device.write expects buf[0] to be the report number or 0 if not used
        buf = bytearray(_REPORT_LENGTH + 1)
        buf[1] = command
        start_at = 2

        if data:
            data = data[:_REPORT_LENGTH-1]
            buf[start_at: start_at + len(data)] = data

        self.device.clear_enqueued_reports()
        self.device.write(buf)
        buf = bytes(self.device.read(_RESPONSE_LENGTH))
        return buf