1"""liquidctl drivers for sixth generation Asetek liquid coolers.
2
3Copyright (C) 2020–2021  Andrew Robertson, Jonas Malaco and contributors
4
5SPDX-License-Identifier: GPL-3.0-or-later
6"""
7
8import itertools
9import logging
10
11from liquidctl.driver.asetek import _Base690Lc
12from liquidctl.error import NotSupportedByDevice
13from liquidctl.util import clamp
14
15_LOGGER = logging.getLogger(__name__)
16
17_READ_ENDPOINT = 0x81
18_READ_MAX_LENGTH = 32
19_READ_TIMEOUT = 2000
20_WRITE_ENDPOINT = 0x1
21_WRITE_TIMEOUT = 2000
22
23_MAX_PROFILE_POINTS = 7
24
25_CMD_READ_AIO_TEMP = 0xa9
26_CMD_READ_FAN_SPEED = 0x41
27_CMD_READ_FIRMWARE = 0xaa
28_CMD_READ_PUMP_MODE = 0x33
29_CMD_READ_PUMP_SPEED = 0x31
30_CMD_WRITE_COLOR_LIST = 0x56
31_CMD_WRITE_COLOR_SPEED = 0x53
32_CMD_WRITE_FAN_CURVE = 0x40
33_CMD_WRITE_FAN_SPEED = 0x42
34_CMD_WRITE_PUMP_MODE = 0x32
35
36_PUMP_MODES = ['quiet', 'balanced', 'performance']
37
38_COLOR_SPEEDS = ['slower', 'normal', 'faster']
39
40_COLOR_SPEEDS_VALUES = {
41    'shift': [
42        0x46, # slower
43        0x28, # normal
44        0x0F  # faster
45    ],
46    'pulse':[
47        0x50, # slower
48        0x37, # normal
49        0x1E  # faster
50    ],
51    'blinking': [
52        0x0F, # slower
53        0x0A, # normal
54        0x05  # faster
55    ],
56}
57
58_COLOR_CHANGE_MODES = {
59    'alert': [],
60    'shift': [0x55, 0x01],
61    'pulse': [0x52, 0x01],
62    'blinking': [0x58, 0x01],
63    'fixed': [0x55, 0x01],
64}
65
66# FIXME unknown required and maximum values
67_COLOR_COUNT_BOUNDS = {
68    'alert': (3, 3),
69    'shift': (2, 4),
70    'pulse': (1, 4),
71    'blinking': (1, 4),
72    'fixed': (1, 1),
73}
74
75
76def _quoted(*names):
77    return ', '.join(map(repr, names))
78
79
80# we inherit from _Base690Lc to reuse its implementation of connect
81# and disconnect, that emulates the stock SiUSBXp driver on Windows
82class CorsairAsetekProDriver(_Base690Lc):
83    """liquidctl driver for Corsair-branded sixth generation Asetek coolers."""
84
85    SUPPORTED_DEVICES = [
86        (0x1b1c, 0x0c12, None, 'Corsair Hydro H150i Pro (experimental)', {'fan_count': 3}),
87        (0x1b1c, 0x0c13, None, 'Corsair Hydro H115i Pro (experimental)', {'fan_count': 2}),
88        (0x1b1c, 0x0c15, None, 'Corsair Hydro H100i Pro (experimental)', {'fan_count': 2})
89    ]
90
91    def __init__(self, device, description, fan_count, **kwargs):
92        super().__init__(device, description, **kwargs)
93        self._fan_count = fan_count
94        self._data = None
95
96    def _post(self, data, *, read_length=None):
97        """Write `data` and return response of up to `read_length` bytes."""
98
99        assert read_length is not None and read_length <= _READ_MAX_LENGTH
100
101        self.device.write(_WRITE_ENDPOINT, data, _WRITE_TIMEOUT)
102        return self.device.read(_READ_ENDPOINT, read_length, _READ_TIMEOUT)[0:read_length]
103
104    def initialize(self, pump_mode='balanced', **kwargs):
105        """Initialize the device."""
106
107        pump_mode = pump_mode.lower()
108
109        if pump_mode not in _PUMP_MODES:
110            raise ValueError(f'unknown pump mode, should be one of: {_quoted(*_PUMP_MODES)}')
111
112        self._post([_CMD_WRITE_PUMP_MODE, _PUMP_MODES.index(pump_mode)], read_length=5)
113        self.device.release()
114
115    def get_status(self, **kwargs):
116        """Get a status report.
117
118        Returns a list of `(property, value, unit)` tuples.
119        """
120
121        msg = self._post([_CMD_READ_AIO_TEMP], read_length=6)
122        aio_temp = msg[3] + msg[4]/10
123
124        speeds = self._get_fan_speeds()
125
126        msg = self._post([_CMD_READ_PUMP_MODE], read_length=4)
127        pump_mode = _PUMP_MODES[msg[3]]
128
129        msg = self._post([_CMD_READ_PUMP_SPEED], read_length=5)
130        pump_speed = (msg[3] << 8) + msg[4]
131
132        msg = self._post([_CMD_READ_FIRMWARE], read_length=7)
133        firmware = '{}.{}.{}.{}'.format(*tuple(msg[3:7]))
134
135        self.device.release()
136
137        status = [('Liquid temperature', aio_temp, '°C')]
138
139        for i, speed in enumerate(speeds):
140            if speed is not None:
141                status.append((f'Fan {i + 1} speed', speed, 'rpm'))
142
143        return status + [
144            ('Pump mode', pump_mode, ""),
145            ('Pump speed', pump_speed, 'rpm'),
146            ('Firmware version', firmware, '')
147        ]
148
149    def _get_fan_speeds(self):
150        """Read the RPM speed of the fans."""
151
152        speeds = []
153
154        for i in range(self._fan_count):
155            msg = self._post([_CMD_READ_FAN_SPEED, i], read_length=6)
156
157            if msg[0] != 0x41 or msg[1] != 0x12 or msg[2] != 0x34 or msg[3] != i:
158                _LOGGER.warning('failed to get current speed of fan %d', i)
159                speeds.append(None)
160                continue
161
162            speeds.append((msg[4] << 8) + msg[5])
163        return speeds
164
165    def set_color(self, channel, mode, colors, speed='normal', **kwargs):
166        """Set the color mode for a specific channel."""
167
168        mode = mode.lower()
169        speed = speed.lower()
170        colors = list(colors)
171
172        if mode not in _COLOR_CHANGE_MODES:
173            valid = _quoted(*_COLOR_CHANGE_MODES.keys())
174            raise ValueError(f'unknown lighting mode, should be one of: {valid}')
175
176        if speed not in _COLOR_SPEEDS:
177            valid = _quoted(*_COLOR_SPEEDS)
178            raise ValueError(f'unknown speed value, should be one of {valid}')
179
180        if mode == 'alert':
181            # FIXME this mode is far from being completely implemented; for
182            # one, the temperatures are hardcoded; additionally, it may also be
183            # possible to combine it with other modes, but exploring that would
184            # require some experimentation
185            temps = (30, 40, 50)
186            self._post([0x5f, temps[0], 0x00, temps[1], 0x00, temps[1], 0x00]
187                       + colors[0] + colors[1] + colors[2], read_length=6)
188            self._post([0x5e, 0x01], read_length=3)
189            self.device.release()
190            return
191
192        colors = self._check_color_count_bounds(colors, mode)
193
194        if mode == 'fixed':
195            colors = [colors[0], colors[0]]
196
197        set_colors = list(itertools.chain(*colors))
198        self._post([_CMD_WRITE_COLOR_LIST, len(colors)] + set_colors, read_length=3)
199
200        if mode != 'fixed':
201            magic_value = _COLOR_SPEEDS_VALUES[mode][_COLOR_SPEEDS.index(speed)]
202            self._post([_CMD_WRITE_COLOR_SPEED, magic_value], read_length=3)
203
204        self._post(_COLOR_CHANGE_MODES[mode], read_length=3)
205        self.device.release()
206
207    def _check_color_count_bounds(self, color_list, mode_name):
208        requires, maximum = _COLOR_COUNT_BOUNDS[mode_name]
209
210        if len(color_list) < requires:
211            raise ValueError(f'{mode_name} mode requires {requires} colors')
212
213        if len(color_list) > maximum:
214            _LOGGER.debug('too many colors, dropping to %d', maximum)
215            color_list = color_list[:maximum]
216
217        return color_list
218
219    def set_speed_profile(self, channel, profile, **kwargs):
220        """Set channel to follow a speed duty profile."""
221
222        channel = channel.lower()
223        fan_indexes = self._fan_indexes(channel)
224
225        adjusted = self._prepare_profile(profile, 0, 100, _MAX_PROFILE_POINTS)
226        for temp, duty in adjusted:
227            _LOGGER.info('setting %s PWM point: (%i°C, %i%%), device interpolated',
228                        channel, temp, duty)
229
230        temps, duties = map(list, zip(*adjusted))
231        for i in fan_indexes:
232            self._post([_CMD_WRITE_FAN_CURVE, i] + temps + duties, read_length=32)
233
234        self.device.release()
235
236    def set_fixed_speed(self, channel, duty, **kwargs):
237        """Set channel to a fixed speed duty."""
238
239        channel = channel.lower()
240        duty = clamp(duty, 0, 100)
241
242        for i in self._fan_indexes(channel):
243            _LOGGER.info('setting speed for fan %d to %d', i + 1, duty)
244            self._post([_CMD_WRITE_FAN_SPEED, i, duty], read_length=32)
245        self.device.release()
246
247    def _fan_indexes(self, channel):
248        if channel.startswith('fan'):
249            if len(channel) > 3:
250                channel_num = int(channel[3:]) - 1
251                if channel_num >= self._fan_count:
252                    raise ValueError(f'unknown channel: {channel}')
253                return [channel_num]
254            return range(self._fan_count)
255        elif channel == 'pump':
256            raise NotSupportedByDevice()
257        else:
258            raise ValueError(f'unknown channel: {channel}')
259
260    @classmethod
261    def probe(cls, handle, **kwargs):
262        return super().probe(handle, **kwargs)
263