"""liquidctl drivers for sixth generation Asetek liquid coolers.

Copyright (C) 2020–2021 Andrew Robertson, Jonas Malaco and contributors

SPDX-License-Identifier: GPL-3.0-or-later
"""

import itertools
import logging

from liquidctl.driver.asetek import _Base690Lc
from liquidctl.error import NotSupportedByDevice
from liquidctl.util import clamp

_LOGGER = logging.getLogger(__name__)

# USB endpoints, maximum response size and timeouts passed to the device
# read/write calls in `_post`
_READ_ENDPOINT = 0x81
_READ_MAX_LENGTH = 32
_READ_TIMEOUT = 2000
_WRITE_ENDPOINT = 0x1
_WRITE_TIMEOUT = 2000

# maximum number of points requested from `_prepare_profile`
_MAX_PROFILE_POINTS = 7

# first byte of each message written to the device
_CMD_READ_AIO_TEMP = 0xa9
_CMD_READ_FAN_SPEED = 0x41
_CMD_READ_FIRMWARE = 0xaa
_CMD_READ_PUMP_MODE = 0x33
_CMD_READ_PUMP_SPEED = 0x31
_CMD_WRITE_COLOR_LIST = 0x56
_CMD_WRITE_COLOR_SPEED = 0x53
_CMD_WRITE_FAN_CURVE = 0x40
_CMD_WRITE_FAN_SPEED = 0x42
_CMD_WRITE_PUMP_MODE = 0x32

# the position of a mode in this list is the value exchanged with the device
_PUMP_MODES = ['quiet', 'balanced', 'performance']

# the position of a speed in this list indexes the per-mode lists in
# _COLOR_SPEEDS_VALUES
_COLOR_SPEEDS = ['slower', 'normal', 'faster']

_COLOR_SPEEDS_VALUES = {
    'shift': [
        0x46,  # slower
        0x28,  # normal
        0x0F,  # faster
    ],
    'pulse': [
        0x50,  # slower
        0x37,  # normal
        0x1E,  # faster
    ],
    'blinking': [
        0x0F,  # slower
        0x0A,  # normal
        0x05,  # faster
    ],
}

# final message written by `set_color` to switch to each mode; 'alert' is
# handled separately and does not use its (empty) entry
_COLOR_CHANGE_MODES = {
    'alert': [],
    'shift': [0x55, 0x01],
    'pulse': [0x52, 0x01],
    'blinking': [0x58, 0x01],
    'fixed': [0x55, 0x01],
}

# (required, maximum) number of colors for each mode
# FIXME unknown required and maximum values
_COLOR_COUNT_BOUNDS = {
    'alert': (3, 3),
    'shift': (2, 4),
    'pulse': (1, 4),
    'blinking': (1, 4),
    'fixed': (1, 1),
}


def _quoted(*names):
    """Return the `repr` of each name, joined by ', '."""
    return ', '.join(map(repr, names))


# we inherit from _Base690Lc to reuse its implementation of connect
# and disconnect, that emulates the stock SiUSBXp driver on Windows
class CorsairAsetekProDriver(_Base690Lc):
    """liquidctl driver for Corsair-branded sixth generation Asetek coolers."""

    SUPPORTED_DEVICES = [
        (0x1b1c, 0x0c12, None, 'Corsair Hydro H150i Pro (experimental)', {'fan_count': 3}),
        (0x1b1c, 0x0c13, None, 'Corsair Hydro H115i Pro (experimental)', {'fan_count': 2}),
        (0x1b1c, 0x0c15, None, 'Corsair Hydro H100i Pro (experimental)', {'fan_count': 2}),
    ]

    def __init__(self, device, description, fan_count, **kwargs):
        """Store the number of fans; everything else is left to the base class."""
        super().__init__(device, description, **kwargs)
        self._fan_count = fan_count
        self._data = None

    def _post(self, data, *, read_length=None):
        """Write `data` and return response of up to `read_length` bytes.

        `read_length` is mandatory (despite being a keyword argument) and must
        not exceed `_READ_MAX_LENGTH`.
        """

        assert read_length is not None and read_length <= _READ_MAX_LENGTH

        self.device.write(_WRITE_ENDPOINT, data, _WRITE_TIMEOUT)
        return self.device.read(_READ_ENDPOINT, read_length, _READ_TIMEOUT)[0:read_length]

    def initialize(self, pump_mode='balanced', **kwargs):
        """Initialize the device.

        Writes the requested pump mode to the device.  `pump_mode` is
        case-insensitive and must be one of the names in `_PUMP_MODES`;
        otherwise `ValueError` is raised before anything is written.
        """

        pump_mode = pump_mode.lower()

        if pump_mode not in _PUMP_MODES:
            raise ValueError(f'unknown pump mode, should be one of: {_quoted(*_PUMP_MODES)}')

        self._post([_CMD_WRITE_PUMP_MODE, _PUMP_MODES.index(pump_mode)], read_length=5)
        self.device.release()

    def get_status(self, **kwargs):
        """Get a status report.

        Returns a list of `(property, value, unit)` tuples.

        The report contains the liquid temperature, one entry for each fan
        whose speed could be read, the pump mode and speed, and the firmware
        version.
        """

        msg = self._post([_CMD_READ_AIO_TEMP], read_length=6)
        # integer part in byte 3, tenths in byte 4
        aio_temp = msg[3] + msg[4]/10

        speeds = self._get_fan_speeds()

        msg = self._post([_CMD_READ_PUMP_MODE], read_length=4)
        pump_mode = _PUMP_MODES[msg[3]]

        msg = self._post([_CMD_READ_PUMP_SPEED], read_length=5)
        # 16-bit value, most significant byte first
        pump_speed = (msg[3] << 8) + msg[4]

        msg = self._post([_CMD_READ_FIRMWARE], read_length=7)
        firmware = '{}.{}.{}.{}'.format(*tuple(msg[3:7]))

        self.device.release()

        status = [('Liquid temperature', aio_temp, '°C')]

        # fans that failed to report a speed are omitted from the report
        for i, speed in enumerate(speeds):
            if speed is not None:
                status.append((f'Fan {i + 1} speed', speed, 'rpm'))

        return status + [
            ('Pump mode', pump_mode, ""),
            ('Pump speed', pump_speed, 'rpm'),
            ('Firmware version', firmware, '')
        ]

    def _get_fan_speeds(self):
        """Read the RPM speed of the fans.

        Returns a list with one entry per fan, in fan order; the entry is
        `None` (and a warning is logged) when the response for that fan does
        not have the expected header.
        """

        speeds = []

        for i in range(self._fan_count):
            msg = self._post([_CMD_READ_FAN_SPEED, i], read_length=6)

            # a valid response starts with the command byte, 0x12, 0x34 and
            # the index of the fan that was queried
            if msg[0] != _CMD_READ_FAN_SPEED or msg[1] != 0x12 or msg[2] != 0x34 or msg[3] != i:
                # report the fan with the same 1-based number used in the
                # status report and in the channel names
                _LOGGER.warning('failed to get current speed of fan %d', i + 1)
                speeds.append(None)
                continue

            # 16-bit value, most significant byte first
            speeds.append((msg[4] << 8) + msg[5])
        return speeds

    def set_color(self, channel, mode, colors, speed='normal', **kwargs):
        """Set the color mode for a specific channel.

        `mode` must be one of the keys of `_COLOR_CHANGE_MODES` and `speed`
        one of the names in `_COLOR_SPEEDS`; both are case-insensitive.
        `colors` is an iterable of colors, each an iterable of byte values;
        the number of colors each mode requires and accepts is given by
        `_COLOR_COUNT_BOUNDS`.  `channel` is not used.

        Raises `ValueError` for an unknown mode or speed, or when fewer colors
        than the mode requires are supplied.
        """

        mode = mode.lower()
        speed = speed.lower()
        colors = list(colors)

        if mode not in _COLOR_CHANGE_MODES:
            valid = _quoted(*_COLOR_CHANGE_MODES.keys())
            raise ValueError(f'unknown lighting mode, should be one of: {valid}')

        if speed not in _COLOR_SPEEDS:
            valid = _quoted(*_COLOR_SPEEDS)
            raise ValueError(f'unknown speed value, should be one of {valid}')

        # validate the number of colors before anything is written, for all
        # modes, so that missing colors are reported as a ValueError
        colors = self._check_color_count_bounds(colors, mode)

        if mode == 'alert':
            # FIXME this mode is far from being completely implemented; for
            # one, the temperatures are hardcoded; additionally, it may also be
            # possible to combine it with other modes, but exploring that would
            # require some experimentation
            temps = (30, 40, 50)
            # NOTE(review): temps[1] is sent twice and temps[2] is never used;
            # this looks like a typo for temps[2], but the bytes are left
            # unchanged until it is confirmed against the device protocol
            self._post([0x5f, temps[0], 0x00, temps[1], 0x00, temps[1], 0x00]
                       + list(itertools.chain(*colors)), read_length=6)
            self._post([0x5e, 0x01], read_length=3)
            self.device.release()
            return

        if mode == 'fixed':
            # fixed is sent as the same color twice, using the same mode
            # message as 'shift'
            colors = [colors[0], colors[0]]

        set_colors = list(itertools.chain(*colors))
        self._post([_CMD_WRITE_COLOR_LIST, len(colors)] + set_colors, read_length=3)

        if mode != 'fixed':
            magic_value = _COLOR_SPEEDS_VALUES[mode][_COLOR_SPEEDS.index(speed)]
            self._post([_CMD_WRITE_COLOR_SPEED, magic_value], read_length=3)

        self._post(_COLOR_CHANGE_MODES[mode], read_length=3)
        self.device.release()

    def _check_color_count_bounds(self, color_list, mode_name):
        """Return `color_list` limited to the maximum allowed by `mode_name`.

        Raises `ValueError` if fewer colors than the mode requires were
        supplied; excess colors are dropped (and this is logged at the debug
        level).
        """

        requires, maximum = _COLOR_COUNT_BOUNDS[mode_name]

        if len(color_list) < requires:
            raise ValueError(f'{mode_name} mode requires {requires} colors')

        if len(color_list) > maximum:
            _LOGGER.debug('too many colors, dropping to %d', maximum)
            color_list = color_list[:maximum]

        return color_list

    def set_speed_profile(self, channel, profile, **kwargs):
        """Set channel to follow a speed duty profile.

        `channel` is either 'fan', for all fans, or 'fanN', for a single fan
        (see `_fan_indexes`).  `profile` is handed to the inherited
        `_prepare_profile` together with the 0–100 bounds and
        `_MAX_PROFILE_POINTS`.
        """

        channel = channel.lower()
        fan_indexes = self._fan_indexes(channel)

        adjusted = self._prepare_profile(profile, 0, 100, _MAX_PROFILE_POINTS)
        for temp, duty in adjusted:
            _LOGGER.info('setting %s PWM point: (%i°C, %i%%), device interpolated',
                         channel, temp, duty)

        # the message carries all temperatures first, followed by all duties
        temps, duties = map(list, zip(*adjusted))
        for i in fan_indexes:
            self._post([_CMD_WRITE_FAN_CURVE, i] + temps + duties, read_length=32)

        self.device.release()

    def set_fixed_speed(self, channel, duty, **kwargs):
        """Set channel to a fixed speed duty.

        `channel` is either 'fan', for all fans, or 'fanN', for a single fan
        (see `_fan_indexes`).  `duty` is clamped to the 0–100 range.
        """

        channel = channel.lower()
        duty = clamp(duty, 0, 100)

        for i in self._fan_indexes(channel):
            _LOGGER.info('setting speed for fan %d to %d', i + 1, duty)
            self._post([_CMD_WRITE_FAN_SPEED, i, duty], read_length=32)
        self.device.release()

    def _fan_indexes(self, channel):
        """Return the zero-based fan indexes selected by `channel`.

        'fan' selects all fans, while 'fanN' selects only the N-th fan, with N
        counted from 1.

        Raises `NotSupportedByDevice` for the 'pump' channel, and `ValueError`
        for any other channel or for a fan number that is not between 1 and
        the number of fans.
        """

        if channel.startswith('fan'):
            if len(channel) > 3:
                try:
                    channel_num = int(channel[3:]) - 1
                except ValueError:
                    raise ValueError(f'unknown channel: {channel}') from None
                # also reject 'fan0' and negative numbers, which would
                # otherwise result in a negative index
                if not 0 <= channel_num < self._fan_count:
                    raise ValueError(f'unknown channel: {channel}')
                return [channel_num]
            return range(self._fan_count)
        elif channel == 'pump':
            raise NotSupportedByDevice()
        else:
            raise ValueError(f'unknown channel: {channel}')

    @classmethod
    def probe(cls, handle, **kwargs):
        """Defer to the probe implementation of the base class."""
        return super().probe(handle, **kwargs)