1"""liquidctl drivers for Corsair Commander Pro devices.
2
3Supported devices:
4
- Corsair Commander Pro
- Corsair Lighting Node Pro (experimental)
- Corsair Lighting Node Core (experimental)
- Corsair Obsidian 1000D (experimental)
7
8
9NOTE:
10    This device currently only has hardware control implemented but it also supports a software control mode.
11    Software control will be enabled at a future time.
12
13
14Copyright (C) 2020–2021  Marshall Asch and contributors
15SPDX-License-Identifier: GPL-3.0-or-later
16"""
17
18import itertools
19import logging
20import re
21from enum import Enum, unique
22
23from liquidctl.driver.usb import UsbHidDriver
24from liquidctl.error import NotSupportedByDevice
25from liquidctl.keyval import RuntimeStorage
26from liquidctl.pmbus import compute_pec
27from liquidctl.util import clamp, fraction_of_byte, u16be_from, u16le_from, \
28                           normalize_profile, check_unsafe, map_direction
29
_LOGGER = logging.getLogger(__name__)

# size of an outgoing report, not counting the leading report-number byte
# that `_send_command` prepends
_REPORT_LENGTH = 64
# number of bytes read back from the device after each command
_RESPONSE_LENGTH = 16


# command identifiers; `_send_command` places one of these right after the
# report-number byte of every outgoing report
_CMD_GET_FIRMWARE = 0x02
_CMD_GET_BOOTLOADER = 0x06
_CMD_GET_TEMP_CONFIG = 0x10
_CMD_GET_TEMP = 0x11
_CMD_GET_VOLTS = 0x12
_CMD_GET_FAN_MODES = 0x20
_CMD_GET_FAN_RPM = 0x21
_CMD_SET_FAN_DUTY = 0x23
_CMD_SET_FAN_PROFILE = 0x25

# LED related command identifiers, used by `set_color`
_CMD_RESET_LED_CHANNEL = 0x37
_CMD_BEGIN_LED_EFFECT = 0x34
_CMD_SET_LED_CHANNEL_STATE = 0x38
_CMD_LED_EFFECT = 0x35
_CMD_LED_COMMIT = 0x33

_LED_PORT_STATE_HARDWARE = 0x01
_LED_PORT_STATE_SOFTWARE = 0x02
# animation speed values selected by the `speed` argument of `set_color`
_LED_SPEED_FAST = 0x00
_LED_SPEED_MEDIUM = 0x01
_LED_SPEED_SLOW = 0x02

# animation direction values selected by the `direction` argument of `set_color`
_LED_DIRECTION_FORWARD = 0x01
_LED_DIRECTION_BACKWARD = 0x00

# per-fan mode values as reported by `_CMD_GET_FAN_MODES`; only fans in DC or
# PWM mode are queried or configured by the driver
_FAN_MODE_DISCONNECTED = 0x00
_FAN_MODE_DC = 0x01
_FAN_MODE_PWM = 0x02


# number of points in every profile sent to the device
_PROFILE_LENGTH = 6
# temperature used for the final profile point: the first by default, the
# second when the 'high_temperature' unsafe feature is enabled
_CRITICAL_TEMPERATURE = 60
_CRITICAL_TEMPERATURE_HIGH = 100
_MAX_FAN_RPM = 5000             # I have no idea if this is a good value or not
# upper bound used when clamping `start_led` and `maximum_leds` in `set_color`
_MAX_LEDS = 204

# lighting mode names accepted by `set_color`, mapped to the mode value sent
# to the device
_MODES = {
    'off': 0x04,            # this is a special case of fixed
    'rainbow': 0x00,
    'color_shift': 0x01,
    'color_pulse': 0x02,
    'color_wave': 0x03,
    'fixed': 0x04,
    # 'temperature': 0x05,    # ignore this
    'visor': 0x06,
    'marquee': 0x07,
    'blink': 0x08,
    'sequential': 0x09,
    'rainbow2': 0x0a,
}
86
87
def _prepare_profile(original, critcalTempature):
    """Normalize a fan profile and pad it to `_PROFILE_LENGTH` points.

    The second value of every `(temperature, value)` pair in `original` is
    clamped to the range 0 to `_MAX_FAN_RPM`, then the pairs are passed to
    `normalize_profile` together with the critical temperature.  A result
    shorter than `_PROFILE_LENGTH` is padded with copies of the point
    `(critcalTempature, _MAX_FAN_RPM)`.

    Raises ValueError if the normalized profile has more than
    `_PROFILE_LENGTH` points.
    """
    points = normalize_profile(
        ((temp, clamp(value, 0, _MAX_FAN_RPM)) for temp, value in original),
        critcalTempature,
        _MAX_FAN_RPM,
    )

    excess = len(points) - _PROFILE_LENGTH
    if excess > 0:
        raise ValueError(f'too many points in profile (remove {excess})')
    if excess < 0:
        # fill the remaining slots with the maximum-speed point
        points += [(critcalTempature, _MAX_FAN_RPM)] * -excess
    return points
97
98
99def _quoted(*names):
100    return ', '.join(map(repr, names))
101
102
def _fan_mode_desc(mode):
    """Convert a fan mode value to a descriptive name.

    Returns 'DC' or 'PWM' for the corresponding modes and None for anything
    else.  A value that is not the disconnected mode either is reported with
    a warning.
    """

    if mode == _FAN_MODE_DC:
        return 'DC'
    if mode == _FAN_MODE_PWM:
        return 'PWM'

    if mode != _FAN_MODE_DISCONNECTED:
        # must be an f-string, otherwise the placeholder is logged verbatim
        _LOGGER.warning(f'unknown fan mode: {mode:#04x}')
    return None
115
116
class CommanderPro(UsbHidDriver):
    """Corsair Commander Pro LED and fan hub"""

    # One entry per supported device.  The dictionary keys match the
    # `fan_count`, `temp_probs` and `led_channels` parameters of `__init__`
    # (presumably forwarded as keyword arguments by the base driver; confirm
    # against `UsbHidDriver`).  The first two values look like the USB vendor
    # and product ids, followed by the device description.
    SUPPORTED_DEVICES = [
        (0x1b1c, 0x0c10, None, 'Corsair Commander Pro',
            {'fan_count': 6, 'temp_probs': 4, 'led_channels': 2}),
        (0x1b1c, 0x0c0b, None, 'Corsair Lighting Node Pro (experimental)',
            {'fan_count': 0, 'temp_probs': 0, 'led_channels': 2}),
        (0x1b1c, 0x0c1a, None, 'Corsair Lighting Node Core (experimental)',
            {'fan_count': 0, 'temp_probs': 0, 'led_channels': 1}),
        (0x1b1c, 0x1d00, None, 'Corsair Obsidian 1000D (experimental)',
            {'fan_count': 6, 'temp_probs': 4, 'led_channels': 2}),
    ]
130
131    def __init__(self, device, description, fan_count, temp_probs, led_channels, **kwargs):
132        super().__init__(device, description, **kwargs)
133
134        # the following fields are only initialized in connect()
135        self._data = None
136        self._fan_names = [f'fan{i+1}' for i in range(fan_count)]
137        if led_channels == 1:
138            self._led_names = ['led']
139        else:
140            self._led_names = [f'led{i+1}' for i in range(led_channels)]
141        self._temp_probs = temp_probs
142        self._fan_count = fan_count
143
144    def connect(self, runtime_storage=None, **kwargs):
145        """Connect to the device."""
146        ret = super().connect(**kwargs)
147        if runtime_storage:
148            self._data = runtime_storage
149        else:
150            ids = f'vid{self.vendor_id:04x}_pid{self.product_id:04x}'
151            # must use the HID path because there is no serial number; however,
152            # these can be quite long on Windows and macOS, so only take the
153            # numbers, since they are likely the only parts that vary between two
154            # devices of the same model
155            loc = 'loc' + '_'.join(re.findall(r'\d+', self.address))
156            self._data = RuntimeStorage(key_prefixes=[ids, loc])
157        return ret
158
159    def initialize(self, **kwargs):
160        """Initialize the device and get the fan modes.
161
162        The device should be initialized every time it is powered on, including when
163        the system resumes from suspending to memory.
164
165        Returns a list of `(property, value, unit)` tuples.
166        """
167
168        res = self._send_command(_CMD_GET_FIRMWARE)
169        fw_version = (res[1], res[2], res[3])
170
171        res = self._send_command(_CMD_GET_BOOTLOADER)
172        bootloader_version = (res[1], res[2])               # is it possible for there to be a third value?
173
174        status = [
175            ('Firmware version', '{}.{}.{}'.format(*fw_version), ''),
176            ('Bootloader version', '{}.{}'.format(*bootloader_version), ''),
177        ]
178
179        if self._temp_probs > 0:
180            res = self._send_command(_CMD_GET_TEMP_CONFIG)
181            temp_connected = res[1:5]
182            self._data.store('temp_sensors_connected', temp_connected)
183            status += [
184                (f'Temperature probe {i + 1}', bool(temp_connected[i]), '')
185                for i in range(4)
186            ]
187
188        if self._fan_count > 0:
189            # get the information about how the fans are connected, probably want to save this for later
190            res = self._send_command(_CMD_GET_FAN_MODES)
191            fanModes = res[1:self._fan_count+1]
192            self._data.store('fan_modes', fanModes)
193            status += [
194                (f'Fan {i + 1} control mode', _fan_mode_desc(fanModes[i]), '')
195                for i in range(6)
196            ]
197
198        return status
199
200    def get_status(self, **kwargs):
201        """Get a status report.
202
203        Returns a list of `(property, value, unit)` tuples.
204        """
205
206        if self._fan_count == 0 or self._temp_probs == 0:
207            _LOGGER.debug('only Commander Pro and Obsidian 1000D report status')
208            return []
209
210        temp_probes = self._data.load('temp_sensors_connected', default=[0]*self._temp_probs)
211        fan_modes = self._data.load('fan_modes', default=[0]*self._fan_count)
212
213        status = []
214
215        # get the temperature sensor values
216        for i, probe_enabled in enumerate(temp_probes):
217            if probe_enabled:
218                temp = self._get_temp(i)
219                status.append((f'Temperature {i + 1}', temp, '°C'))
220
221        # get fan RPMs of connected fans
222        for i, fan_mode in enumerate(fan_modes):
223            if fan_mode == _FAN_MODE_DC or fan_mode == _FAN_MODE_PWM:
224                speed = self._get_fan_rpm(i)
225                status.append((f'Fan {i + 1} speed', speed, 'rpm'))
226
227        # get the real power supply voltages
228        for i, rail in enumerate(["+12V", "+5V", "+3.3V"]):
229            raw = self._send_command(_CMD_GET_VOLTS, [i])
230            voltage = u16be_from(raw, offset=1) / 1000
231            status.append((f'{rail} rail', voltage, 'V'))
232
233        return status
234
235    def _get_temp(self, sensor_num):
236        """This will get the temperature in degrees celsius for the specified temp sensor.
237
238        sensor number MUST be in range of 0-3
239        """
240
241        if self._temp_probs == 0:
242            raise ValueError('this device does not have a temperature sensor')
243
244        if sensor_num < 0 or sensor_num > 3:
245            raise ValueError(f'sensor_num {sensor_num} invalid, must be between 0 and 3')
246
247        res = self._send_command(_CMD_GET_TEMP, [sensor_num])
248        temp = u16be_from(res, offset=1) / 100
249
250        return temp
251
252    def _get_fan_rpm(self, fan_num):
253        """This will get the rpm value of the fan.
254
255        fan number MUST be in range of 0-5
256        """
257
258        if self._fan_count == 0:
259            raise ValueError('this device does not have any fans')
260
261        if fan_num < 0 or fan_num > 5:
262            raise ValueError(f'fan_num {fan_num} invalid, must be between 0 and 5')
263
264        res = self._send_command(_CMD_GET_FAN_RPM, [fan_num])
265        speed = u16be_from(res, offset=1)
266
267        return speed
268
269    def _get_hw_fan_channels(self, channel):
270        """This will get a list of all the fan channels that the command should be sent to
271        It will look up the name of the fan channel given and return a list of the real fan number
272        """
273        if channel == 'sync':
274            return list(range(len(self._fan_names)))
275        elif channel in self._fan_names:
276            return [self._fan_names.index(channel)]
277        elif len(self._fan_names) > 1:
278            raise ValueError(f'unknown channel, should be one of: {_quoted("sync", *self._fan_names)}')
279
280    def _get_hw_led_channels(self, channel):
281        """This will get a list of all the led channels that the command should be sent to
282        It will look up the name of the led channel given and return a list of the real led device number
283        """
284        if channel == 'sync':
285            return list(range(len(self._led_names)))
286        elif channel in self._led_names:
287            return [self._led_names.index(channel)]
288        elif len(self._led_names) > 1:
289            raise ValueError(f'unknown channel, should be one of: {_quoted("sync", *self._led_names)}')
290
291    def set_fixed_speed(self, channel, duty, **kwargs):
292        """Set fan or fans to a fixed speed duty.
293
294        Valid channel values are 'fanN', where N >= 1 is the fan number, and
295        'fan', to simultaneously configure all fans.  Unconfigured fan channels
296        may default to 100% duty.
297
298        Different commands for sending fixed percent (0x23) and fixed rpm (0x24)
299        Probably want to use fixed percent for this untill the rpm flag is enabled.
300        Can only send one fan command at a time, if fan mode is unset will need to send 6?
301        messages (or 1 per enabled fan)
302        """
303
304        if self._fan_count == 0:
305            raise NotSupportedByDevice()
306
307        duty = clamp(duty, 0, 100)
308        fan_channels = self._get_hw_fan_channels(channel)
309        fan_modes = self._data.load('fan_modes', default=[0]*self._fan_count)
310
311        for fan in fan_channels:
312            mode = fan_modes[fan]
313            if mode == _FAN_MODE_DC or mode == _FAN_MODE_PWM:
314                self._send_command(_CMD_SET_FAN_DUTY, [fan, duty])
315
316    def set_speed_profile(self, channel, profile, temperature_sensor=1, **kwargs):
317        """Set fan or fans to follow a speed duty profile.
318
319        Valid channel values are 'fanN', where N >= 1 is the fan number, and
320        'fan', to simultaneously configure all fans.  Unconfigured fan channels
321        may default to 100% duty.
322
323        Up to six (temperature, duty) pairs can be supplied in `profile`,
324        with temperatures in Celsius and duty values in percentage.  The last
325        point should set the fan to 100% duty cycle, or be omitted; in the
326        latter case the fan will be set to max out at 60°C.
327        """
328
329        # send fan num, temp sensor, check to make sure it is actually enabled, and do not let the user send external sensor
330        # 6 2-byte big endian temps (celsius * 100), then 6 2-byte big endian rpms
331        # need to figure out how to find out what the max rpm is for the given fan
332
333        if self._fan_count == 0:
334            raise NotSupportedByDevice()
335
336        profile = list(profile)
337
338        criticalTemp = _CRITICAL_TEMPERATURE_HIGH if check_unsafe('high_temperature', **kwargs) else _CRITICAL_TEMPERATURE
339        profile = _prepare_profile(profile, criticalTemp)
340
341        # fan_type = kwargs['fan_type'] # need to make sure this is set
342        temp_sensor = clamp(temperature_sensor, 1, self._temp_probs)
343
344        sensors = self._data.load('temp_sensors_connected', default=[0]*self._temp_probs)
345
346        if sensors[temp_sensor-1] != 1:
347            raise ValueError('the specified temperature sensor is not connected')
348
349        buf = bytearray(26)
350        buf[1] = temp_sensor-1  # 0  # use temp sensor 1
351
352        for i, entry in enumerate(profile):
353            temp = entry[0]*100
354            rpm = entry[1]
355
356            # convert both values to 2 byte big endian values
357            buf[2 + i*2] = temp.to_bytes(2, byteorder='big')[0]
358            buf[3 + i*2] = temp.to_bytes(2, byteorder='big')[1]
359            buf[14 + i*2] = rpm.to_bytes(2, byteorder='big')[0]
360            buf[15 + i*2] = rpm.to_bytes(2, byteorder='big')[1]
361
362        fan_channels = self._get_hw_fan_channels(channel)
363        fan_modes = self._data.load('fan_modes', default=[0]*self._fan_count)
364
365        for fan in fan_channels:
366            mode = fan_modes[fan]
367            if mode == _FAN_MODE_DC or mode == _FAN_MODE_PWM:
368                buf[0] = fan
369                self._send_command(_CMD_SET_FAN_PROFILE, buf)
370
371    def set_color(self, channel, mode, colors, direction='forward',
372                  speed='medium', start_led=1, maximum_leds=_MAX_LEDS, **kwargs):
373        """Set the color of each LED.
374
375        The table bellow summarizes the available channels, modes, and their
376        associated maximum number of colors for each device family.
377
378        | Channel  | Mode        | Num colors |
379        | -------- | ----------- | ---------- |
380        | led      | off         |          0 |
381        | led      | fixed       |          1 |
382        | led      | color_shift |          2 |
383        | led      | color_pulse |          2 |
384        | led      | color_wave  |          2 |
385        | led      | visor       |          2 |
386        | led      | blink       |          2 |
387        | led      | marquee     |          1 |
388        | led      | sequential  |          1 |
389        | led      | rainbow     |          0 |
390        | led      | rainbow2    |          0 |
391        """
392
393        # a special mode to clear the current led settings.
394        # this is usefull if the the user wants to use a led mode for multiple devices
395        if mode == 'clear':
396            self._data.store('saved_effects', None)
397            return
398
399        colors = list(colors)
400        expanded = colors[:3]
401        c = itertools.chain(*((r, g, b) for r, g, b in expanded))
402        colors = list(c)
403
404        direction = map_direction(direction, _LED_DIRECTION_FORWARD, _LED_DIRECTION_BACKWARD)
405        speed = _LED_SPEED_SLOW if speed == 'slow' else _LED_SPEED_FAST if speed == 'fast' else _LED_SPEED_MEDIUM
406        start_led = clamp(start_led, 1, _MAX_LEDS) - 1
407        num_leds = clamp(maximum_leds, 1, _MAX_LEDS - start_led)
408        random_colors = 0x00 if mode == 'off' or len(colors) != 0 else 0x01
409        mode_val = _MODES.get(mode, -1)
410
411        if mode_val == -1:
412            raise ValueError(f'mode "{mode}" is not valid')
413
414        # FIXME clears on 'off', while the docs only mention this behavior for 'clear'
415        saved_effects = [] if mode == 'off' else self._data.load('saved_effects', default=[])
416
417        for led_channel in self._get_hw_led_channels(channel):
418
419            lighting_effect = {
420                    'channel': led_channel,
421                    'start_led': start_led,
422                    'num_leds': num_leds,
423                    'mode': mode_val,
424                    'speed': speed,
425                    'direction': direction,
426                    'random_colors': random_colors,
427                    'colors': colors
428                }
429
430            saved_effects += [lighting_effect]
431
432            # check to make sure that too many LED effects are not being sent.
433            # the max seems to be 8 as found here https://github.com/liquidctl/liquidctl/issues/154#issuecomment-762372583
434            if len(saved_effects) > 8:
435                _LOGGER.warning(f'too many lighting effects. Run `liquidctl set {channel} color clear` to reset the effect')
436                return
437
438            # start sending the led commands
439            self._send_command(_CMD_RESET_LED_CHANNEL, [led_channel])
440            self._send_command(_CMD_BEGIN_LED_EFFECT, [led_channel])
441            self._send_command(_CMD_SET_LED_CHANNEL_STATE, [led_channel, 0x01])
442
443        # FIXME clears on 'off', while the docs only mention this behavior for 'clear'
444        self._data.store('saved_effects', None if mode == 'off' else saved_effects)
445
446        for effect in saved_effects:
447            config = [effect.get('channel'),
448                      effect.get('start_led'),
449                      effect.get('num_leds'),
450                      effect.get('mode'),
451                      effect.get('speed'),
452                      effect.get('direction'),
453                      effect.get('random_colors'),
454                      0xff
455                      ] + effect.get('colors')
456            self._send_command(_CMD_LED_EFFECT, config)
457
458        self._send_command(_CMD_LED_COMMIT, [0xff])
459
460    def _send_command(self, command, data=None):
461        # self.device.write expects buf[0] to be the report number or 0 if not used
462        buf = bytearray(_REPORT_LENGTH + 1)
463        buf[1] = command
464        start_at = 2
465
466        if data:
467            data = data[:_REPORT_LENGTH-1]
468            buf[start_at: start_at + len(data)] = data
469
470        self.device.clear_enqueued_reports()
471        self.device.write(buf)
472        buf = bytes(self.device.read(_RESPONSE_LENGTH))
473        return buf
474