1# Copyright (c) 2020 Yubico AB
2# All rights reserved.
3#
4#   Redistribution and use in source and binary forms, with or
5#   without modification, are permitted provided that the following
6#   conditions are met:
7#
8#    1. Redistributions of source code must retain the above copyright
9#       notice, this list of conditions and the following disclaimer.
10#    2. Redistributions in binary form must reproduce the above
11#       copyright notice, this list of conditions and the following
12#       disclaimer in the documentation and/or other materials provided
13#       with the distribution.
14#
15# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
16# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
17# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
18# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
19# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
20# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
21# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
23# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
25# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26# POSSIBILITY OF SUCH DAMAGE.
27
28from . import Connection, CommandError, TimeoutError, Version
29
30from time import sleep
31from threading import Event
32from typing import Optional, Callable
33import abc
34import struct
35import logging
36
37logger = logging.getLogger(__name__)
38
39
40class CommandRejectedError(CommandError):
41    """The issues command was rejected by the YubiKey"""
42
43
44class OtpConnection(Connection, metaclass=abc.ABCMeta):
45    @abc.abstractmethod
46    def receive(self) -> bytes:
47        """Reads an 8 byte feature report"""
48
49    @abc.abstractmethod
50    def send(self, data: bytes) -> None:
51        """Writes an 8 byte feature report"""
52
53
54CRC_OK_RESIDUAL = 0xF0B8
55
56
57def calculate_crc(data: bytes) -> int:
58    crc = 0xFFFF
59    for index in range(len(data)):
60        crc ^= data[index]
61        for i in range(8):
62            j = crc & 1
63            crc >>= 1
64            if j == 1:
65                crc ^= 0x8408
66    return crc & 0xFFFF
67
68
69def check_crc(data: bytes) -> bool:
70    return calculate_crc(data) == CRC_OK_RESIDUAL
71
72
73_MODHEX = "cbdefghijklnrtuv"
74
75
76def modhex_encode(data: bytes) -> str:
77    """Encode a bytes-like object using Modhex (modified hexadecimal) encoding."""
78    return "".join(_MODHEX[b >> 4] + _MODHEX[b & 0xF] for b in data)
79
80
81def modhex_decode(string: str) -> bytes:
82    """Decode the Modhex (modified hexadecimal) string."""
83    return bytes(
84        _MODHEX.index(string[i]) << 4 | _MODHEX.index(string[i + 1])
85        for i in range(0, len(string), 2)
86    )
87
88
89FEATURE_RPT_SIZE = 8
90FEATURE_RPT_DATA_SIZE = FEATURE_RPT_SIZE - 1
91
92SLOT_DATA_SIZE = 64
93FRAME_SIZE = SLOT_DATA_SIZE + 6
94
95RESP_PENDING_FLAG = 0x40  # Response pending flag
96SLOT_WRITE_FLAG = 0x80  # Write flag - set by app - cleared by device
97RESP_TIMEOUT_WAIT_FLAG = 0x20  # Waiting for timeout operation
98DUMMY_REPORT_WRITE = 0x8F  # Write a dummy report to force update or abort
99
100SEQUENCE_MASK = 0x1F
101
102STATUS_OFFSET_PROG_SEQ = 0x4
103STATUS_OFFSET_TOUCH_LOW = 0x5
104CONFIG_STATUS_MASK = 0x1F
105
106STATUS_PROCESSING = 1
107STATUS_UPNEEDED = 2
108
109
110def _should_send(packet, seq):
111    """All-zero packets are skipped, except for the very first and last packets"""
112    return seq in (0, 9) or any(packet)
113
114
115def _format_frame(slot, payload):
116    return payload + struct.pack("<BH", slot, calculate_crc(payload)) + b"\0\0\0"
117
118
119class OtpProtocol:
120    def __init__(self, otp_connection: OtpConnection):
121        self.connection = otp_connection
122        report = self._receive()
123        self.version = Version.from_bytes(report[1:4])
124        if self.version[0] == 3:  # NEO, may have cached pgmSeq in arbitrator
125            try:  # Force communication with applet to refresh pgmSeq
126                # Write an invalid scan map, does nothing
127                self.send_and_receive(0x12, b"c" * 51)
128            except CommandRejectedError:
129                pass  # This is expected
130
131    def close(self) -> None:
132        self.connection.close()
133
134    def send_and_receive(
135        self,
136        slot: int,
137        data: Optional[bytes] = None,
138        event: Optional[Event] = None,
139        on_keepalive: Optional[Callable[[int], None]] = None,
140    ) -> bytes:
141        """Sends a command to the YubiKey, and reads the response.
142
143        If the command results in a configuration update, the programming sequence
144        number is verified and the updated status bytes are returned.
145
146        @param slot  the slot to send to
147        @param data  the data payload to send
148        @param state optional CommandState for listening for user presence requirement
149            and for cancelling a command.
150        @return response data (including CRC) in the case of data, or an updated status
151            struct
152        """
153        payload = (data or b"").ljust(SLOT_DATA_SIZE, b"\0")
154        if len(payload) > SLOT_DATA_SIZE:
155            raise ValueError("Payload too large for HID frame")
156        if not on_keepalive:
157            on_keepalive = lambda x: None  # noqa
158        frame = _format_frame(slot, payload)
159
160        logger.debug("SEND: %s", frame.hex())
161        response = self._read_frame(
162            self._send_frame(frame), event or Event(), on_keepalive
163        )
164        logger.debug("RECV: %s", response.hex())
165        return response
166
167    def _receive(self):
168        report = self.connection.receive()
169        if len(report) != FEATURE_RPT_SIZE:
170            raise Exception(
171                f"Incorrect reature report size (was {len(report)}, "
172                f"expected {FEATURE_RPT_SIZE})"
173            )
174        return report
175
176    def read_status(self) -> bytes:
177        """Receive status bytes from YubiKey
178
179        @return status bytes (first 3 bytes are the firmware version)
180        @throws IOException in case of communication error
181        """
182        return self._receive()[1:-1]
183
184    def _await_ready_to_write(self):
185        """Sleep for up to ~1s waiting for the WRITE flag to be unset"""
186        for _ in range(20):
187            if (self._receive()[FEATURE_RPT_DATA_SIZE] & SLOT_WRITE_FLAG) == 0:
188                return
189            sleep(0.05)
190        raise Exception("Timeout waiting for YubiKey to become ready to receive")
191
192    def _send_frame(self, buf):
193        """Sends a 70 byte frame"""
194        prog_seq = self._receive()[STATUS_OFFSET_PROG_SEQ]
195        seq = 0
196        while buf:
197            report, buf = buf[:FEATURE_RPT_DATA_SIZE], buf[FEATURE_RPT_DATA_SIZE:]
198            if _should_send(report, seq):
199                report += struct.pack(">B", 0x80 | seq)
200                self._await_ready_to_write()
201                self.connection.send(report)
202            seq += 1
203
204        return prog_seq
205
206    def _read_frame(self, prog_seq, event, on_keepalive):
207        """Reads one frame"""
208        response = b""
209        seq = 0
210        needs_touch = False
211
212        try:
213            while True:
214                report = self._receive()
215                status_byte = report[FEATURE_RPT_DATA_SIZE]
216                if (status_byte & RESP_PENDING_FLAG) != 0:  # Response packet
217                    if seq == (status_byte & SEQUENCE_MASK):
218                        # Correct sequence
219                        response += report[:FEATURE_RPT_DATA_SIZE]
220                        seq += 1
221                    elif 0 == (status_byte & SEQUENCE_MASK):
222                        # Transmission complete
223                        self._reset_state()
224                        return response
225                elif status_byte == 0:  # Status response
226                    next_prog_seq = report[STATUS_OFFSET_PROG_SEQ]
227                    if response:
228                        raise Exception("Incomplete transfer")
229                    elif next_prog_seq == prog_seq + 1 or (
230                        prog_seq > 0
231                        and next_prog_seq == 0
232                        and report[STATUS_OFFSET_TOUCH_LOW] & CONFIG_STATUS_MASK == 0
233                    ):  # Note: If no valid configurations exist, prog_seq resets to 0.
234                        # Sequence updated, return status.
235                        return report[1:-1]
236                    elif needs_touch:
237                        raise TimeoutError("Timed out waiting for touch")
238                    else:
239                        raise CommandRejectedError("No data")
240                else:  # Need to wait
241                    if (status_byte & RESP_TIMEOUT_WAIT_FLAG) != 0:
242                        on_keepalive(STATUS_UPNEEDED)
243                        needs_touch = True
244                        timeout = 0.1
245                    else:
246                        on_keepalive(STATUS_PROCESSING)
247                        timeout = 0.02
248                    sleep(timeout)
249                    if event.wait(timeout):
250                        self._reset_state()
251                        raise TimeoutError("Command cancelled by Event")
252        except KeyboardInterrupt:
253            logger.debug("Keyboard interrupt, reset state...")
254            self._reset_state()
255            raise
256
257    def _reset_state(self):
258        """Reset the state of YubiKey from reading"""
259        self.connection.send(b"\xff".rjust(FEATURE_RPT_SIZE, b"\0"))
260