1"""liquidctl drivers for fifth generation Asetek 690LC liquid coolers.
2
3Supported devices:
4
5- EVGA CLC (120 CL12, 240, 280 or 360); modern generic Asetek 690LC
6- NZXT Kraken X (X31, X41 or X61); legacy generic Asetek 690LC
7- NZXT Kraken X (X40 or X60); legacy generic Asetek 690LC
8- Corsair Hydro H80i GT, H100i GTX or H110i GTX
9- Corsair Hydro H80i v2, H100i v2 or H115i
10
11Copyright (C) 2018–2021  Jonas Malaco and contributors
12
13Incorporates or uses as reference work by Kristóf Jakab, Sean Nelson
14and Chris Griffith.
15
16SPDX-License-Identifier: GPL-3.0-or-later
17"""
18
19import logging
20
21import usb
22
23from liquidctl.driver.usb import UsbDriver
24from liquidctl.error import NotSupportedByDevice
25from liquidctl.keyval import RuntimeStorage
26from liquidctl.util import clamp
27
_LOGGER = logging.getLogger(__name__)

# Message type codes; the ones used by the drivers in this module are sent as
# the first byte of a message written to the device.
_CMD_RUNTIME = 0x10
_CMD_PROFILE = 0x11
_CMD_OVERRIDE = 0x12
_CMD_PUMP_PWM = 0x13
_CMD_LUID = 0x14
_CMD_READ_ONLY_RUNTIME = 0x20
_CMD_STORE_SETTINGS = 0x21
_CMD_EXTERNAL_TEMPERATURE = 0x22

# Pump speed codes; they must correspond to the minimum and maximum pump
# duties listed in _FIXED_SPEED_CHANNELS.
_MIN_PUMP_SPEED_CODE = 0x32
_MAX_PUMP_SPEED_CODE = 0x42

# Channel tables: channel name -> (message type, minimum duty, maximum duty)
_FIXED_SPEED_CHANNELS = {
    # min/max must correspond to _MIN/MAX_PUMP_SPEED_CODE
    'pump': (_CMD_PUMP_PWM, 50, 100),
}
_VARIABLE_SPEED_CHANNELS = {
    'fan': (_CMD_PROFILE, 0, 100),
}
_LEGACY_FIXED_SPEED_CHANNELS = {
    'fan': (_CMD_OVERRIDE, 0, 100),
    'pump': (_CMD_PUMP_PWM, 50, 100),
}

# Speed profile limits and temperature thresholds (°C)
_MAX_PROFILE_POINTS = 6
_CRITICAL_TEMPERATURE = 60  # also used to pad incomplete profiles
_HIGH_TEMPERATURE = 45      # default alert threshold

# Bulk transfer parameters
_READ_ENDPOINT = 0x82
_READ_LENGTH = 32
_READ_TIMEOUT = 2000
_WRITE_ENDPOINT = 0x2
_WRITE_TIMEOUT = 2000

# USBXpress specific control parameters; from the USBXpress SDK
# (Customization/CP21xx_Customization/AN721SW_Linux/silabs_usb.h)
_USBXPRESS_REQUEST = 0x02
_USBXPRESS_FLUSH_BUFFERS = 0x01
_USBXPRESS_CLEAR_TO_SEND = 0x02
_USBXPRESS_NOT_CLEAR_TO_SEND = 0x04
_USBXPRESS_GET_PART_NUM = 0x08

# Unknown control parameters; from Craig's libSiUSBXp and OpenCorsairLink
_UNKNOWN_OPEN_REQUEST = 0x00
_UNKNOWN_OPEN_VALUE = 0xffff

# Control request type: host-to-device, vendor specific, device recipient
_USBXPRESS = (
    usb.util.CTRL_OUT
    | usb.util.CTRL_TYPE_VENDOR
    | usb.util.CTRL_RECIPIENT_DEVICE
)
75
76
class _Base690Lc(UsbDriver):
    """Common methods for Asetek 690LC devices.

    Subclasses set `_LEGACY_690LC` to declare whether they are legacy
    drivers; `probe()` only yields instances when the requested
    `legacy_690lc` value matches that flag.
    """

    _LEGACY_690LC = False

    @classmethod
    def probe(cls, handle, legacy_690lc=False, **kwargs):
        """Probe `handle` and yield corresponding driver instances.

        Nothing is yielded when `legacy_690lc` differs from
        `cls._LEGACY_690LC`.
        """
        if legacy_690lc != cls._LEGACY_690LC:
            return
        yield from super().probe(handle, **kwargs)

    @classmethod
    def find_supported_devices(cls, **kwargs):
        """Find devices specifically compatible with this driver.

        Automatically sets the appropriate value for `legacy_690lc`.
        """

        return super().find_supported_devices(legacy_690lc=cls._LEGACY_690LC, **kwargs)

    def _configure_flow_control(self, clear_to_send):
        """Set the software clear-to-send flow control policy for device."""
        _LOGGER.debug('set clear to send = %s', clear_to_send)
        if clear_to_send:
            policy = _USBXPRESS_CLEAR_TO_SEND
        else:
            policy = _USBXPRESS_NOT_CLEAR_TO_SEND
        self.device.ctrl_transfer(_USBXPRESS, _USBXPRESS_REQUEST, policy)

    def _begin_transaction(self):
        """Begin a new transaction before writing to the device.

        Claims the device and flushes its buffers; the claim is released by
        `_end_transaction_and_read()`.
        """
        _LOGGER.debug('begin transaction')
        self.device.claim()
        self.device.ctrl_transfer(_USBXPRESS, _USBXPRESS_REQUEST, _USBXPRESS_FLUSH_BUFFERS)

    def _write(self, data):
        """Write `data` to the device's write endpoint."""
        self.device.write(_WRITE_ENDPOINT, data, _WRITE_TIMEOUT)

    def _end_transaction_and_read(self):
        """End the transaction by reading from the device.

        Returns the message read from the device.  The device is released
        even if the read fails, so that a failed transaction does not leave
        it claimed.

        According to the official documentation, as well as Craig's open-source
        implementation (libSiUSBXp), it should be necessary to check the queue
        size and read the data in chunks.  However, leviathan and its
        derivatives seem to work fine without this complexity; we also
        successfully follow this approach.
        """

        try:
            return self.device.read(_READ_ENDPOINT, _READ_LENGTH, _READ_TIMEOUT)
        finally:
            self.device.release()

    def _configure_device(self, color1=(0, 0, 0), color2=(0, 0, 0), color3=(255, 0, 0),
                          alert_temp=_HIGH_TEMPERATURE, interval1=0, interval2=0,
                          blackout=False, fading=False, blinking=False, enable_alert=True):
        """Write a configuration message to the device.

        The message is `0x10`, followed by the components of `color1`,
        `color2` and `color3` (callers in this module pass their alert color
        as `color3`), then `alert_temp`, `interval1`, `interval2`, the
        negation of `blackout`, `fading`, `blinking`, `enable_alert` and two
        trailing bytes (0x00, 0x01) whose meaning is not documented here.

        Colors may be any iterable of components (lists, tuples, etc.).
        Must be called within a transaction.
        """
        self._write([0x10, *color1, *color2, *color3,
                     alert_temp, interval1, interval2, not blackout, fading,
                     blinking, enable_alert, 0x00, 0x01])

    def _prepare_profile(self, profile, min_duty, max_duty, max_points):
        """Normalize `profile` into a list of exactly `max_points` points.

        `profile` is an iterable of `(temperature, duty)` pairs.  Duties are
        clamped to `[min_duty, max_duty]` and missing points are filled with
        `(_CRITICAL_TEMPERATURE, 100)`.

        Raises `ValueError` if the profile is empty or has more than
        `max_points` points.
        """
        points = list(profile)
        size = len(points)
        if size < 1:
            raise ValueError('at least one PWM point required')
        if size > max_points:
            # compare against the caller-supplied limit (the one reported in
            # the message), not a module-level constant
            raise ValueError(f'too many PWM points ({size}), only {max_points} supported')
        points = [(temp, clamp(duty, min_duty, max_duty)) for temp, duty in points]
        missing = max_points - size
        if missing:
            # Some issues were observed when padding with (0°C, 0%), though
            # they were hard to reproduce.  So far it *seems* that in some
            # instances the device will store the last "valid" profile index
            # somewhere, and would need another call to initialize() to clear
            # that up.  Padding with (CRIT, 100%) appears to avoid all issues,
            # at least within the reasonable range of operating temperatures.
            _LOGGER.info('filling missing %d PWM points with (%d°C, 100%%)',
                         missing, _CRITICAL_TEMPERATURE)
            points = points + [(_CRITICAL_TEMPERATURE, 100)]*missing
        return points

    def connect(self, **kwargs):
        """Connect to the device.

        Enables the device to send data to the host.
        """

        ret = super().connect(**kwargs)
        self._configure_flow_control(clear_to_send=True)
        return ret

    def initialize(self, **kwargs):
        """Initialize the device.

        Writes the default configuration (see `_configure_device()`) in a
        single transaction; the device's reply is discarded.
        """
        self._begin_transaction()
        self._configure_device()
        self._end_transaction_and_read()

    def disconnect(self, **kwargs):
        """Disconnect from the device.

        Implementation note: unlike SI_Close is supposed to do,¹ do not send
        _USBXPRESS_NOT_CLEAR_TO_SEND to the device.  This allows one program to
        disconnect without stopping reads from another.

        Surrounding device.read() with _USBXPRESS_[NOT_]CLEAR_TO_SEND would
        make more sense, but there seems to be a yet unknown minimum delay
        necessary for that to work reliably.

        ¹ https://github.com/craigshelley/SiUSBXp/blob/master/SiUSBXp.c
        """

        super().disconnect(**kwargs)
188
189
class _ModernBase690Lc(_Base690Lc):
    """Common methods for the non-legacy Asetek 690LC drivers."""

    def get_status(self, **kwargs):
        """Get a status report.

        Returns a list of `(property, value, unit)` tuples.
        """

        self._begin_transaction()
        self._write([_CMD_LUID, 0, 0, 0])
        msg = self._end_transaction_and_read()
        firmware = '{}.{}.{}.{}'.format(*msg[0x17:0x1b])
        return [
            ('Liquid temperature', msg[10] + msg[14]/10, '°C'),
            ('Fan speed', msg[0] << 8 | msg[1], 'rpm'),
            ('Pump speed', msg[8] << 8 | msg[9], 'rpm'),
            ('Firmware version', firmware, '')
        ]

    def set_color(self, channel, mode, colors, time_per_color=1, time_off=None,
                  alert_threshold=_HIGH_TEMPERATURE, alert_color=[255, 0, 0],
                  speed=3, **kwargs):
        """Set the color mode for a specific channel.

        Supported modes are 'rainbow', 'fading' (two colors), 'blinking' and
        'fixed' (one color each) and 'blackout'.  `channel` is not used.
        `time_off` defaults to `time_per_color`; `speed` only applies to
        'rainbow' and may be given as a string.

        Raises `KeyError` for an unknown mode.  All arguments are parsed and
        validated before the transaction begins, so that invalid input does
        not leave the device claimed.
        """
        # keyword arguments may have been forwarded from cli args and need parsing
        colors = list(colors)
        config = {
            'alert_temp': clamp(alert_threshold, 0, 100),
            'color3': alert_color,
        }
        rainbow_cmd = [0x23, 0]
        if mode == 'rainbow':
            if isinstance(speed, str):
                speed = int(speed)
            rainbow_cmd = [0x23, clamp(speed, 1, 6)]
        elif mode == 'fading':
            config.update(fading=True, color1=colors[0], color2=colors[1],
                          interval1=clamp(time_per_color, 1, 255))
        elif mode == 'blinking':
            if time_off is None:
                time_off = time_per_color
            config.update(blinking=True, color1=colors[0],
                          interval1=clamp(time_off, 1, 255),
                          interval2=clamp(time_per_color, 1, 255))
        elif mode == 'fixed':
            config.update(color1=colors[0])
        elif mode == 'blackout':  # stronger than just 'off', suppresses alerts and rainbow
            config.update(blackout=True)
        else:
            raise KeyError(f'unknown lighting mode {mode}')

        self._begin_transaction()
        if mode == 'rainbow':
            self._write(rainbow_cmd)
            # make sure to clear blinking or... chaos
            self._configure_device(**config)
        else:
            self._configure_device(**config)
            if mode != 'blackout':
                self._write(rainbow_cmd)
        self._end_transaction_and_read()

    def set_speed_profile(self, channel, profile, **kwargs):
        """Set channel to follow a speed duty profile.

        `profile` is an iterable of `(temperature, duty)` pairs; see
        `_prepare_profile()` for how it is clamped and padded.  Raises
        `KeyError` if `channel` does not support profiles.
        """
        mtype, dmin, dmax = _VARIABLE_SPEED_CHANNELS[channel]
        adjusted = self._prepare_profile(profile, dmin, dmax, _MAX_PROFILE_POINTS)
        for temp, duty in adjusted:
            _LOGGER.info('setting %s PWM point: (%d°C, %d%%), device interpolated',
                         channel, temp, duty)
        temps, duties = map(list, zip(*adjusted))
        self._begin_transaction()
        self._write([mtype, 0] + temps + duties)
        self._end_transaction_and_read()

    def set_fixed_speed(self, channel, duty, **kwargs):
        """Set channel to a fixed speed duty.

        The fan is set through a flat speed profile; other channels are
        looked up in `_FIXED_SPEED_CHANNELS` (raising `KeyError` if unknown)
        and their clamped duty is mapped to a discrete speed code.
        """
        if channel == 'fan':
            # While devices seem to recognize a specific channel for fixed fan
            # speeds (mtype == 0x12), its use can later conflict with custom
            # profiles.
            # Note for a future self: the conflict can be cleared with
            # *another* call to initialize(), i.e.  with another
            # configuration command.
            _LOGGER.info('using a flat profile to set %s to a fixed duty', channel)
            self.set_speed_profile(channel, [(0, duty), (_CRITICAL_TEMPERATURE - 1, duty)])
            return
        mtype, dmin, dmax = _FIXED_SPEED_CHANNELS[channel]
        duty = clamp(duty, dmin, dmax)
        total_levels = _MAX_PUMP_SPEED_CODE - _MIN_PUMP_SPEED_CODE + 1
        # NOTE(review): at duty == dmax this yields level == total_levels, so
        # the code written is _MAX_PUMP_SPEED_CODE + 1 — confirm against
        # hardware whether that is intended before changing it.
        level = round((duty - dmin)/(dmax - dmin)*total_levels)
        effective_duty = round(dmin + level*(dmax - dmin)/total_levels)
        _LOGGER.info('setting %s PWM duty to %d%% (level %d)', channel, effective_duty, level)
        self._begin_transaction()
        self._write([mtype, _MIN_PUMP_SPEED_CODE + level])
        self._end_transaction_and_read()
279
280
class Modern690Lc(_ModernBase690Lc):
    """Modern fifth generation Asetek 690LC cooler."""

    SUPPORTED_DEVICES = [
        (0x2433, 0xb200, None, 'Asetek 690LC (assuming EVGA CLC)', {}),
    ]

    def downgrade_to_legacy(self):
        """Hand the device handle over to a new Legacy690Lc instance.

        The returned instance takes ownership of the device handle and
        description previously held by `self`; both are cleared on `self`,
        which must be discarded by the caller right away, since none of its
        methods or properties remain valid.

        Downgrading a device that has already seen modern traffic since it
        booted will generally not work, even if it occasionally does.  No
        attempt is made to disconnect from the device while downgrading.

        For these reasons, callers are strongly advised to downgrade before
        connecting from this instance and, in fact, before any other method
        has been called on the device from any instance.

        This API is not yet considered stable: its signature and/or behavior
        may change, and callers should follow the development of liquidctl.
        """
        handle, description = self.device, self._description
        downgraded = Legacy690Lc(handle, description)
        # only invalidate this instance once the new one has been created
        self.device = self._description = None
        return downgraded
313
314
class Legacy690Lc(_Base690Lc):
    """Legacy fifth generation Asetek 690LC cooler.

    Fixed pump and fan duties are kept in a runtime storage (`self._data`)
    and are re-sent to the device by `_set_all_fixed_speeds()`.
    """

    SUPPORTED_DEVICES = [
        (0x2433, 0xb200, None, 'Asetek 690LC (assuming NZXT Kraken X)', {}),
    ]

    _LEGACY_690LC = True

    def __init__(self, device, description, **kwargs):
        super().__init__(device, description, **kwargs)
        # --device causes drivers to be instantiated even if they are later
        # discarded; defer instantiating the data storage until to connect()
        self._data = None

    def connect(self, runtime_storage=None, **kwargs):
        """Connect to the device and set up the runtime storage.

        Uses `runtime_storage` if given; otherwise creates a
        `RuntimeStorage` keyed by the vendor and product IDs, the bus and
        port of the device, and the 'legacy' marker.
        """
        ret = super().connect(**kwargs)
        if runtime_storage:
            self._data = runtime_storage
        else:
            ids = f'vid{self.vendor_id:04x}_pid{self.product_id:04x}'
            loc = f'bus{self.bus}_port{"_".join(map(str, self.port))}'
            self._data = RuntimeStorage(key_prefixes=[ids, loc, 'legacy'])
        return ret

    def _set_all_fixed_speeds(self):
        """Send the stored pump and fan duties and return the device's reply.

        A channel without a stored duty uses its maximum duty; stored values
        are clamped to the channel's limits.
        """
        self._begin_transaction()
        for channel in ['pump', 'fan']:
            mtype, dmin, dmax = _LEGACY_FIXED_SPEED_CHANNELS[channel]
            duty = clamp(self._data.load(f'{channel}_duty', of_type=int, default=dmax), dmin, dmax)
            _LOGGER.info('setting %s duty to %d%%', channel, duty)
            self._write([mtype, duty])
        return self._end_transaction_and_read()

    def initialize(self, **kwargs):
        """Initialize the device and reset the stored duties.

        After the base initialization, the stored pump and fan duties are
        cleared and the resulting (default) speeds are sent to the device.
        """
        super().initialize(**kwargs)
        self._data.store('pump_duty', None)
        self._data.store('fan_duty', None)
        self._set_all_fixed_speeds()

    def get_status(self, **kwargs):
        """Get a status report.

        The status is taken from the reply to re-sending the stored fixed
        speeds.

        Returns a list of `(property, value, unit)` tuples.
        """

        msg = self._set_all_fixed_speeds()
        firmware = '{}.{}.{}.{}'.format(*msg[0x17:0x1b])
        return [
            ('Liquid temperature', msg[10] + msg[14]/10, '°C'),
            ('Fan speed', msg[0] << 8 | msg[1], 'rpm'),
            ('Pump speed', msg[8] << 8 | msg[9], 'rpm'),
            ('Firmware version', firmware, '')
        ]

    def set_color(self, channel, mode, colors, time_per_color=None, time_off=None,
                  alert_threshold=_HIGH_TEMPERATURE, alert_color=[255, 0, 0],
                  **kwargs):
        """Set the color mode for a specific channel.

        Supported modes are 'fading' (two colors), 'blinking' and 'fixed'
        (one color each) and 'blackout'.  `channel` is not used.
        `time_per_color` defaults to 5 for 'fading' and 1 for 'blinking';
        `time_off` defaults to `time_per_color`.

        Raises `KeyError` for an unsupported mode.  All arguments are parsed
        and validated before the transaction begins, so that invalid input
        does not leave the device claimed.
        """
        # keyword arguments may have been forwarded from cli args and need parsing
        colors = list(colors)
        config = {
            'alert_temp': clamp(alert_threshold, 0, 100),
            'color3': alert_color,
        }
        if mode == 'fading':
            if time_per_color is None:
                time_per_color = 5
            config.update(fading=True, color1=colors[0], color2=colors[1],
                          interval1=clamp(time_per_color, 1, 255))
        elif mode == 'blinking':
            if time_per_color is None:
                time_per_color = 1
            if time_off is None:
                time_off = time_per_color
            config.update(blinking=True, color1=colors[0],
                          interval1=clamp(time_off, 1, 255),
                          interval2=clamp(time_per_color, 1, 255))
        elif mode == 'fixed':
            config.update(color1=colors[0])
        elif mode == 'blackout':  # stronger than just 'off', suppresses alerts and rainbow
            config.update(blackout=True)
        else:
            raise KeyError(f'unsupported lighting mode {mode}')

        self._begin_transaction()
        self._configure_device(**config)
        self._end_transaction_and_read()

    def set_fixed_speed(self, channel, duty, **kwargs):
        """Set channel to a fixed speed duty.

        The clamped duty is stored and then all stored duties are sent to
        the device.  Raises `KeyError` for an unknown channel.
        """
        mtype, dmin, dmax = _LEGACY_FIXED_SPEED_CHANNELS[channel]
        duty = clamp(duty, dmin, dmax)
        self._data.store(f'{channel}_duty', duty)
        self._set_all_fixed_speeds()

    def set_speed_profile(self, channel, profile, **kwargs):
        """Not supported by this device."""
        raise NotSupportedByDevice
412
413
class Hydro690Lc(_ModernBase690Lc):
    """Corsair-branded fifth generation Asetek 690LC cooler."""

    SUPPORTED_DEVICES = [
        (0x1b1c, 0x0c02, None, 'Corsair Hydro H80i GT (experimental)', {}),
        (0x1b1c, 0x0c03, None, 'Corsair Hydro H100i GTX (experimental)', {}),
        (0x1b1c, 0x0c07, None, 'Corsair Hydro H110i GTX (experimental)', {}),
        (0x1b1c, 0x0c08, None, 'Corsair Hydro H80i v2', {}),
        (0x1b1c, 0x0c09, None, 'Corsair Hydro H100i v2', {}),
        (0x1b1c, 0x0c0a, None, 'Corsair Hydro H115i', {}),
    ]

    def set_color(self, channel, mode, colors, **kwargs):
        """Set the color mode for a specific channel.

        Behaves like the parent implementation, except that the 'rainbow'
        mode is rejected with a `KeyError`.
        """
        if mode != 'rainbow':
            super().set_color(channel, mode, colors, **kwargs)
            return
        raise KeyError(f'unsupported lighting mode {mode}')
431
432
# Deprecated aliases, kept so the old driver names still resolve to the
# current classes.
CorsairAsetekDriver = Hydro690Lc
LegacyAsetekDriver = Legacy690Lc
AsetekDriver = Modern690Lc
437