1# Copyright (c) 2020 Yubico AB 2# All rights reserved. 3# 4# Redistribution and use in source and binary forms, with or 5# without modification, are permitted provided that the following 6# conditions are met: 7# 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above 11# copyright notice, this list of conditions and the following 12# disclaimer in the documentation and/or other materials provided 13# with the distribution. 14# 15# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 16# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 17# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 18# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 19# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 20# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 21# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 22# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 23# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 24# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 25# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 26# POSSIBILITY OF SUCH DAMAGE. 27 28from . import Connection, CommandError, TimeoutError, Version 29 30from time import sleep 31from threading import Event 32from typing import Optional, Callable 33import abc 34import struct 35import logging 36 37logger = logging.getLogger(__name__) 38 39 40class CommandRejectedError(CommandError): 41 """The issues command was rejected by the YubiKey""" 42 43 44class OtpConnection(Connection, metaclass=abc.ABCMeta): 45 @abc.abstractmethod 46 def receive(self) -> bytes: 47 """Reads an 8 byte feature report""" 48 49 @abc.abstractmethod 50 def send(self, data: bytes) -> None: 51 """Writes an 8 byte feature report""" 52 53 54CRC_OK_RESIDUAL = 0xF0B8 55 56 57def calculate_crc(data: bytes) -> int: 58 crc = 0xFFFF 59 for index in range(len(data)): 60 crc ^= data[index] 61 for i in range(8): 62 j = crc & 1 63 crc >>= 1 64 if j == 1: 65 crc ^= 0x8408 66 return crc & 0xFFFF 67 68 69def check_crc(data: bytes) -> bool: 70 return calculate_crc(data) == CRC_OK_RESIDUAL 71 72 73_MODHEX = "cbdefghijklnrtuv" 74 75 76def modhex_encode(data: bytes) -> str: 77 """Encode a bytes-like object using Modhex (modified hexadecimal) encoding.""" 78 return "".join(_MODHEX[b >> 4] + _MODHEX[b & 0xF] for b in data) 79 80 81def modhex_decode(string: str) -> bytes: 82 """Decode the Modhex (modified hexadecimal) string.""" 83 return bytes( 84 _MODHEX.index(string[i]) << 4 | _MODHEX.index(string[i + 1]) 85 for i in range(0, len(string), 2) 86 ) 87 88 89FEATURE_RPT_SIZE = 8 90FEATURE_RPT_DATA_SIZE = FEATURE_RPT_SIZE - 1 91 92SLOT_DATA_SIZE = 64 93FRAME_SIZE = SLOT_DATA_SIZE + 6 94 95RESP_PENDING_FLAG = 0x40 # Response pending flag 96SLOT_WRITE_FLAG = 0x80 # Write flag - set by app - cleared by device 97RESP_TIMEOUT_WAIT_FLAG = 0x20 # Waiting for timeout operation 98DUMMY_REPORT_WRITE = 0x8F # Write a dummy report to force update or abort 99 100SEQUENCE_MASK = 0x1F 101 102STATUS_OFFSET_PROG_SEQ = 0x4 103STATUS_OFFSET_TOUCH_LOW = 0x5 104CONFIG_STATUS_MASK = 0x1F 105 106STATUS_PROCESSING = 1 107STATUS_UPNEEDED = 2 108 109 110def _should_send(packet, seq): 111 """All-zero packets are skipped, except for the very first and last packets""" 112 return seq in (0, 9) or any(packet) 113 114 115def _format_frame(slot, payload): 116 return payload + struct.pack("<BH", slot, calculate_crc(payload)) + b"\0\0\0" 117 118 119class OtpProtocol: 120 def __init__(self, otp_connection: OtpConnection): 121 self.connection = otp_connection 122 report = self._receive() 123 self.version = Version.from_bytes(report[1:4]) 124 if self.version[0] == 3: # NEO, may have cached pgmSeq in arbitrator 125 try: # Force communication with applet to refresh pgmSeq 126 # Write an invalid scan map, does nothing 127 self.send_and_receive(0x12, b"c" * 51) 128 except CommandRejectedError: 129 pass # This is expected 130 131 def close(self) -> None: 132 self.connection.close() 133 134 def send_and_receive( 135 self, 136 slot: int, 137 data: Optional[bytes] = None, 138 event: Optional[Event] = None, 139 on_keepalive: Optional[Callable[[int], None]] = None, 140 ) -> bytes: 141 """Sends a command to the YubiKey, and reads the response. 142 143 If the command results in a configuration update, the programming sequence 144 number is verified and the updated status bytes are returned. 145 146 @param slot the slot to send to 147 @param data the data payload to send 148 @param state optional CommandState for listening for user presence requirement 149 and for cancelling a command. 150 @return response data (including CRC) in the case of data, or an updated status 151 struct 152 """ 153 payload = (data or b"").ljust(SLOT_DATA_SIZE, b"\0") 154 if len(payload) > SLOT_DATA_SIZE: 155 raise ValueError("Payload too large for HID frame") 156 if not on_keepalive: 157 on_keepalive = lambda x: None # noqa 158 frame = _format_frame(slot, payload) 159 160 logger.debug("SEND: %s", frame.hex()) 161 response = self._read_frame( 162 self._send_frame(frame), event or Event(), on_keepalive 163 ) 164 logger.debug("RECV: %s", response.hex()) 165 return response 166 167 def _receive(self): 168 report = self.connection.receive() 169 if len(report) != FEATURE_RPT_SIZE: 170 raise Exception( 171 f"Incorrect reature report size (was {len(report)}, " 172 f"expected {FEATURE_RPT_SIZE})" 173 ) 174 return report 175 176 def read_status(self) -> bytes: 177 """Receive status bytes from YubiKey 178 179 @return status bytes (first 3 bytes are the firmware version) 180 @throws IOException in case of communication error 181 """ 182 return self._receive()[1:-1] 183 184 def _await_ready_to_write(self): 185 """Sleep for up to ~1s waiting for the WRITE flag to be unset""" 186 for _ in range(20): 187 if (self._receive()[FEATURE_RPT_DATA_SIZE] & SLOT_WRITE_FLAG) == 0: 188 return 189 sleep(0.05) 190 raise Exception("Timeout waiting for YubiKey to become ready to receive") 191 192 def _send_frame(self, buf): 193 """Sends a 70 byte frame""" 194 prog_seq = self._receive()[STATUS_OFFSET_PROG_SEQ] 195 seq = 0 196 while buf: 197 report, buf = buf[:FEATURE_RPT_DATA_SIZE], buf[FEATURE_RPT_DATA_SIZE:] 198 if _should_send(report, seq): 199 report += struct.pack(">B", 0x80 | seq) 200 self._await_ready_to_write() 201 self.connection.send(report) 202 seq += 1 203 204 return prog_seq 205 206 def _read_frame(self, prog_seq, event, on_keepalive): 207 """Reads one frame""" 208 response = b"" 209 seq = 0 210 needs_touch = False 211 212 try: 213 while True: 214 report = self._receive() 215 status_byte = report[FEATURE_RPT_DATA_SIZE] 216 if (status_byte & RESP_PENDING_FLAG) != 0: # Response packet 217 if seq == (status_byte & SEQUENCE_MASK): 218 # Correct sequence 219 response += report[:FEATURE_RPT_DATA_SIZE] 220 seq += 1 221 elif 0 == (status_byte & SEQUENCE_MASK): 222 # Transmission complete 223 self._reset_state() 224 return response 225 elif status_byte == 0: # Status response 226 next_prog_seq = report[STATUS_OFFSET_PROG_SEQ] 227 if response: 228 raise Exception("Incomplete transfer") 229 elif next_prog_seq == prog_seq + 1 or ( 230 prog_seq > 0 231 and next_prog_seq == 0 232 and report[STATUS_OFFSET_TOUCH_LOW] & CONFIG_STATUS_MASK == 0 233 ): # Note: If no valid configurations exist, prog_seq resets to 0. 234 # Sequence updated, return status. 235 return report[1:-1] 236 elif needs_touch: 237 raise TimeoutError("Timed out waiting for touch") 238 else: 239 raise CommandRejectedError("No data") 240 else: # Need to wait 241 if (status_byte & RESP_TIMEOUT_WAIT_FLAG) != 0: 242 on_keepalive(STATUS_UPNEEDED) 243 needs_touch = True 244 timeout = 0.1 245 else: 246 on_keepalive(STATUS_PROCESSING) 247 timeout = 0.02 248 sleep(timeout) 249 if event.wait(timeout): 250 self._reset_state() 251 raise TimeoutError("Command cancelled by Event") 252 except KeyboardInterrupt: 253 logger.debug("Keyboard interrupt, reset state...") 254 self._reset_state() 255 raise 256 257 def _reset_state(self): 258 """Reset the state of YubiKey from reading""" 259 self.connection.send(b"\xff".rjust(FEATURE_RPT_SIZE, b"\0")) 260