1#!/usr/local/bin/python3.8 2# -*- mode: python -*- 3# 4# Electrum - lightweight Bitcoin client 5# Copyright (C) 2016 The Electrum developers 6# 7# Permission is hereby granted, free of charge, to any person 8# obtaining a copy of this software and associated documentation files 9# (the "Software"), to deal in the Software without restriction, 10# including without limitation the rights to use, copy, modify, merge, 11# publish, distribute, sublicense, and/or sell copies of the Software, 12# and to permit persons to whom the Software is furnished to do so, 13# subject to the following conditions: 14# 15# The above copyright notice and this permission notice shall be 16# included in all copies or substantial portions of the Software. 17# 18# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 19# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 20# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 21# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 22# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 23# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 24# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 25# SOFTWARE. 26 27from unicodedata import normalize 28import hashlib 29import re 30from typing import Tuple, TYPE_CHECKING, Union, Sequence, Optional, Dict, List, NamedTuple 31from functools import lru_cache 32from abc import ABC, abstractmethod 33 34from . import bitcoin, ecc, constants, bip32 35from .bitcoin import deserialize_privkey, serialize_privkey, BaseDecodeError 36from .transaction import Transaction, PartialTransaction, PartialTxInput, PartialTxOutput, TxInput 37from .bip32 import (convert_bip32_path_to_list_of_uint32, BIP32_PRIME, 38 is_xpub, is_xprv, BIP32Node, normalize_bip32_derivation, 39 convert_bip32_intpath_to_strpath, is_xkey_consistent_with_key_origin_info) 40from .ecc import string_to_number 41from .crypto import (pw_decode, pw_encode, sha256, sha256d, PW_HASH_VERSION_LATEST, 42 SUPPORTED_PW_HASH_VERSIONS, UnsupportedPasswordHashVersion, hash_160) 43from .util import (InvalidPassword, WalletFileException, 44 BitcoinException, bh2u, bfh, inv_dict, is_hex_str) 45from .mnemonic import Mnemonic, Wordlist, seed_type, is_seed 46from .plugin import run_hook 47from .logging import Logger 48 49if TYPE_CHECKING: 50 from .gui.qt.util import TaskThread 51 from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase 52 from .wallet_db import WalletDB 53 54 55class CannotDerivePubkey(Exception): pass 56 57 58class KeyStore(Logger, ABC): 59 type: str 60 61 def __init__(self): 62 Logger.__init__(self) 63 self.is_requesting_to_be_rewritten_to_wallet_file = False # type: bool 64 65 def has_seed(self) -> bool: 66 return False 67 68 def is_watching_only(self) -> bool: 69 return False 70 71 def can_import(self) -> bool: 72 return False 73 74 def get_type_text(self) -> str: 75 return f'{self.type}' 76 77 @abstractmethod 78 def may_have_password(self): 79 """Returns whether the keystore can be encrypted with a password.""" 80 pass 81 82 def _get_tx_derivations(self, tx: 'PartialTransaction') -> Dict[str, Union[Sequence[int], str]]: 83 keypairs = {} 84 for txin in tx.inputs(): 85 keypairs.update(self._get_txin_derivations(txin)) 86 return keypairs 87 88 def _get_txin_derivations(self, txin: 'PartialTxInput') -> Dict[str, Union[Sequence[int], str]]: 89 if txin.is_complete(): 90 return {} 91 keypairs = {} 92 for pubkey in txin.pubkeys: 93 if pubkey in txin.part_sigs: 94 # this pubkey already signed 95 continue 96 derivation = self.get_pubkey_derivation(pubkey, txin) 97 if not derivation: 98 continue 99 keypairs[pubkey.hex()] = derivation 100 return keypairs 101 102 def can_sign(self, tx: 'Transaction', *, ignore_watching_only=False) -> bool: 103 """Returns whether this keystore could sign *something* in this tx.""" 104 if not ignore_watching_only and self.is_watching_only(): 105 return False 106 if not isinstance(tx, PartialTransaction): 107 return False 108 return bool(self._get_tx_derivations(tx)) 109 110 def can_sign_txin(self, txin: 'TxInput', *, ignore_watching_only=False) -> bool: 111 """Returns whether this keystore could sign this txin.""" 112 if not ignore_watching_only and self.is_watching_only(): 113 return False 114 if not isinstance(txin, PartialTxInput): 115 return False 116 return bool(self._get_txin_derivations(txin)) 117 118 def ready_to_sign(self) -> bool: 119 return not self.is_watching_only() 120 121 @abstractmethod 122 def dump(self) -> dict: 123 pass 124 125 @abstractmethod 126 def is_deterministic(self) -> bool: 127 pass 128 129 @abstractmethod 130 def sign_message(self, sequence: 'AddressIndexGeneric', message, password) -> bytes: 131 pass 132 133 @abstractmethod 134 def decrypt_message(self, sequence: 'AddressIndexGeneric', message, password) -> bytes: 135 pass 136 137 @abstractmethod 138 def sign_transaction(self, tx: 'PartialTransaction', password) -> None: 139 pass 140 141 @abstractmethod 142 def get_pubkey_derivation(self, pubkey: bytes, 143 txinout: Union['PartialTxInput', 'PartialTxOutput'], 144 *, only_der_suffix=True) \ 145 -> Union[Sequence[int], str, None]: 146 """Returns either a derivation int-list if the pubkey can be HD derived from this keystore, 147 the pubkey itself (hex) if the pubkey belongs to the keystore but not HD derived, 148 or None if the pubkey is unrelated. 149 """ 150 pass 151 152 def find_my_pubkey_in_txinout( 153 self, txinout: Union['PartialTxInput', 'PartialTxOutput'], 154 *, only_der_suffix: bool = False 155 ) -> Tuple[Optional[bytes], Optional[List[int]]]: 156 # note: we assume that this cosigner only has one pubkey in this txin/txout 157 for pubkey in txinout.bip32_paths: 158 path = self.get_pubkey_derivation(pubkey, txinout, only_der_suffix=only_der_suffix) 159 if path and not isinstance(path, (str, bytes)): 160 return pubkey, list(path) 161 return None, None 162 163 def can_have_deterministic_lightning_xprv(self) -> bool: 164 return False 165 166 167class Software_KeyStore(KeyStore): 168 169 def __init__(self, d): 170 KeyStore.__init__(self) 171 self.pw_hash_version = d.get('pw_hash_version', 1) 172 if self.pw_hash_version not in SUPPORTED_PW_HASH_VERSIONS: 173 raise UnsupportedPasswordHashVersion(self.pw_hash_version) 174 175 def may_have_password(self): 176 return not self.is_watching_only() 177 178 def sign_message(self, sequence, message, password) -> bytes: 179 privkey, compressed = self.get_private_key(sequence, password) 180 key = ecc.ECPrivkey(privkey) 181 return key.sign_message(message, compressed) 182 183 def decrypt_message(self, sequence, message, password) -> bytes: 184 privkey, compressed = self.get_private_key(sequence, password) 185 ec = ecc.ECPrivkey(privkey) 186 decrypted = ec.decrypt_message(message) 187 return decrypted 188 189 def sign_transaction(self, tx, password): 190 if self.is_watching_only(): 191 return 192 # Raise if password is not correct. 193 self.check_password(password) 194 # Add private keys 195 keypairs = self._get_tx_derivations(tx) 196 for k, v in keypairs.items(): 197 keypairs[k] = self.get_private_key(v, password) 198 # Sign 199 if keypairs: 200 tx.sign(keypairs) 201 202 @abstractmethod 203 def update_password(self, old_password, new_password): 204 pass 205 206 @abstractmethod 207 def check_password(self, password): 208 pass 209 210 @abstractmethod 211 def get_private_key(self, sequence: 'AddressIndexGeneric', password) -> Tuple[bytes, bool]: 212 """Returns (privkey, is_compressed)""" 213 pass 214 215 216class Imported_KeyStore(Software_KeyStore): 217 # keystore for imported private keys 218 219 type = 'imported' 220 221 def __init__(self, d): 222 Software_KeyStore.__init__(self, d) 223 self.keypairs = d.get('keypairs', {}) # type: Dict[str, str] 224 225 def is_deterministic(self): 226 return False 227 228 def dump(self): 229 return { 230 'type': self.type, 231 'keypairs': self.keypairs, 232 'pw_hash_version': self.pw_hash_version, 233 } 234 235 def can_import(self): 236 return True 237 238 def check_password(self, password): 239 pubkey = list(self.keypairs.keys())[0] 240 self.get_private_key(pubkey, password) 241 242 def import_privkey(self, sec, password): 243 txin_type, privkey, compressed = deserialize_privkey(sec) 244 pubkey = ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed) 245 # re-serialize the key so the internal storage format is consistent 246 serialized_privkey = serialize_privkey( 247 privkey, compressed, txin_type, internal_use=True) 248 # NOTE: if the same pubkey is reused for multiple addresses (script types), 249 # there will only be one pubkey-privkey pair for it in self.keypairs, 250 # and the privkey will encode a txin_type but that txin_type cannot be trusted. 251 # Removing keys complicates this further. 252 self.keypairs[pubkey] = pw_encode(serialized_privkey, password, version=self.pw_hash_version) 253 return txin_type, pubkey 254 255 def delete_imported_key(self, key): 256 self.keypairs.pop(key) 257 258 def get_private_key(self, pubkey: str, password): 259 sec = pw_decode(self.keypairs[pubkey], password, version=self.pw_hash_version) 260 try: 261 txin_type, privkey, compressed = deserialize_privkey(sec) 262 except BaseDecodeError as e: 263 raise InvalidPassword() from e 264 if pubkey != ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed): 265 raise InvalidPassword() 266 return privkey, compressed 267 268 def get_pubkey_derivation(self, pubkey, txin, *, only_der_suffix=True): 269 if pubkey.hex() in self.keypairs: 270 return pubkey.hex() 271 return None 272 273 def update_password(self, old_password, new_password): 274 self.check_password(old_password) 275 if new_password == '': 276 new_password = None 277 for k, v in self.keypairs.items(): 278 b = pw_decode(v, old_password, version=self.pw_hash_version) 279 c = pw_encode(b, new_password, version=PW_HASH_VERSION_LATEST) 280 self.keypairs[k] = c 281 self.pw_hash_version = PW_HASH_VERSION_LATEST 282 283 284class Deterministic_KeyStore(Software_KeyStore): 285 286 def __init__(self, d): 287 Software_KeyStore.__init__(self, d) 288 self.seed = d.get('seed', '') # only electrum seeds 289 self.passphrase = d.get('passphrase', '') 290 self._seed_type = d.get('seed_type', None) # only electrum seeds 291 292 def is_deterministic(self): 293 return True 294 295 def dump(self): 296 d = { 297 'type': self.type, 298 'pw_hash_version': self.pw_hash_version, 299 } 300 if self.seed: 301 d['seed'] = self.seed 302 if self.passphrase: 303 d['passphrase'] = self.passphrase 304 if self._seed_type: 305 d['seed_type'] = self._seed_type 306 return d 307 308 def has_seed(self): 309 return bool(self.seed) 310 311 def get_seed_type(self) -> Optional[str]: 312 return self._seed_type 313 314 def is_watching_only(self): 315 return not self.has_seed() 316 317 @abstractmethod 318 def format_seed(self, seed: str) -> str: 319 pass 320 321 def add_seed(self, seed): 322 if self.seed: 323 raise Exception("a seed exists") 324 self.seed = self.format_seed(seed) 325 self._seed_type = seed_type(seed) or None 326 327 def get_seed(self, password): 328 if not self.has_seed(): 329 raise Exception("This wallet has no seed words") 330 return pw_decode(self.seed, password, version=self.pw_hash_version) 331 332 def get_passphrase(self, password): 333 if self.passphrase: 334 return pw_decode(self.passphrase, password, version=self.pw_hash_version) 335 else: 336 return '' 337 338 339class MasterPublicKeyMixin(ABC): 340 341 @abstractmethod 342 def get_master_public_key(self) -> str: 343 pass 344 345 @abstractmethod 346 def get_derivation_prefix(self) -> Optional[str]: 347 """Returns to bip32 path from some root node to self.xpub 348 Note that the return value might be None; if it is unknown. 349 """ 350 pass 351 352 @abstractmethod 353 def get_root_fingerprint(self) -> Optional[str]: 354 """Returns the bip32 fingerprint of the top level node. 355 This top level node is the node at the beginning of the derivation prefix, 356 i.e. applying the derivation prefix to it will result self.xpub 357 Note that the return value might be None; if it is unknown. 358 """ 359 pass 360 361 @abstractmethod 362 def get_fp_and_derivation_to_be_used_in_partial_tx( 363 self, 364 der_suffix: Sequence[int], 365 *, 366 only_der_suffix: bool, 367 ) -> Tuple[bytes, Sequence[int]]: 368 """Returns fingerprint and derivation path corresponding to a derivation suffix. 369 The fingerprint is either the root fp or the intermediate fp, depending on what is available 370 and 'only_der_suffix', and the derivation path is adjusted accordingly. 371 """ 372 pass 373 374 @abstractmethod 375 def derive_pubkey(self, for_change: int, n: int) -> bytes: 376 """Returns pubkey at given path. 377 May raise CannotDerivePubkey. 378 """ 379 pass 380 381 def get_pubkey_derivation( 382 self, 383 pubkey: bytes, 384 txinout: Union['PartialTxInput', 'PartialTxOutput'], 385 *, 386 only_der_suffix=True, 387 ) -> Union[Sequence[int], str, None]: 388 EXPECTED_DER_SUFFIX_LEN = 2 389 def test_der_suffix_against_pubkey(der_suffix: Sequence[int], pubkey: bytes) -> bool: 390 if len(der_suffix) != EXPECTED_DER_SUFFIX_LEN: 391 return False 392 try: 393 if pubkey != self.derive_pubkey(*der_suffix): 394 return False 395 except CannotDerivePubkey: 396 return False 397 return True 398 399 if pubkey not in txinout.bip32_paths: 400 return None 401 fp_found, path_found = txinout.bip32_paths[pubkey] 402 der_suffix = None 403 full_path = None 404 # 1. try fp against our root 405 ks_root_fingerprint_hex = self.get_root_fingerprint() 406 ks_der_prefix_str = self.get_derivation_prefix() 407 ks_der_prefix = convert_bip32_path_to_list_of_uint32(ks_der_prefix_str) if ks_der_prefix_str else None 408 if (ks_root_fingerprint_hex is not None and ks_der_prefix is not None and 409 fp_found.hex() == ks_root_fingerprint_hex): 410 if path_found[:len(ks_der_prefix)] == ks_der_prefix: 411 der_suffix = path_found[len(ks_der_prefix):] 412 if not test_der_suffix_against_pubkey(der_suffix, pubkey): 413 der_suffix = None 414 # 2. try fp against our intermediate fingerprint 415 if (der_suffix is None and isinstance(self, Xpub) and 416 fp_found == self.get_bip32_node_for_xpub().calc_fingerprint_of_this_node()): 417 der_suffix = path_found 418 if not test_der_suffix_against_pubkey(der_suffix, pubkey): 419 der_suffix = None 420 # 3. hack/bruteforce: ignore fp and check pubkey anyway 421 # This is only to resolve the following scenario/problem: 422 # problem: if we don't know our root fp, but tx contains root fp and full path, 423 # we will miss the pubkey (false negative match). Though it might still work 424 # within gap limit due to tx.add_info_from_wallet overwriting the fields. 425 # Example: keystore has intermediate xprv without root fp; tx contains root fp and full path. 426 if der_suffix is None: 427 der_suffix = path_found[-EXPECTED_DER_SUFFIX_LEN:] 428 if not test_der_suffix_against_pubkey(der_suffix, pubkey): 429 der_suffix = None 430 # if all attempts/methods failed, we give up now: 431 if der_suffix is None: 432 return None 433 if ks_der_prefix is not None: 434 full_path = ks_der_prefix + list(der_suffix) 435 return der_suffix if only_der_suffix else full_path 436 437 438class Xpub(MasterPublicKeyMixin): 439 440 def __init__(self, *, derivation_prefix: str = None, root_fingerprint: str = None): 441 self.xpub = None 442 self.xpub_receive = None 443 self.xpub_change = None 444 self._xpub_bip32_node = None # type: Optional[BIP32Node] 445 446 # "key origin" info (subclass should persist these): 447 self._derivation_prefix = derivation_prefix # type: Optional[str] 448 self._root_fingerprint = root_fingerprint # type: Optional[str] 449 450 def get_master_public_key(self): 451 return self.xpub 452 453 def get_bip32_node_for_xpub(self) -> Optional[BIP32Node]: 454 if self._xpub_bip32_node is None: 455 if self.xpub is None: 456 return None 457 self._xpub_bip32_node = BIP32Node.from_xkey(self.xpub) 458 return self._xpub_bip32_node 459 460 def get_derivation_prefix(self) -> Optional[str]: 461 return self._derivation_prefix 462 463 def get_root_fingerprint(self) -> Optional[str]: 464 return self._root_fingerprint 465 466 def get_fp_and_derivation_to_be_used_in_partial_tx( 467 self, 468 der_suffix: Sequence[int], 469 *, 470 only_der_suffix: bool, 471 ) -> Tuple[bytes, Sequence[int]]: 472 fingerprint_hex = self.get_root_fingerprint() 473 der_prefix_str = self.get_derivation_prefix() 474 if not only_der_suffix and fingerprint_hex is not None and der_prefix_str is not None: 475 # use root fp, and true full path 476 fingerprint_bytes = bfh(fingerprint_hex) 477 der_prefix_ints = convert_bip32_path_to_list_of_uint32(der_prefix_str) 478 else: 479 # use intermediate fp, and claim der suffix is the full path 480 fingerprint_bytes = self.get_bip32_node_for_xpub().calc_fingerprint_of_this_node() 481 der_prefix_ints = convert_bip32_path_to_list_of_uint32('m') 482 der_full = der_prefix_ints + list(der_suffix) 483 return fingerprint_bytes, der_full 484 485 def get_xpub_to_be_used_in_partial_tx(self, *, only_der_suffix: bool) -> str: 486 assert self.xpub 487 fp_bytes, der_full = self.get_fp_and_derivation_to_be_used_in_partial_tx(der_suffix=[], 488 only_der_suffix=only_der_suffix) 489 bip32node = self.get_bip32_node_for_xpub() 490 depth = len(der_full) 491 child_number_int = der_full[-1] if len(der_full) >= 1 else 0 492 child_number_bytes = child_number_int.to_bytes(length=4, byteorder="big") 493 fingerprint = bytes(4) if depth == 0 else bip32node.fingerprint 494 bip32node = bip32node._replace(depth=depth, 495 fingerprint=fingerprint, 496 child_number=child_number_bytes) 497 return bip32node.to_xpub() 498 499 def add_key_origin_from_root_node(self, *, derivation_prefix: str, root_node: BIP32Node): 500 assert self.xpub 501 # try to derive ourselves from what we were given 502 child_node1 = root_node.subkey_at_private_derivation(derivation_prefix) 503 child_pubkey_bytes1 = child_node1.eckey.get_public_key_bytes(compressed=True) 504 child_node2 = self.get_bip32_node_for_xpub() 505 child_pubkey_bytes2 = child_node2.eckey.get_public_key_bytes(compressed=True) 506 if child_pubkey_bytes1 != child_pubkey_bytes2: 507 raise Exception("(xpub, derivation_prefix, root_node) inconsistency") 508 self.add_key_origin(derivation_prefix=derivation_prefix, 509 root_fingerprint=root_node.calc_fingerprint_of_this_node().hex().lower()) 510 511 def add_key_origin(self, *, derivation_prefix: str = None, root_fingerprint: str = None) -> None: 512 assert self.xpub 513 if not (root_fingerprint is None or (is_hex_str(root_fingerprint) and len(root_fingerprint) == 8)): 514 raise Exception("root fp must be 8 hex characters") 515 derivation_prefix = normalize_bip32_derivation(derivation_prefix) 516 if not is_xkey_consistent_with_key_origin_info(self.xpub, 517 derivation_prefix=derivation_prefix, 518 root_fingerprint=root_fingerprint): 519 raise Exception("xpub inconsistent with provided key origin info") 520 if root_fingerprint is not None: 521 self._root_fingerprint = root_fingerprint 522 if derivation_prefix is not None: 523 self._derivation_prefix = derivation_prefix 524 self.is_requesting_to_be_rewritten_to_wallet_file = True 525 526 @lru_cache(maxsize=None) 527 def derive_pubkey(self, for_change: int, n: int) -> bytes: 528 for_change = int(for_change) 529 if for_change not in (0, 1): 530 raise CannotDerivePubkey("forbidden path") 531 xpub = self.xpub_change if for_change else self.xpub_receive 532 if xpub is None: 533 rootnode = self.get_bip32_node_for_xpub() 534 xpub = rootnode.subkey_at_public_derivation((for_change,)).to_xpub() 535 if for_change: 536 self.xpub_change = xpub 537 else: 538 self.xpub_receive = xpub 539 return self.get_pubkey_from_xpub(xpub, (n,)) 540 541 @classmethod 542 def get_pubkey_from_xpub(self, xpub: str, sequence) -> bytes: 543 node = BIP32Node.from_xkey(xpub).subkey_at_public_derivation(sequence) 544 return node.eckey.get_public_key_bytes(compressed=True) 545 546 547class BIP32_KeyStore(Xpub, Deterministic_KeyStore): 548 549 type = 'bip32' 550 551 def __init__(self, d): 552 Xpub.__init__(self, derivation_prefix=d.get('derivation'), root_fingerprint=d.get('root_fingerprint')) 553 Deterministic_KeyStore.__init__(self, d) 554 self.xpub = d.get('xpub') 555 self.xprv = d.get('xprv') 556 557 def format_seed(self, seed): 558 return ' '.join(seed.split()) 559 560 def dump(self): 561 d = Deterministic_KeyStore.dump(self) 562 d['xpub'] = self.xpub 563 d['xprv'] = self.xprv 564 d['derivation'] = self.get_derivation_prefix() 565 d['root_fingerprint'] = self.get_root_fingerprint() 566 return d 567 568 def get_master_private_key(self, password): 569 return pw_decode(self.xprv, password, version=self.pw_hash_version) 570 571 def check_password(self, password): 572 xprv = pw_decode(self.xprv, password, version=self.pw_hash_version) 573 try: 574 bip32node = BIP32Node.from_xkey(xprv) 575 except BaseDecodeError as e: 576 raise InvalidPassword() from e 577 if bip32node.chaincode != self.get_bip32_node_for_xpub().chaincode: 578 raise InvalidPassword() 579 580 def update_password(self, old_password, new_password): 581 self.check_password(old_password) 582 if new_password == '': 583 new_password = None 584 if self.has_seed(): 585 decoded = self.get_seed(old_password) 586 self.seed = pw_encode(decoded, new_password, version=PW_HASH_VERSION_LATEST) 587 if self.passphrase: 588 decoded = self.get_passphrase(old_password) 589 self.passphrase = pw_encode(decoded, new_password, version=PW_HASH_VERSION_LATEST) 590 if self.xprv is not None: 591 b = pw_decode(self.xprv, old_password, version=self.pw_hash_version) 592 self.xprv = pw_encode(b, new_password, version=PW_HASH_VERSION_LATEST) 593 self.pw_hash_version = PW_HASH_VERSION_LATEST 594 595 def is_watching_only(self): 596 return self.xprv is None 597 598 def add_xpub(self, xpub): 599 assert is_xpub(xpub) 600 self.xpub = xpub 601 root_fingerprint, derivation_prefix = bip32.root_fp_and_der_prefix_from_xkey(xpub) 602 self.add_key_origin(derivation_prefix=derivation_prefix, root_fingerprint=root_fingerprint) 603 604 def add_xprv(self, xprv): 605 assert is_xprv(xprv) 606 self.xprv = xprv 607 self.add_xpub(bip32.xpub_from_xprv(xprv)) 608 609 def add_xprv_from_seed(self, bip32_seed, xtype, derivation): 610 rootnode = BIP32Node.from_rootseed(bip32_seed, xtype=xtype) 611 node = rootnode.subkey_at_private_derivation(derivation) 612 self.add_xprv(node.to_xprv()) 613 self.add_key_origin_from_root_node(derivation_prefix=derivation, root_node=rootnode) 614 615 def get_private_key(self, sequence: Sequence[int], password): 616 xprv = self.get_master_private_key(password) 617 node = BIP32Node.from_xkey(xprv).subkey_at_private_derivation(sequence) 618 pk = node.eckey.get_secret_bytes() 619 return pk, True 620 621 def get_keypair(self, sequence, password): 622 k, _ = self.get_private_key(sequence, password) 623 cK = ecc.ECPrivkey(k).get_public_key_bytes() 624 return cK, k 625 626 def can_have_deterministic_lightning_xprv(self): 627 if (self.get_seed_type() == 'segwit' 628 and self.get_bip32_node_for_xpub().xtype == 'p2wpkh'): 629 return True 630 return False 631 632 def get_lightning_xprv(self, password) -> str: 633 assert self.can_have_deterministic_lightning_xprv() 634 xprv = self.get_master_private_key(password) 635 rootnode = BIP32Node.from_xkey(xprv) 636 node = rootnode.subkey_at_private_derivation("m/67'/") 637 return node.to_xprv() 638 639class Old_KeyStore(MasterPublicKeyMixin, Deterministic_KeyStore): 640 641 type = 'old' 642 643 def __init__(self, d): 644 Deterministic_KeyStore.__init__(self, d) 645 self.mpk = d.get('mpk') 646 self._root_fingerprint = None 647 648 def get_hex_seed(self, password): 649 return pw_decode(self.seed, password, version=self.pw_hash_version).encode('utf8') 650 651 def dump(self): 652 d = Deterministic_KeyStore.dump(self) 653 d['mpk'] = self.mpk 654 return d 655 656 def add_seed(self, seedphrase): 657 Deterministic_KeyStore.add_seed(self, seedphrase) 658 s = self.get_hex_seed(None) 659 self.mpk = self.mpk_from_seed(s) 660 661 def add_master_public_key(self, mpk): 662 self.mpk = mpk 663 664 def format_seed(self, seed): 665 from . import old_mnemonic, mnemonic 666 seed = mnemonic.normalize_text(seed) 667 # see if seed was entered as hex 668 if seed: 669 try: 670 bfh(seed) 671 return str(seed) 672 except Exception: 673 pass 674 words = seed.split() 675 seed = old_mnemonic.mn_decode(words) 676 if not seed: 677 raise Exception("Invalid seed") 678 return seed 679 680 def get_seed(self, password): 681 from . import old_mnemonic 682 s = self.get_hex_seed(password) 683 return ' '.join(old_mnemonic.mn_encode(s)) 684 685 @classmethod 686 def mpk_from_seed(klass, seed): 687 secexp = klass.stretch_key(seed) 688 privkey = ecc.ECPrivkey.from_secret_scalar(secexp) 689 return privkey.get_public_key_hex(compressed=False)[2:] 690 691 @classmethod 692 def stretch_key(self, seed): 693 x = seed 694 for i in range(100000): 695 x = hashlib.sha256(x + seed).digest() 696 return string_to_number(x) 697 698 @classmethod 699 def get_sequence(self, mpk, for_change, n): 700 return string_to_number(sha256d(("%d:%d:"%(n, for_change)).encode('ascii') + bfh(mpk))) 701 702 @classmethod 703 def get_pubkey_from_mpk(cls, mpk, for_change, n) -> bytes: 704 z = cls.get_sequence(mpk, for_change, n) 705 master_public_key = ecc.ECPubkey(bfh('04'+mpk)) 706 public_key = master_public_key + z*ecc.GENERATOR 707 return public_key.get_public_key_bytes(compressed=False) 708 709 @lru_cache(maxsize=None) 710 def derive_pubkey(self, for_change, n) -> bytes: 711 for_change = int(for_change) 712 if for_change not in (0, 1): 713 raise CannotDerivePubkey("forbidden path") 714 return self.get_pubkey_from_mpk(self.mpk, for_change, n) 715 716 def _get_private_key_from_stretched_exponent(self, for_change, n, secexp): 717 secexp = (secexp + self.get_sequence(self.mpk, for_change, n)) % ecc.CURVE_ORDER 718 pk = int.to_bytes(secexp, length=32, byteorder='big', signed=False) 719 return pk 720 721 def get_private_key(self, sequence: Sequence[int], password): 722 seed = self.get_hex_seed(password) 723 secexp = self.stretch_key(seed) 724 self._check_seed(seed, secexp=secexp) 725 for_change, n = sequence 726 pk = self._get_private_key_from_stretched_exponent(for_change, n, secexp) 727 return pk, False 728 729 def _check_seed(self, seed, *, secexp=None): 730 if secexp is None: 731 secexp = self.stretch_key(seed) 732 master_private_key = ecc.ECPrivkey.from_secret_scalar(secexp) 733 master_public_key = master_private_key.get_public_key_bytes(compressed=False)[1:] 734 if master_public_key != bfh(self.mpk): 735 raise InvalidPassword() 736 737 def check_password(self, password): 738 seed = self.get_hex_seed(password) 739 self._check_seed(seed) 740 741 def get_master_public_key(self): 742 return self.mpk 743 744 def get_derivation_prefix(self) -> str: 745 return 'm' 746 747 def get_root_fingerprint(self) -> str: 748 if self._root_fingerprint is None: 749 master_public_key = ecc.ECPubkey(bfh('04'+self.mpk)) 750 xfp = hash_160(master_public_key.get_public_key_bytes(compressed=True))[0:4] 751 self._root_fingerprint = xfp.hex().lower() 752 return self._root_fingerprint 753 754 def get_fp_and_derivation_to_be_used_in_partial_tx( 755 self, 756 der_suffix: Sequence[int], 757 *, 758 only_der_suffix: bool, 759 ) -> Tuple[bytes, Sequence[int]]: 760 fingerprint_hex = self.get_root_fingerprint() 761 der_prefix_str = self.get_derivation_prefix() 762 fingerprint_bytes = bfh(fingerprint_hex) 763 der_prefix_ints = convert_bip32_path_to_list_of_uint32(der_prefix_str) 764 der_full = der_prefix_ints + list(der_suffix) 765 return fingerprint_bytes, der_full 766 767 def update_password(self, old_password, new_password): 768 self.check_password(old_password) 769 if new_password == '': 770 new_password = None 771 if self.has_seed(): 772 decoded = pw_decode(self.seed, old_password, version=self.pw_hash_version) 773 self.seed = pw_encode(decoded, new_password, version=PW_HASH_VERSION_LATEST) 774 self.pw_hash_version = PW_HASH_VERSION_LATEST 775 776 777class Hardware_KeyStore(Xpub, KeyStore): 778 hw_type: str 779 device: str 780 plugin: 'HW_PluginBase' 781 thread: Optional['TaskThread'] = None 782 783 type = 'hardware' 784 785 def __init__(self, d): 786 Xpub.__init__(self, derivation_prefix=d.get('derivation'), root_fingerprint=d.get('root_fingerprint')) 787 KeyStore.__init__(self) 788 # Errors and other user interaction is done through the wallet's 789 # handler. The handler is per-window and preserved across 790 # device reconnects 791 self.xpub = d.get('xpub') 792 self.label = d.get('label') 793 self.soft_device_id = d.get('soft_device_id') # type: Optional[str] 794 self.handler = None # type: Optional[HardwareHandlerBase] 795 run_hook('init_keystore', self) 796 797 def set_label(self, label): 798 self.label = label 799 800 def may_have_password(self): 801 return False 802 803 def is_deterministic(self): 804 return True 805 806 def get_type_text(self) -> str: 807 return f'hw[{self.hw_type}]' 808 809 def dump(self): 810 return { 811 'type': self.type, 812 'hw_type': self.hw_type, 813 'xpub': self.xpub, 814 'derivation': self.get_derivation_prefix(), 815 'root_fingerprint': self.get_root_fingerprint(), 816 'label':self.label, 817 'soft_device_id': self.soft_device_id, 818 } 819 820 def unpaired(self): 821 '''A device paired with the wallet was disconnected. This can be 822 called in any thread context.''' 823 self.logger.info("unpaired") 824 825 def paired(self): 826 '''A device paired with the wallet was (re-)connected. This can be 827 called in any thread context.''' 828 self.logger.info("paired") 829 830 def is_watching_only(self): 831 '''The wallet is not watching-only; the user will be prompted for 832 pin and passphrase as appropriate when needed.''' 833 assert not self.has_seed() 834 return False 835 836 def get_password_for_storage_encryption(self) -> str: 837 client = self.plugin.get_client(self) 838 return client.get_password_for_storage_encryption() 839 840 def has_usable_connection_with_device(self) -> bool: 841 if not hasattr(self, 'plugin'): 842 return False 843 client = self.plugin.get_client(self, force_pair=False) 844 if client is None: 845 return False 846 return client.has_usable_connection_with_device() 847 848 def ready_to_sign(self): 849 return super().ready_to_sign() and self.has_usable_connection_with_device() 850 851 def opportunistically_fill_in_missing_info_from_device(self, client: 'HardwareClientBase'): 852 assert client is not None 853 if self._root_fingerprint is None: 854 self._root_fingerprint = client.request_root_fingerprint_from_device() 855 self.is_requesting_to_be_rewritten_to_wallet_file = True 856 if self.label != client.label(): 857 self.label = client.label() 858 self.is_requesting_to_be_rewritten_to_wallet_file = True 859 if self.soft_device_id != client.get_soft_device_id(): 860 self.soft_device_id = client.get_soft_device_id() 861 self.is_requesting_to_be_rewritten_to_wallet_file = True 862 863 864KeyStoreWithMPK = Union[KeyStore, MasterPublicKeyMixin] # intersection really... 865AddressIndexGeneric = Union[Sequence[int], str] # can be hex pubkey str 866 867 868def bip39_normalize_passphrase(passphrase): 869 return normalize('NFKD', passphrase or '') 870 871def bip39_to_seed(mnemonic, passphrase): 872 import hashlib, hmac 873 PBKDF2_ROUNDS = 2048 874 mnemonic = normalize('NFKD', ' '.join(mnemonic.split())) 875 passphrase = bip39_normalize_passphrase(passphrase) 876 return hashlib.pbkdf2_hmac('sha512', mnemonic.encode('utf-8'), 877 b'mnemonic' + passphrase.encode('utf-8'), iterations = PBKDF2_ROUNDS) 878 879 880def bip39_is_checksum_valid( 881 mnemonic: str, 882 *, 883 wordlist: Wordlist = None, 884) -> Tuple[bool, bool]: 885 """Test checksum of bip39 mnemonic assuming English wordlist. 886 Returns tuple (is_checksum_valid, is_wordlist_valid) 887 """ 888 words = [normalize('NFKD', word) for word in mnemonic.split()] 889 words_len = len(words) 890 if wordlist is None: 891 wordlist = Wordlist.from_file("english.txt") 892 n = len(wordlist) 893 i = 0 894 words.reverse() 895 while words: 896 w = words.pop() 897 try: 898 k = wordlist.index(w) 899 except ValueError: 900 return False, False 901 i = i*n + k 902 if words_len not in [12, 15, 18, 21, 24]: 903 return False, True 904 checksum_length = 11 * words_len // 33 # num bits 905 entropy_length = 32 * checksum_length # num bits 906 entropy = i >> checksum_length 907 checksum = i % 2**checksum_length 908 entropy_bytes = int.to_bytes(entropy, length=entropy_length//8, byteorder="big") 909 hashed = int.from_bytes(sha256(entropy_bytes), byteorder="big") 910 calculated_checksum = hashed >> (256 - checksum_length) 911 return checksum == calculated_checksum, True 912 913 914def from_bip43_rootseed(root_seed, derivation, xtype=None): 915 k = BIP32_KeyStore({}) 916 if xtype is None: 917 xtype = xtype_from_derivation(derivation) 918 k.add_xprv_from_seed(root_seed, xtype, derivation) 919 return k 920 921 922PURPOSE48_SCRIPT_TYPES = { 923 'p2wsh-p2sh': 1, # specifically multisig 924 'p2wsh': 2, # specifically multisig 925} 926PURPOSE48_SCRIPT_TYPES_INV = inv_dict(PURPOSE48_SCRIPT_TYPES) 927 928 929def xtype_from_derivation(derivation: str) -> str: 930 """Returns the script type to be used for this derivation.""" 931 bip32_indices = convert_bip32_path_to_list_of_uint32(derivation) 932 if len(bip32_indices) >= 1: 933 if bip32_indices[0] == 84 + BIP32_PRIME: 934 return 'p2wpkh' 935 elif bip32_indices[0] == 49 + BIP32_PRIME: 936 return 'p2wpkh-p2sh' 937 elif bip32_indices[0] == 44 + BIP32_PRIME: 938 return 'standard' 939 elif bip32_indices[0] == 45 + BIP32_PRIME: 940 return 'standard' 941 942 if len(bip32_indices) >= 4: 943 if bip32_indices[0] == 48 + BIP32_PRIME: 944 # m / purpose' / coin_type' / account' / script_type' / change / address_index 945 script_type_int = bip32_indices[3] - BIP32_PRIME 946 script_type = PURPOSE48_SCRIPT_TYPES_INV.get(script_type_int) 947 if script_type is not None: 948 return script_type 949 return 'standard' 950 951 952hw_keystores = {} 953 954def register_keystore(hw_type, constructor): 955 hw_keystores[hw_type] = constructor 956 957def hardware_keystore(d) -> Hardware_KeyStore: 958 hw_type = d['hw_type'] 959 if hw_type in hw_keystores: 960 constructor = hw_keystores[hw_type] 961 return constructor(d) 962 raise WalletFileException(f'unknown hardware type: {hw_type}. ' 963 f'hw_keystores: {list(hw_keystores)}') 964 965def load_keystore(db: 'WalletDB', name: str) -> KeyStore: 966 d = db.get(name, {}) 967 t = d.get('type') 968 if not t: 969 raise WalletFileException( 970 'Wallet format requires update.\n' 971 'Cannot find keystore for name {}'.format(name)) 972 keystore_constructors = {ks.type: ks for ks in [Old_KeyStore, Imported_KeyStore, BIP32_KeyStore]} 973 keystore_constructors['hardware'] = hardware_keystore 974 try: 975 ks_constructor = keystore_constructors[t] 976 except KeyError: 977 raise WalletFileException(f'Unknown type {t} for keystore named {name}') 978 k = ks_constructor(d) 979 return k 980 981 982def is_old_mpk(mpk: str) -> bool: 983 try: 984 int(mpk, 16) # test if hex string 985 except: 986 return False 987 if len(mpk) != 128: 988 return False 989 try: 990 ecc.ECPubkey(bfh('04' + mpk)) 991 except: 992 return False 993 return True 994 995 996def is_address_list(text): 997 parts = text.split() 998 return bool(parts) and all(bitcoin.is_address(x) for x in parts) 999 1000 1001def get_private_keys(text, *, allow_spaces_inside_key=True, raise_on_error=False): 1002 if allow_spaces_inside_key: # see #1612 1003 parts = text.split('\n') 1004 parts = map(lambda x: ''.join(x.split()), parts) 1005 parts = list(filter(bool, parts)) 1006 else: 1007 parts = text.split() 1008 if bool(parts) and all(bitcoin.is_private_key(x, raise_on_error=raise_on_error) for x in parts): 1009 return parts 1010 1011 1012def is_private_key_list(text, *, allow_spaces_inside_key=True, raise_on_error=False): 1013 return bool(get_private_keys(text, 1014 allow_spaces_inside_key=allow_spaces_inside_key, 1015 raise_on_error=raise_on_error)) 1016 1017 1018def is_master_key(x): 1019 return is_old_mpk(x) or is_bip32_key(x) 1020 1021 1022def is_bip32_key(x): 1023 return is_xprv(x) or is_xpub(x) 1024 1025 1026def bip44_derivation(account_id, bip43_purpose=44): 1027 coin = constants.net.BIP44_COIN_TYPE 1028 der = "m/%d'/%d'/%d'" % (bip43_purpose, coin, int(account_id)) 1029 return normalize_bip32_derivation(der) 1030 1031 1032def purpose48_derivation(account_id: int, xtype: str) -> str: 1033 # m / purpose' / coin_type' / account' / script_type' / change / address_index 1034 bip43_purpose = 48 1035 coin = constants.net.BIP44_COIN_TYPE 1036 account_id = int(account_id) 1037 script_type_int = PURPOSE48_SCRIPT_TYPES.get(xtype) 1038 if script_type_int is None: 1039 raise Exception('unknown xtype: {}'.format(xtype)) 1040 der = "m/%d'/%d'/%d'/%d'" % (bip43_purpose, coin, account_id, script_type_int) 1041 return normalize_bip32_derivation(der) 1042 1043 1044def from_seed(seed, passphrase, is_p2sh=False): 1045 t = seed_type(seed) 1046 if t == 'old': 1047 keystore = Old_KeyStore({}) 1048 keystore.add_seed(seed) 1049 elif t in ['standard', 'segwit']: 1050 keystore = BIP32_KeyStore({}) 1051 keystore.add_seed(seed) 1052 keystore.passphrase = passphrase 1053 bip32_seed = Mnemonic.mnemonic_to_seed(seed, passphrase) 1054 if t == 'standard': 1055 der = "m/" 1056 xtype = 'standard' 1057 else: 1058 der = "m/1'/" if is_p2sh else "m/0'/" 1059 xtype = 'p2wsh' if is_p2sh else 'p2wpkh' 1060 keystore.add_xprv_from_seed(bip32_seed, xtype, der) 1061 else: 1062 raise BitcoinException('Unexpected seed type {}'.format(repr(t))) 1063 return keystore 1064 1065def from_private_key_list(text): 1066 keystore = Imported_KeyStore({}) 1067 for x in get_private_keys(text): 1068 keystore.import_privkey(x, None) 1069 return keystore 1070 1071def from_old_mpk(mpk): 1072 keystore = Old_KeyStore({}) 1073 keystore.add_master_public_key(mpk) 1074 return keystore 1075 1076def from_xpub(xpub): 1077 k = BIP32_KeyStore({}) 1078 k.add_xpub(xpub) 1079 return k 1080 1081def from_xprv(xprv): 1082 k = BIP32_KeyStore({}) 1083 k.add_xprv(xprv) 1084 return k 1085 1086def from_master_key(text): 1087 if is_xprv(text): 1088 k = from_xprv(text) 1089 elif is_old_mpk(text): 1090 k = from_old_mpk(text) 1091 elif is_xpub(text): 1092 k = from_xpub(text) 1093 else: 1094 raise BitcoinException('Invalid master key') 1095 return k 1096