1# This file is part of the TREZOR project. 2# 3# Copyright (C) 2012-2016 Marek Palatinus <slush@satoshilabs.com> 4# Copyright (C) 2012-2016 Pavol Rusnak <stick@satoshilabs.com> 5# Copyright (C) 2016 Jochen Hoenicke <hoenicke@gmail.com> 6# 7# This library is free software: you can redistribute it and/or modify 8# it under the terms of the GNU Lesser General Public License as published by 9# the Free Software Foundation, either version 3 of the License, or 10# (at your option) any later version. 11# 12# This library is distributed in the hope that it will be useful, 13# but WITHOUT ANY WARRANTY; without even the implied warranty of 14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15# GNU Lesser General Public License for more details. 16# 17# You should have received a copy of the GNU Lesser General Public License 18# along with this library. If not, see <http://www.gnu.org/licenses/>. 19# 20# The script has been modified for KeepKey Device. 21 22from __future__ import print_function, absolute_import 23 24import os 25import sys 26import time 27import binascii 28import hashlib 29import unicodedata 30import json 31import getpass 32import copy 33 34from mnemonic import Mnemonic 35 36from . import tools 37from . import mapping 38from . import messages_pb2 as proto 39from . import messages_eos_pb2 as eos_proto 40from . import messages_nano_pb2 as nano_proto 41from . import messages_cosmos_pb2 as cosmos_proto 42from . import messages_ripple_pb2 as ripple_proto 43from . import messages_tendermint_pb2 as tendermint_proto 44from . import messages_thorchain_pb2 as thorchain_proto 45from . import types_pb2 as types 46from . import eos 47from . import nano 48from .debuglink import DebugLink 49 50 51# try: 52# from PIL import Image 53# SCREENSHOT = True 54# except: 55# SCREENSHOT = False 56 57SCREENSHOT = False 58 59DEFAULT_CURVE = 'secp256k1' 60 61# monkeypatching: text formatting of protobuf messages 62tools.monkeypatch_google_protobuf_text_format() 63 64def get_buttonrequest_value(code): 65 # Converts integer code to its string representation of ButtonRequestType 66 return [ k for k, v in types.ButtonRequestType.items() if v == code][0] 67 68def pprint(msg): 69 msg_class = msg.__class__.__name__ 70 msg_size = msg.ByteSize() 71 """ 72 msg_ser = msg.SerializeToString() 73 msg_id = mapping.get_type(msg) 74 msg_json = json.dumps(protobuf_json.pb2json(msg)) 75 """ 76 if isinstance(msg, proto.FirmwareUpload): 77 return "<%s> (%d bytes):\n" % (msg_class, msg_size) 78 else: 79 return "<%s> (%d bytes):\n%s" % (msg_class, msg_size, msg) 80 81def log(msg): 82 sys.stderr.write(msg + '\n') 83 sys.stderr.flush() 84 85def log_cr(msg): 86 sys.stdout.write('\r' + msg) 87 sys.stdout.flush() 88 89def format_mnemonic(word_pos, character_pos): 90 return "WORD %d: %s" % (word_pos, character_pos * '*') 91 92def getch(): 93 try: 94 import termios 95 except ImportError: 96 # Non-POSIX. Return msvcrt's (Windows') getch. 97 import msvcrt 98 return msvcrt.getch() 99 100 # POSIX system. Create and return a getch that manipulates the tty. 101 import sys, tty 102 def _getch(): 103 fd = sys.stdin.fileno() 104 old_settings = termios.tcgetattr(fd) 105 try: 106 tty.setraw(fd) 107 ch = sys.stdin.read(1) 108 finally: 109 termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) 110 return ch 111 112 return _getch() 113 114class CallException(Exception): 115 def __init__(self, code, message): 116 super(CallException, self).__init__() 117 self.args = [code, message] 118 119class PinException(CallException): 120 pass 121 122class field(object): 123 # Decorator extracts single value from 124 # protobuf object. If the field is not 125 # present, raises an exception. 126 def __init__(self, field): 127 self.field = field 128 129 def __call__(self, f): 130 def wrapped_f(*args, **kwargs): 131 ret = f(*args, **kwargs) 132 ret.HasField(self.field) 133 return getattr(ret, self.field) 134 return wrapped_f 135 136class expect(object): 137 # Decorator checks if the method 138 # returned one of expected protobuf messages 139 # or raises an exception 140 def __init__(self, *expected): 141 self.expected = expected 142 143 def __call__(self, f): 144 def wrapped_f(*args, **kwargs): 145 ret = f(*args, **kwargs) 146 if not isinstance(ret, self.expected): 147 raise Exception("Got %s, expected %s" % (ret.__class__, self.expected)) 148 return ret 149 return wrapped_f 150 151def session(f): 152 # Decorator wraps a BaseClient method 153 # with session activation / deactivation 154 def wrapped_f(*args, **kwargs): 155 client = args[0] 156 try: 157 client.transport.session_begin() 158 return f(*args, **kwargs) 159 finally: 160 client.transport.session_end() 161 return wrapped_f 162 163def normalize_nfc(txt): 164 if sys.version_info[0] < 3: 165 if isinstance(txt, unicode): 166 return unicodedata.normalize('NFC', txt) 167 if isinstance(txt, str): 168 return unicodedata.normalize('NFC', txt.decode('utf-8')) 169 else: 170 if isinstance(txt, bytes): 171 return unicodedata.normalize('NFC', txt.decode('utf-8')) 172 if isinstance(txt, str): 173 return unicodedata.normalize('NFC', txt) 174 175 raise Exception('unicode/str or bytes/str expected') 176 177class BaseClient(object): 178 # Implements very basic layer of sending raw protobuf 179 # messages to device and getting its response back. 180 def __init__(self, transport, **kwargs): 181 self.transport = transport 182 self.verbose = False 183 super(BaseClient, self).__init__() # *args, **kwargs) 184 185 def cancel(self): 186 self.transport.write(proto.Cancel()) 187 188 @session 189 def call_raw(self, msg): 190 self.transport.write(msg) 191 return self.transport.read_blocking() 192 193 @session 194 def call_bridge(self, msg): 195 self.transport.bridgeWrite(msg) 196 return 197 198 @session 199 def call_bridge_read(self): 200 return self.transport.bridge_read_blocking() 201 202 @session 203 def call(self, msg): 204 resp = self.call_raw(msg) 205 handler_name = "callback_%s" % resp.__class__.__name__ 206 handler = getattr(self, handler_name, None) 207 208 if handler != None: 209 msg = handler(resp) 210 if msg == None: 211 raise Exception("Callback %s must return protobuf message, not None" % handler) 212 resp = self.call(msg) 213 214 return resp 215 216 def callback_Failure(self, msg): 217 if msg.code in (types.Failure_PinInvalid, 218 types.Failure_PinCancelled, types.Failure_PinExpected): 219 raise PinException(msg.code, msg.message) 220 221 raise CallException(msg.code, msg.message) 222 223 def close(self): 224 self.transport.close() 225 226class DebugWireMixin(object): 227 def call_raw(self, msg): 228 log("SENDING " + pprint(msg)) 229 resp = super(DebugWireMixin, self).call_raw(msg) 230 log("RECEIVED " + pprint(resp)) 231 return resp 232 233class TextUIMixin(object): 234 # This class demonstrates easy test-based UI 235 # integration between the device and wallet. 236 # You can implement similar functionality 237 # by implementing your own GuiMixin with 238 # graphical widgets for every type of these callbacks. 239 240 def __init__(self, *args, **kwargs): 241 super(TextUIMixin, self).__init__(*args, **kwargs) 242 self.character_request_first_pass = True 243 244 def callback_ButtonRequest(self, msg): 245 # log("Sending ButtonAck for %s " % get_buttonrequest_value(msg.code)) 246 return proto.ButtonAck() 247 248 def callback_RecoveryMatrix(self, msg): 249 if self.recovery_matrix_first_pass: 250 self.recovery_matrix_first_pass = False 251 log("Use the numeric keypad to describe positions. For the word list use only left and right keys. The layout is:") 252 log(" 7 8 9 7 | 9") 253 log(" 4 5 6 4 | 6") 254 log(" 1 2 3 1 | 3") 255 while True: 256 character = getch() 257 if character in ('\x03', '\x04'): 258 return proto.Cancel() 259 260 if character in ('\x08', '\x7f'): 261 return proto.WordAck(word='\x08') 262 263 # ignore middle column if only 6 keys requested. 264 if (msg.type == types.WordRequestType_Matrix6 and 265 character in ('2', '5', '8')): 266 continue 267 268 if (ord(character) >= ord('1') and ord(character) <= ord('9')): 269 return proto.WordAck(word=character) 270 271 def callback_PinMatrixRequest(self, msg): 272 if msg.type == 1: 273 desc = 'current PIN' 274 elif msg.type == 2: 275 desc = 'new PIN' 276 elif msg.type == 3: 277 desc = 'new PIN again' 278 else: 279 desc = 'PIN' 280 281 log("Use the numeric keypad to describe number positions. The layout is:") 282 log(" 7 8 9") 283 log(" 4 5 6") 284 log(" 1 2 3") 285 log("Please enter %s: " % desc) 286 pin = getpass.getpass('') 287 return proto.PinMatrixAck(pin=pin) 288 289 def callback_PassphraseRequest(self, msg): 290 log("Passphrase required: ") 291 passphrase = getpass.getpass('') 292 log("Confirm your Passphrase: ") 293 if passphrase == getpass.getpass(''): 294 passphrase = normalize_nfc(passphrase) 295 return proto.PassphraseAck(passphrase=passphrase) 296 else: 297 log("Passphrase did not match! ") 298 exit() 299 300 def callback_CharacterRequest(self, msg): 301 if self.character_request_first_pass: 302 self.character_request_first_pass = False 303 log("Use recovery cipher on device to input mnemonic. Words are autocompleted at 3 or 4 characters.") 304 log("(use spacebar to progress to next word after match, use backspace to correct bad character or word entries)") 305 306 # format mnemonic for console 307 formatted_console = format_mnemonic(msg.word_pos + 1, msg.character_pos) 308 309 # clear the runway before we display formatted mnemonic 310 log_cr(' ' * 14) 311 log_cr(formatted_console) 312 313 while True: 314 character = getch().lower() 315 316 # capture escape 317 if character in ('\x03', '\x04'): 318 return proto.Cancel() 319 320 character_ascii = ord(character) 321 322 if character_ascii >= 97 and character_ascii <= 122 \ 323 and msg.character_pos != 4: 324 # capture characters a-z 325 return proto.CharacterAck(character=character) 326 327 elif character_ascii == 32 and msg.word_pos < 23 \ 328 and msg.character_pos >= 3: 329 # capture spaces 330 return proto.CharacterAck(character=' ') 331 332 elif character_ascii == 8 or character_ascii == 127 \ 333 and (msg.word_pos > 0 or msg.character_pos > 0): 334 # capture backspaces 335 return proto.CharacterAck(delete=True) 336 337 elif character_ascii == 13 and msg.word_pos in (11, 17, 23): 338 # capture returns 339 log("") 340 return proto.CharacterAck(done=True) 341 342class DebugLinkMixin(object): 343 # This class implements automatic responses 344 # and other functionality for unit tests 345 # for various callbacks, created in order 346 # to automatically pass unit tests. 347 # 348 # This mixing should be used only for purposes 349 # of unit testing, because it will fail to work 350 # without special DebugLink interface provided 351 # by the device. 352 353 def __init__(self, *args, **kwargs): 354 super(DebugLinkMixin, self).__init__(*args, **kwargs) 355 self.debug = None 356 self.in_with_statement = 0 357 self.button_wait = 0 358 self.screenshot_id = 0 359 360 # Always press Yes and provide correct pin 361 self.setup_debuglink(True, True) 362 self.auto_button = True 363 364 # Do not expect any specific response from device 365 self.expected_responses = None 366 367 # Use blank passphrase 368 self.set_passphrase('') 369 370 def close(self): 371 super(DebugLinkMixin, self).close() 372 if self.debug: 373 self.debug.close() 374 375 def set_debuglink(self, debug_transport): 376 self.debug = DebugLink(debug_transport) 377 378 def set_buttonwait(self, secs): 379 self.button_wait = secs 380 381 def __enter__(self): 382 # For usage in with/expected_responses 383 self.in_with_statement += 1 384 return self 385 386 def __exit__(self, _type, value, traceback): 387 self.in_with_statement -= 1 388 389 if _type != None: 390 # Another exception raised 391 return False 392 393 # return isinstance(value, TypeError) 394 # Evaluate missed responses in 'with' statement 395 if self.expected_responses != None and len(self.expected_responses): 396 raise Exception("Some of expected responses didn't come from device: %s" % \ 397 [ pprint(x) for x in self.expected_responses ]) 398 399 # Cleanup 400 self.expected_responses = None 401 return False 402 403 def set_expected_responses(self, expected): 404 if not self.in_with_statement: 405 raise Exception("Must be called inside 'with' statement") 406 self.expected_responses = expected 407 408 def setup_debuglink(self, button, pin_correct): 409 self.button = button # True -> YES button, False -> NO button 410 self.pin_correct = pin_correct 411 412 def set_passphrase(self, passphrase): 413 self.passphrase = normalize_nfc(passphrase) 414 415 def set_mnemonic(self, mnemonic): 416 self.mnemonic = normalize_nfc(mnemonic).split(' ') 417 418 def call_raw(self, msg): 419 420 if SCREENSHOT and self.debug: 421 layout = self.debug.read_layout() 422 im = Image.new("RGB", (128, 64)) 423 pix = im.load() 424 for x in range(128): 425 for y in range(64): 426 rx, ry = 127 - x, 63 - y 427 if (ord(layout[rx + (ry / 8) * 128]) & (1 << (ry % 8))) > 0: 428 pix[x, y] = (255, 255, 255) 429 im.save('scr%05d.png' % self.screenshot_id) 430 self.screenshot_id += 1 431 432 resp = super(DebugLinkMixin, self).call_raw(msg) 433 self._check_request(resp) 434 return resp 435 436 def _check_request(self, msg): 437 if self.expected_responses != None: 438 try: 439 expected = self.expected_responses.pop(0) 440 except IndexError: 441 raise CallException(types.Failure_Other, 442 "Got %s, but no message has been expected" % pprint(msg)) 443 444 if msg.__class__ != expected.__class__: 445 raise CallException(types.Failure_Other, 446 "Expected %s, got %s" % (pprint(expected), pprint(msg))) 447 448 fields = expected.ListFields() # only filled (including extensions) 449 for field, value in fields: 450 if not msg.HasField(field.name) or getattr(msg, field.name) != value: 451 raise CallException(types.Failure_Other, 452 "Expected %s, got %s" % (pprint(expected), pprint(msg))) 453 454 def callback_ButtonRequest(self, msg): 455 if self.verbose: 456 log("ButtonRequest code: " + get_buttonrequest_value(msg.code)) 457 458 if self.auto_button: 459 if self.verbose: 460 log("Pressing button " + str(self.button)) 461 if self.button_wait: 462 if self.verbose: 463 log("Waiting %d seconds " % self.button_wait) 464 time.sleep(self.button_wait) 465 self.debug.press_button(self.button) 466 467 return proto.ButtonAck() 468 469 def callback_PinMatrixRequest(self, msg): 470 if self.pin_correct: 471 pin = self.debug.read_pin_encoded() 472 else: 473 pin = '444222' 474 return proto.PinMatrixAck(pin=pin) 475 476 def callback_PassphraseRequest(self, msg): 477 if self.verbose: 478 log("Provided passphrase: '%s'" % self.passphrase) 479 return proto.PassphraseAck(passphrase=self.passphrase) 480 481 482class ProtocolMixin(object): 483 PRIME_DERIVATION_FLAG = 0x80000000 484 VENDORS = ('keepkey.com',) 485 486 def __init__(self, *args, **kwargs): 487 super(ProtocolMixin, self).__init__(*args, **kwargs) 488 self.init_device() 489 self.tx_api = None 490 491 def set_tx_api(self, tx_api): 492 self.tx_api = tx_api 493 494 def get_tx_api(self): 495 return self.tx_api 496 497 def init_device(self): 498 self.features = expect(proto.Features)(self.call)(proto.Initialize()) 499 if str(self.features.vendor) not in self.VENDORS: 500 raise Exception("Unsupported device") 501 502 def _get_local_entropy(self): 503 return os.urandom(32) 504 505 def _convert_prime(self, n): 506 # Convert minus signs to uint32 with flag 507 return [ int(abs(x) | self.PRIME_DERIVATION_FLAG) if x < 0 else x for x in n ] 508 509 @staticmethod 510 def expand_path(n): 511 # Convert string of bip32 path to list of uint32 integers with prime flags 512 # 0/-1/1' -> [0, 0x80000001, 0x80000001] 513 if not n: 514 return [] 515 516 n = n.split('/') 517 518 # m/a/b/c => a/b/c 519 if n[0] == 'm': 520 n = n[1:] 521 522 # coin_name/a/b/c => 44'/SLIP44_constant'/a/b/c 523 # https://github.com/satoshilabs/slips/blob/master/slip-0044.md 524 coins = { 525 "Bitcoin": 0, 526 "Testnet": 1, 527 "Litecoin": 2, 528 "Dogecoin": 3, 529 "Dash": 5, 530 "Namecoin": 7, 531 "Bitsend": 91, 532 "Groestlcoin": 17, 533 "Zcash": 133, 534 "BitcoinCash": 145, 535 "Bitcore": 160, 536 "Megacoin": 217, 537 "Bitcloud": 218, 538 "Axe": 4242, 539 } 540 541 if n[0] in coins: 542 n = ["44'", "%d'" % coins[n[0]] ] + n[1:] 543 544 path = [] 545 for x in n: 546 prime = False 547 if x.endswith("'"): 548 x = x.replace('\'', '') 549 prime = True 550 if x.startswith('-'): 551 prime = True 552 553 x = abs(int(x)) 554 555 if prime: 556 x |= ProtocolMixin.PRIME_DERIVATION_FLAG 557 558 path.append(x) 559 560 return path 561 562 @expect(proto.PublicKey) 563 def get_public_node(self, n, ecdsa_curve_name=DEFAULT_CURVE, show_display=False, coin_name=None, script_type=types.SPENDADDRESS): 564 n = self._convert_prime(n) 565 if not ecdsa_curve_name: 566 ecdsa_curve_name=DEFAULT_CURVE 567 return self.call(proto.GetPublicKey(address_n=n, ecdsa_curve_name=ecdsa_curve_name, show_display=show_display, coin_name=coin_name, script_type=script_type)) 568 569 @field('address') 570 @expect(proto.Address) 571 def get_address(self, coin_name, n, show_display=False, multisig=None, script_type=types.SPENDADDRESS): 572 n = self._convert_prime(n) 573 if multisig: 574 return self.call(proto.GetAddress(address_n=n, coin_name=coin_name, show_display=show_display, multisig=multisig, script_type=script_type)) 575 else: 576 return self.call(proto.GetAddress(address_n=n, coin_name=coin_name, show_display=show_display, script_type=script_type)) 577 578 @field('address') 579 @expect(proto.EthereumAddress) 580 def ethereum_get_address(self, n, show_display=False, multisig=None): 581 n = self._convert_prime(n) 582 return self.call(proto.EthereumGetAddress(address_n=n, show_display=show_display)) 583 584 @session 585 def ethereum_sign_tx(self, n, nonce, gas_limit, value, gas_price=None, max_fee_per_gas=None, max_priority_fee_per_gas=None, to=None, to_n=None, address_type=None, exchange_type=None, data=None, chain_id=None): 586 from keepkeylib.tools import int_to_big_endian 587 588 if gas_price is None and max_fee_per_gas is None: 589 raise Exception("Either gas_price or max_fee_per_gas must be provided") 590 591 n = self._convert_prime(n) 592 if address_type == types.TRANSFER: #Ethereum transfer transaction 593 msg = proto.EthereumSignTx( 594 address_n=n, 595 nonce=int_to_big_endian(nonce), 596 gas_price=int_to_big_endian(gas_price) if gas_price else None, 597 gas_limit=int_to_big_endian(gas_limit), 598 max_fee_per_gas=int_to_big_endian(max_fee_per_gas) if max_fee_per_gas else None , 599 max_priority_fee_per_gas=int_to_big_endian(max_priority_fee_per_gas) if max_priority_fee_per_gas else None, 600 value=int_to_big_endian(value), 601 to_address_n=to_n, 602 address_type=address_type, 603 type=2 if max_fee_per_gas else None 604 ) 605 elif address_type == types.EXCHANGE: #Ethereum exchange transaction 606 msg = proto.EthereumSignTx( 607 address_n=n, 608 nonce=int_to_big_endian(nonce), 609 gas_price=int_to_big_endian(gas_price) if gas_price else None, 610 gas_limit=int_to_big_endian(gas_limit), 611 max_fee_per_gas=int_to_big_endian(max_fee_per_gas) if max_fee_per_gas else None, 612 max_priority_fee_per_gas=int_to_big_endian(max_priority_fee_per_gas) if max_priority_fee_per_gas else None, 613 value=int_to_big_endian(value), 614 to_address_n=to_n, 615 exchange_type=exchange_type, 616 address_type=address_type, 617 type=2 if max_fee_per_gas else None 618 ) 619 else: 620 msg = proto.EthereumSignTx( 621 address_n=n, 622 nonce=int_to_big_endian(nonce), 623 gas_price=int_to_big_endian(gas_price) if gas_price else None, 624 gas_limit=int_to_big_endian(gas_limit), 625 max_fee_per_gas=int_to_big_endian(max_fee_per_gas) if max_fee_per_gas else None, 626 max_priority_fee_per_gas=int_to_big_endian(max_priority_fee_per_gas) if max_priority_fee_per_gas else None, 627 value=int_to_big_endian(value), 628 type=2 if max_fee_per_gas else None 629 ) 630 631 if to: 632 msg.to = to 633 634 if data: 635 msg.data_length = len(data) 636 data, chunk = data[1024:], data[:1024] 637 msg.data_initial_chunk = chunk 638 639 if chain_id: 640 msg.chain_id = chain_id 641 642 response = self.call(msg) 643 644 while response.HasField('data_length'): 645 data_length = response.data_length 646 data, chunk = data[data_length:], data[:data_length] 647 response = self.call(proto.EthereumTxAck(data_chunk=chunk)) 648 649 if address_type: 650 return response.signature_v, response.signature_r, response.signature_s, response.hash, response.signature_der 651 else: 652 return response.signature_v, response.signature_r, response.signature_s 653 654 @expect(eos_proto.EosPublicKey) 655 def eos_get_public_key(self, address_n, show_display=True, legacy=True): 656 msg = eos_proto.EosGetPublicKey( 657 address_n=address_n, 658 show_display=show_display, 659 kind = eos_proto.EOS if legacy else eos_proto.EOS_K1 660 ) 661 return self.call(msg) 662 663 @session 664 def eos_sign_tx_raw(self, msg, actions): 665 response = self.call(msg) 666 667 for common, action in actions: 668 if isinstance(action, eos_proto.EosActionTransfer): 669 msg = eos_proto.EosTxActionAck(common=common, transfer=action) 670 elif isinstance(action, eos_proto.EosActionDelegate): 671 msg = eos_proto.EosTxActionAck(common=common, delegate=action) 672 elif isinstance(action, eos_proto.EosActionUndelegate): 673 msg = eos_proto.EosTxActionAck(common=common, undelegate=action) 674 elif isinstance(action, eos_proto.EosActionRefund): 675 msg = eos_proto.EosTxActionAck(common=common, refund=action) 676 elif isinstance(action, eos_proto.EosActionBuyRam): 677 msg = eos_proto.EosTxActionAck(common=common, buy_ram=action) 678 elif isinstance(action, eos_proto.EosActionBuyRamBytes): 679 msg = eos_proto.EosTxActionAck(common=common, buy_ram_bytes=action) 680 elif isinstance(action, eos_proto.EosActionSellRam): 681 msg = eos_proto.EosTxActionAck(common=common, sell_ram=action) 682 elif isinstance(action, eos_proto.EosActionVoteProducer): 683 msg = eos_proto.EosTxActionAck(common=common, vote_producer=action) 684 elif isinstance(action, eos_proto.EosActionUpdateAuth): 685 msg = eos_proto.EosTxActionAck(common=common, update_auth=action) 686 elif isinstance(action, eos_proto.EosActionDeleteAuth): 687 msg = eos_proto.EosTxActionAck(common=common, delete_auth=action) 688 elif isinstance(action, eos_proto.EosActionUnlinkAuth): 689 msg = eos_proto.EosTxActionAck(common=common, unlink_auth=action) 690 elif isinstance(action, eos_proto.EosActionLinkAuth): 691 msg = eos_proto.EosTxActionAck(common=common, link_auth=action) 692 elif isinstance(action, eos_proto.EosActionNewAccount): 693 msg = eos_proto.EosTxActionAck(common=common, new_account=action) 694 elif isinstance(action, eos_proto.EosActionUnknown): 695 msg = eos_proto.EosTxActionAck(common=common, unknown=action) 696 else: 697 raise Exception("Unknown EOS Action") 698 699 response = self.call(msg) 700 701 if not isinstance(response, eos_proto.EosSignedTx): 702 raise Exception("Unexpected EOS signing response") 703 704 return response 705 706 @session 707 def eos_sign_tx(self, n, transaction): 708 tx = eos.parse_transaction_json(copy.deepcopy(transaction)) 709 710 header = eos_proto.EosTxHeader( 711 expiration=tx.expiration, 712 ref_block_num=tx.ref_block_num, 713 ref_block_prefix=tx.ref_block_prefix, 714 max_net_usage_words=tx.net_usage_words, 715 max_cpu_usage_ms=tx.max_cpu_usage_ms, 716 delay_sec=tx.delay_sec) 717 718 msg = eos_proto.EosSignTx( 719 address_n=n, 720 chain_id=tx.chain_id, 721 header=header, 722 num_actions=tx.num_actions) 723 724 response = self.call(msg) 725 726 try: 727 while isinstance(response, eos_proto.EosTxActionRequest): 728 a = eos.parse_action(tx.actions.pop(0)) 729 if isinstance(a, list): 730 while len(a) and isinstance(response, eos_proto.EosTxActionRequest): 731 response = self.call(a.pop(0)) 732 else: 733 response = self.call(a) 734 except IndexError: 735 # pop from empty list 736 raise Exception("Unexpected EOS signing response") 737 738 if not isinstance(response, eos_proto.EosSignedTx): 739 raise Exception("Unexpected EOS signing response") 740 741 return response 742 743 744 @expect(nano_proto.NanoAddress) 745 def nano_get_address(self, coin_name, address_n, show_display=False): 746 msg = nano_proto.NanoGetAddress( 747 coin_name=coin_name, 748 address_n=address_n, 749 show_display=show_display) 750 return self.call(msg) 751 752 753 @expect(nano_proto.NanoSignedTx) 754 def nano_sign_tx( 755 self, coin_name, address_n, 756 grandparent_hash=None, 757 parent_link=None, 758 parent_representative=None, 759 parent_balance=None, 760 link_hash=None, 761 link_recipient=None, 762 link_recipient_n=None, 763 representative=None, 764 balance=None, 765 ): 766 parent_block = None 767 if (grandparent_hash is not None or 768 parent_link is not None or 769 parent_representative is not None or 770 parent_balance is not None): 771 parent_block = nano_proto.NanoSignTx.ParentBlock( 772 parent_hash=grandparent_hash, 773 link=parent_link, 774 representative=parent_representative, 775 balance=nano.encode_balance(parent_balance), 776 ) 777 778 msg = nano_proto.NanoSignTx( 779 coin_name=coin_name, 780 address_n=address_n, 781 parent_block=parent_block, 782 link_hash=link_hash, 783 link_recipient=link_recipient, 784 link_recipient_n=link_recipient_n, 785 representative=representative, 786 balance=nano.encode_balance(balance), 787 ) 788 return self.call(msg) 789 790 @field('address') 791 @expect(cosmos_proto.CosmosAddress) 792 def cosmos_get_address(self, address_n, show_display=False): 793 return self.call( 794 cosmos_proto.CosmosGetAddress(address_n=address_n, show_display=show_display) 795 ) 796 797 @session 798 def cosmos_sign_tx( 799 self, 800 address_n, 801 account_number, 802 chain_id, 803 fee, 804 gas, 805 msgs, 806 memo, 807 sequence, 808 exchange_types=None 809 ): 810 resp = self.call(cosmos_proto.CosmosSignTx( 811 address_n=address_n, 812 account_number=account_number, 813 chain_id=chain_id, 814 fee_amount=fee, 815 gas=gas, 816 memo=memo, 817 sequence=sequence, 818 msg_count=len(msgs) 819 )) 820 821 for (msg, exchange_type) in zip(msgs, exchange_types or [None] * len(msgs)): 822 if not isinstance(resp, cosmos_proto.CosmosMsgRequest): 823 raise CallException( 824 "Cosmos.ExpectedMsgRequest", 825 "Message request expected but not received.", 826 ) 827 828 if msg['type'] == "cosmos-sdk/MsgSend": 829 if len(msg['value']['amount']) != 1: 830 raise CallException("Cosmos.MsgSend", "Multiple amounts per msg not supported") 831 832 denom = msg['value']['amount'][0]['denom'] 833 if denom != 'uatom': 834 raise CallException("Cosmos.MsgSend", "Unsupported denomination: " + denom) 835 836 resp = self.call(cosmos_proto.CosmosMsgAck( 837 send=cosmos_proto.CosmosMsgSend( 838 from_address=msg['value']['from_address'], 839 to_address=msg['value']['to_address'], 840 amount=int(msg['value']['amount'][0]['amount']), 841 address_type=types.EXCHANGE if exchange_type is not None else types.SPEND, 842 exchange_type=exchange_type 843 ) 844 )) 845 else: 846 raise CallException( 847 "Cosmos.UnknownMsg", 848 "Cosmos message %s is not yet supported" % (msg['type'],) 849 ) 850 851 if not isinstance(resp, cosmos_proto.CosmosSignedTx): 852 raise CallException( 853 "Cosmos.UnexpectedEndOfOperations", 854 "Reached end of operations without a signature.", 855 ) 856 857 return resp 858 859 @field('address') 860 @expect(thorchain_proto.ThorchainAddress) 861 def thorchain_get_address(self, address_n, show_display=False, testnet=False): 862 return self.call( 863 thorchain_proto.ThorchainGetAddress(address_n=address_n, show_display=show_display, testnet=testnet) 864 ) 865 866 @session 867 def thorchain_sign_tx( 868 self, 869 address_n, 870 account_number, 871 chain_id, 872 fee, 873 gas, 874 msgs, 875 memo, 876 sequence, 877 exchange_types=None, 878 testnet=None 879 ): 880 resp = self.call(thorchain_proto.ThorchainSignTx( 881 address_n=address_n, 882 account_number=account_number, 883 chain_id=chain_id, 884 fee_amount=fee, 885 gas=gas, 886 memo=memo, 887 sequence=sequence, 888 msg_count=len(msgs), 889 testnet=testnet 890 )) 891 892 for (msg, exchange_type) in zip(msgs, exchange_types or [None] * len(msgs)): 893 if not isinstance(resp, thorchain_proto.ThorchainMsgRequest): 894 raise CallException( 895 "Thorchain.ExpectedMsgRequest", 896 "Message request expected but not received.", 897 ) 898 899 if msg['type'] == "thorchain/MsgSend": 900 if len(msg['value']['amount']) != 1: 901 raise CallException("Thorchain.MsgSend", "Multiple amounts per send msg not supported") 902 903 denom = msg['value']['amount'][0]['denom'] 904 if denom != 'rune': 905 raise CallException("Thorchain.MsgSend", "Unsupported denomination: " + denom) 906 907 resp = self.call(thorchain_proto.ThorchainMsgAck( 908 send=thorchain_proto.ThorchainMsgSend( 909 from_address=msg['value']['from_address'], 910 to_address=msg['value']['to_address'], 911 amount=int(msg['value']['amount'][0]['amount']), 912 address_type=types.EXCHANGE if exchange_type is not None else types.SPEND, 913 exchange_type=exchange_type 914 ) 915 )) 916 917 elif msg['type'] == "thorchain/MsgDeposit": 918 if len(msg['value']['coins']) != 1: 919 raise CallException("Thorchain.MsgDeposit", "Multiple coins per deposit msg not supported") 920 921 asset = msg['value']['coins'][0]['asset'] 922 if asset != 'THOR.RUNE': 923 raise CallException("Thorchain.MsgDeposit", "Unsupported asset: " + asset) 924 925 resp = self.call(thorchain_proto.ThorchainMsgAck( 926 deposit=thorchain_proto.ThorchainMsgDeposit( 927 asset=asset, 928 amount=int(msg['value']['coins'][0]['amount']), 929 memo=msg['value']['memo'], 930 signer=msg['value']['signer'] 931 ) 932 )) 933 934 else: 935 raise CallException( 936 "Thorchain.UnknownMsg", 937 "Thorchain message %s is not yet supported" % (msg['type'],) 938 ) 939 940 if not isinstance(resp, thorchain_proto.ThorchainSignedTx): 941 raise CallException( 942 "Thorchain.UnexpectedEndOfOperations", 943 "Reached end of operations without a signature.", 944 ) 945 946 return resp 947 948 949 @field('address') 950 @expect(ripple_proto.RippleAddress) 951 def ripple_get_address(self, address_n, show_display=False): 952 return self.call( 953 ripple_proto.RippleGetAddress(address_n=address_n, show_display=show_display) 954 ) 955 956 @session 957 @expect(ripple_proto.RippleSignedTx) 958 def ripple_sign_tx(self, address_n, msg): 959 msg.address_n = address_n 960 return self.call(msg) 961 962 @field('entropy') 963 @expect(proto.Entropy) 964 def get_entropy(self, size): 965 return self.call(proto.GetEntropy(size=size)) 966 967 @field('message') 968 @expect(proto.Success) 969 def ping(self, msg, button_protection=False, pin_protection=False, passphrase_protection=False): 970 msg = proto.Ping(message=msg, 971 button_protection=button_protection, 972 pin_protection=pin_protection, 973 passphrase_protection=passphrase_protection) 974 return self.call(msg) 975 976 def get_device_id(self): 977 return self.features.device_id 978 979 @field('message') 980 @expect(proto.Success) 981 def apply_settings(self, label=None, language=None, use_passphrase=None, homescreen=None): 982 settings = proto.ApplySettings() 983 if label != None: 984 settings.label = label 985 if language: 986 settings.language = language 987 if use_passphrase != None: 988 settings.use_passphrase = use_passphrase 989 990 out = self.call(settings) 991 self.init_device() # Reload Features 992 return out 993 994 @field('message') 995 @expect(proto.Success) 996 def apply_policy(self, policy_name, enabled): 997 policy = types.PolicyType(policy_name=policy_name, enabled=enabled) 998 apply_policies = proto.ApplyPolicies(policy=[policy]) 999 1000 out = self.call(apply_policies) 1001 self.init_device() # Reload Features 1002 return out 1003 1004 @field('message') 1005 @expect(proto.Success) 1006 def clear_session(self): 1007 return self.call(proto.ClearSession()) 1008 1009 @field('message') 1010 @expect(proto.Success) 1011 def change_pin(self, remove=False): 1012 ret = self.call(proto.ChangePin(remove=remove)) 1013 self.init_device() # Re-read features 1014 return ret 1015 1016 @expect(proto.MessageSignature) 1017 def sign_message(self, coin_name, n, message, script_type=types.SPENDADDRESS): 1018 n = self._convert_prime(n) 1019 # Convert message to UTF8 NFC (seems to be a bitcoin-qt standard) 1020 message = normalize_nfc(message).encode("utf-8") 1021 return self.call(proto.SignMessage(coin_name=coin_name, address_n=n, message=message, script_type=script_type)) 1022 1023 @expect(proto.SignedIdentity) 1024 def sign_identity(self, identity, challenge_hidden, challenge_visual, ecdsa_curve_name=DEFAULT_CURVE): 1025 return self.call(proto.SignIdentity(identity=identity, challenge_hidden=challenge_hidden, challenge_visual=challenge_visual, ecdsa_curve_name=ecdsa_curve_name)) 1026 1027 1028 def verify_message(self, coin_name, address, signature, message): 1029 # Convert message to UTF8 NFC (seems to be a bitcoin-qt standard) 1030 message = normalize_nfc(message).encode("utf-8") 1031 try: 1032 resp = self.call(proto.VerifyMessage(address=address, signature=signature, message=message, coin_name=coin_name)) 1033 except CallException as e: 1034 resp = e 1035 if isinstance(resp, proto.Success): 1036 return True 1037 return False 1038 1039 @field('value') 1040 @expect(proto.CipheredKeyValue) 1041 def encrypt_keyvalue(self, n, key, value, ask_on_encrypt=True, ask_on_decrypt=True, iv=b''): 1042 n = self._convert_prime(n) 1043 return self.call(proto.CipherKeyValue(address_n=n, 1044 key=key, 1045 value=value, 1046 encrypt=True, 1047 ask_on_encrypt=ask_on_encrypt, 1048 ask_on_decrypt=ask_on_decrypt, 1049 iv=iv)) 1050 1051 @field('value') 1052 @expect(proto.CipheredKeyValue) 1053 def decrypt_keyvalue(self, n, key, value, ask_on_encrypt=True, ask_on_decrypt=True, iv=b''): 1054 n = self._convert_prime(n) 1055 return self.call(proto.CipherKeyValue(address_n=n, 1056 key=key, 1057 value=value, 1058 encrypt=False, 1059 ask_on_encrypt=ask_on_encrypt, 1060 ask_on_decrypt=ask_on_decrypt, 1061 iv=iv)) 1062 1063 def _prepare_sign_tx(self, coin_name, inputs, outputs): 1064 tx = types.TransactionType() 1065 tx.inputs.extend(inputs) 1066 tx.outputs.extend(outputs) 1067 1068 txes = {None: tx} 1069 txes[b''] = tx 1070 1071 force_bip143 = ['BitcoinGold', 'BitcoinCash', 'BitcoinSV'] 1072 if coin_name in force_bip143: 1073 return txes 1074 1075 known_hashes = [] 1076 for inp in inputs: 1077 if inp.prev_hash in txes: 1078 continue 1079 1080 if inp.script_type in (types.SPENDP2SHWITNESS, 1081 types.SPENDWITNESS): 1082 continue 1083 1084 if not self.tx_api: 1085 raise Exception('TX_API not defined') 1086 1087 prev_tx = self.tx_api.get_tx(binascii.hexlify(inp.prev_hash).decode('utf-8')) 1088 txes[inp.prev_hash] = prev_tx 1089 1090 return txes 1091 1092 @session 1093 def sign_tx(self, coin_name, inputs, outputs, version=None, lock_time=None, debug_processor=None): 1094 1095 start = time.time() 1096 txes = self._prepare_sign_tx(coin_name, inputs, outputs) 1097 1098 # Prepare and send initial message 1099 tx = proto.SignTx() 1100 tx.inputs_count = len(inputs) 1101 tx.outputs_count = len(outputs) 1102 tx.coin_name = coin_name 1103 if version is not None: 1104 tx.version = version 1105 if lock_time is not None: 1106 tx.lock_time = lock_time 1107 res = self.call(tx) 1108 1109 # Prepare structure for signatures 1110 signatures = [None] * len(inputs) 1111 serialized_tx = b'' 1112 1113 counter = 0 1114 while True: 1115 counter += 1 1116 1117 if isinstance(res, proto.Failure): 1118 raise CallException("Signing failed") 1119 1120 if not isinstance(res, proto.TxRequest): 1121 raise CallException("Unexpected message") 1122 1123 # If there's some part of signed transaction, let's add it 1124 if res.HasField('serialized') and res.serialized.HasField('serialized_tx'): 1125 if self.verbose: 1126 log("RECEIVED PART OF SERIALIZED TX (%d BYTES)" % len(res.serialized.serialized_tx)) 1127 serialized_tx += res.serialized.serialized_tx 1128 1129 if res.HasField('serialized') and res.serialized.HasField('signature_index'): 1130 if signatures[res.serialized.signature_index] != None: 1131 raise Exception("Signature for index %d already filled" % res.serialized.signature_index) 1132 signatures[res.serialized.signature_index] = res.serialized.signature 1133 1134 if res.request_type == types.TXFINISHED: 1135 # Device didn't ask for more information, finish workflow 1136 break 1137 1138 # Device asked for one more information, let's process it. 1139 if not res.details.tx_hash: 1140 current_tx = txes[None] 1141 else: 1142 current_tx = txes[bytes(res.details.tx_hash)] 1143 1144 if res.request_type == types.TXMETA: 1145 msg = types.TransactionType() 1146 msg.version = current_tx.version 1147 msg.lock_time = current_tx.lock_time 1148 msg.inputs_cnt = len(current_tx.inputs) 1149 if res.details.tx_hash: 1150 msg.outputs_cnt = len(current_tx.bin_outputs) 1151 else: 1152 msg.outputs_cnt = len(current_tx.outputs) 1153 msg.extra_data_len = len(current_tx.extra_data) if current_tx.extra_data else 0 1154 res = self.call(proto.TxAck(tx=msg)) 1155 continue 1156 1157 elif res.request_type == types.TXINPUT: 1158 msg = types.TransactionType() 1159 msg.inputs.extend([current_tx.inputs[res.details.request_index], ]) 1160 if debug_processor is not None: 1161 # msg needs to be deep copied so when it's modified 1162 # the other messages stay intact 1163 from copy import deepcopy 1164 msg = deepcopy(msg) 1165 # If debug_processor function is provided, 1166 # pass thru it the request and prepared response. 1167 # This is useful for tests, see test_msg_signtx 1168 msg = debug_processor(res, msg) 1169 res = self.call(proto.TxAck(tx=msg)) 1170 continue 1171 1172 elif res.request_type == types.TXOUTPUT: 1173 msg = types.TransactionType() 1174 if res.details.tx_hash: 1175 msg.bin_outputs.extend([current_tx.bin_outputs[res.details.request_index], ]) 1176 else: 1177 msg.outputs.extend([current_tx.outputs[res.details.request_index], ]) 1178 1179 if debug_processor != None: 1180 # msg needs to be deep copied so when it's modified 1181 # the other messages stay intact 1182 from copy import deepcopy 1183 msg = deepcopy(msg) 1184 # If debug_processor function is provided, 1185 # pass thru it the request and prepared response. 1186 # This is useful for tests, see test_msg_signtx 1187 msg = debug_processor(res, msg) 1188 1189 res = self.call(proto.TxAck(tx=msg)) 1190 continue 1191 1192 elif res.request_type == types.TXEXTRADATA: 1193 o, l = res.details.extra_data_offset, res.details.extra_data_len 1194 msg = types.TransactionType() 1195 msg.extra_data = current_tx.extra_data[o:o + l] 1196 res = self.call(proto.TxAck(tx=msg)) 1197 continue 1198 1199 if None in signatures: 1200 raise Exception("Some signatures are missing!") 1201 1202 if self.verbose: 1203 log("SIGNED IN %.03f SECONDS, CALLED %d MESSAGES, %d BYTES" % \ 1204 (time.time() - start, counter, len(serialized_tx))) 1205 1206 return (signatures, serialized_tx) 1207 1208 @field('message') 1209 @expect(proto.Success) 1210 def wipe_device(self): 1211 ret = self.call(proto.WipeDevice()) 1212 self.init_device() 1213 return ret 1214 1215 @field('message') 1216 @expect(proto.Success) 1217 def recovery_device(self, use_trezor_method, word_count, passphrase_protection, pin_protection, label, language): 1218 if self.features.initialized: 1219 raise Exception("Device is initialized already. Call wipe_device() and try again.") 1220 if use_trezor_method: 1221 raise Exception("Trezor-style recovery is no longer supported") 1222 elif word_count not in (12, 18, 24): 1223 raise Exception("Invalid word count. Use 12/18/24") 1224 1225 res = self.call(proto.RecoveryDevice(word_count=int(word_count), 1226 passphrase_protection=bool(passphrase_protection), 1227 pin_protection=bool(pin_protection), 1228 label=label, 1229 language=language, 1230 enforce_wordlist=True, 1231 use_character_cipher=True)) 1232 1233 self.init_device() 1234 return res 1235 1236 @field('message') 1237 @expect(proto.Success) 1238 def test_recovery_seed(self, word_count, language): 1239 if not self.features.initialized: 1240 raise Exception("Device must already be initialized in order to perform test recovery") 1241 elif word_count not in (12, 18, 24): 1242 raise Exception("Invalid word count. Use 12/18/24") 1243 res = self.call(proto.RecoveryDevice(word_count=int(word_count), 1244 language=language, 1245 enforce_wordlist=True, 1246 use_character_cipher=True, 1247 dry_run=True)) 1248 1249 self.init_device() 1250 return res 1251 1252 @field('message') 1253 @expect(proto.Success) 1254 @session 1255 def reset_device(self, display_random, strength, passphrase_protection, pin_protection, label, language): 1256 if self.features.initialized: 1257 raise Exception("Device is initialized already. Call wipe_device() and try again.") 1258 1259 # Begin with device reset workflow 1260 msg = proto.ResetDevice(display_random=display_random, 1261 strength=strength, 1262 language=language, 1263 passphrase_protection=bool(passphrase_protection), 1264 pin_protection=bool(pin_protection), 1265 label=label) 1266 1267 resp = self.call(msg) 1268 if not isinstance(resp, proto.EntropyRequest): 1269 raise Exception("Invalid response, expected EntropyRequest") 1270 1271 external_entropy = self._get_local_entropy() 1272 if self.verbose: 1273 log("Computer generated entropy: " + binascii.hexlify(external_entropy).decode('ascii')) 1274 ret = self.call(proto.EntropyAck(entropy=external_entropy)) 1275 self.init_device() 1276 return ret 1277 1278 @field('message') 1279 @expect(proto.Success) 1280 def load_device_by_mnemonic(self, mnemonic, pin, passphrase_protection, label, language, skip_checksum=False): 1281 m = Mnemonic('english') 1282 if not skip_checksum and not m.check(mnemonic): 1283 raise Exception("Invalid mnemonic checksum") 1284 1285 # Convert mnemonic to UTF8 NKFD 1286 mnemonic = Mnemonic.normalize_string(mnemonic) 1287 1288 # Convert mnemonic to ASCII stream 1289 mnemonic = normalize_nfc(mnemonic) 1290 1291 if self.features.initialized: 1292 raise Exception("Device is initialized already. Call wipe_device() and try again.") 1293 1294 resp = self.call(proto.LoadDevice(mnemonic=mnemonic, pin=pin, 1295 passphrase_protection=passphrase_protection, 1296 language=language, 1297 label=label, 1298 skip_checksum=skip_checksum)) 1299 self.init_device() 1300 return resp 1301 1302 @field('message') 1303 @expect(proto.Success) 1304 def load_device_by_xprv(self, xprv, pin, passphrase_protection, label, language): 1305 if self.features.initialized: 1306 raise Exception("Device is initialized already. Call wipe_device() and try again.") 1307 1308 if xprv[0:4] not in ('xprv', 'tprv'): 1309 raise Exception("Unknown type of xprv") 1310 1311 if len(xprv) < 100 and len(xprv) > 112: 1312 raise Exception("Invalid length of xprv") 1313 1314 node = types.HDNodeType() 1315 data = binascii.hexlify(tools.b58decode(xprv, None)) 1316 1317 if data[90:92] != b'00': 1318 raise Exception("Contain invalid private key") 1319 1320 checksum = binascii.hexlify(hashlib.sha256(hashlib.sha256(binascii.unhexlify(data[:156])).digest()).digest()[:4]) 1321 if checksum != data[156:]: 1322 raise Exception("Checksum doesn't match") 1323 1324 # version 0488ade4 1325 # depth 00 1326 # fingerprint 00000000 1327 # child_num 00000000 1328 # chaincode 873dff81c02f525623fd1fe5167eac3a55a049de3d314bb42ee227ffed37d508 1329 # privkey 00e8f32e723decf4051aefac8e2c93c9c5b214313817cdb01a1494b917c8436b35 1330 # checksum e77e9d71 1331 1332 node.depth = int(data[8:10], 16) 1333 node.fingerprint = int(data[10:18], 16) 1334 node.child_num = int(data[18:26], 16) 1335 node.chain_code = binascii.unhexlify(data[26:90]) 1336 node.private_key = binascii.unhexlify(data[92:156]) # skip 0x00 indicating privkey 1337 1338 resp = self.call(proto.LoadDevice(node=node, 1339 pin=pin, 1340 passphrase_protection=passphrase_protection, 1341 language=language, 1342 label=label)) 1343 self.init_device() 1344 return resp 1345 1346 def firmware_update(self, fp): 1347 if self.features.bootloader_mode == False: 1348 raise Exception("Device must be in bootloader mode") 1349 1350 resp = self.call(proto.FirmwareErase()) 1351 if isinstance(resp, proto.Failure) and resp.code == types.Failure_FirmwareError: 1352 return False 1353 1354 data = fp.read() 1355 data_hash = hashlib.sha256(data).digest() 1356 1357 resp = self.call(proto.FirmwareUpload(payload_hash=data_hash, payload=data)) 1358 1359 if isinstance(resp, proto.Success): 1360 return True 1361 1362 elif isinstance(resp, proto.Failure) and resp.code == types.Failure_FirmwareError: 1363 return False 1364 1365 raise Exception("Unexpected result %s" % resp) 1366 1367class KeepKeyClient(ProtocolMixin, TextUIMixin, BaseClient): 1368 pass 1369 1370class KeepKeyClientVerbose(ProtocolMixin, TextUIMixin, DebugWireMixin, BaseClient): 1371 pass 1372 1373class KeepKeyDebuglinkClient(ProtocolMixin, DebugLinkMixin, BaseClient): 1374 pass 1375 1376class KeepKeyDebuglinkClientVerbose(ProtocolMixin, DebugLinkMixin, DebugWireMixin, BaseClient): 1377 pass 1378