1#!/usr/local/bin/python3.8
2# -*- mode: python -*-
3#
4# Electrum - lightweight Bitcoin client
5# Copyright (C) 2016  The Electrum developers
6#
7# Permission is hereby granted, free of charge, to any person
8# obtaining a copy of this software and associated documentation files
9# (the "Software"), to deal in the Software without restriction,
10# including without limitation the rights to use, copy, modify, merge,
11# publish, distribute, sublicense, and/or sell copies of the Software,
12# and to permit persons to whom the Software is furnished to do so,
13# subject to the following conditions:
14#
15# The above copyright notice and this permission notice shall be
16# included in all copies or substantial portions of the Software.
17#
18# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
19# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
20# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
21# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
22# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
23# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
24# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25# SOFTWARE.
26
27from unicodedata import normalize
28import hashlib
29import re
30from typing import Tuple, TYPE_CHECKING, Union, Sequence, Optional, Dict, List, NamedTuple
31from functools import lru_cache
32from abc import ABC, abstractmethod
33
34from . import bitcoin, ecc, constants, bip32
35from .bitcoin import deserialize_privkey, serialize_privkey, BaseDecodeError
36from .transaction import Transaction, PartialTransaction, PartialTxInput, PartialTxOutput, TxInput
37from .bip32 import (convert_bip32_path_to_list_of_uint32, BIP32_PRIME,
38                    is_xpub, is_xprv, BIP32Node, normalize_bip32_derivation,
39                    convert_bip32_intpath_to_strpath, is_xkey_consistent_with_key_origin_info)
40from .ecc import string_to_number
41from .crypto import (pw_decode, pw_encode, sha256, sha256d, PW_HASH_VERSION_LATEST,
42                     SUPPORTED_PW_HASH_VERSIONS, UnsupportedPasswordHashVersion, hash_160)
43from .util import (InvalidPassword, WalletFileException,
44                   BitcoinException, bh2u, bfh, inv_dict, is_hex_str)
45from .mnemonic import Mnemonic, Wordlist, seed_type, is_seed
46from .plugin import run_hook
47from .logging import Logger
48
49if TYPE_CHECKING:
50    from .gui.qt.util import TaskThread
51    from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
52    from .wallet_db import WalletDB
53
54
55class CannotDerivePubkey(Exception): pass
56
57
58class KeyStore(Logger, ABC):
59    type: str
60
61    def __init__(self):
62        Logger.__init__(self)
63        self.is_requesting_to_be_rewritten_to_wallet_file = False  # type: bool
64
65    def has_seed(self) -> bool:
66        return False
67
68    def is_watching_only(self) -> bool:
69        return False
70
71    def can_import(self) -> bool:
72        return False
73
74    def get_type_text(self) -> str:
75        return f'{self.type}'
76
77    @abstractmethod
78    def may_have_password(self):
79        """Returns whether the keystore can be encrypted with a password."""
80        pass
81
82    def _get_tx_derivations(self, tx: 'PartialTransaction') -> Dict[str, Union[Sequence[int], str]]:
83        keypairs = {}
84        for txin in tx.inputs():
85            keypairs.update(self._get_txin_derivations(txin))
86        return keypairs
87
88    def _get_txin_derivations(self, txin: 'PartialTxInput') -> Dict[str, Union[Sequence[int], str]]:
89        if txin.is_complete():
90            return {}
91        keypairs = {}
92        for pubkey in txin.pubkeys:
93            if pubkey in txin.part_sigs:
94                # this pubkey already signed
95                continue
96            derivation = self.get_pubkey_derivation(pubkey, txin)
97            if not derivation:
98                continue
99            keypairs[pubkey.hex()] = derivation
100        return keypairs
101
102    def can_sign(self, tx: 'Transaction', *, ignore_watching_only=False) -> bool:
103        """Returns whether this keystore could sign *something* in this tx."""
104        if not ignore_watching_only and self.is_watching_only():
105            return False
106        if not isinstance(tx, PartialTransaction):
107            return False
108        return bool(self._get_tx_derivations(tx))
109
110    def can_sign_txin(self, txin: 'TxInput', *, ignore_watching_only=False) -> bool:
111        """Returns whether this keystore could sign this txin."""
112        if not ignore_watching_only and self.is_watching_only():
113            return False
114        if not isinstance(txin, PartialTxInput):
115            return False
116        return bool(self._get_txin_derivations(txin))
117
118    def ready_to_sign(self) -> bool:
119        return not self.is_watching_only()
120
121    @abstractmethod
122    def dump(self) -> dict:
123        pass
124
125    @abstractmethod
126    def is_deterministic(self) -> bool:
127        pass
128
129    @abstractmethod
130    def sign_message(self, sequence: 'AddressIndexGeneric', message, password) -> bytes:
131        pass
132
133    @abstractmethod
134    def decrypt_message(self, sequence: 'AddressIndexGeneric', message, password) -> bytes:
135        pass
136
137    @abstractmethod
138    def sign_transaction(self, tx: 'PartialTransaction', password) -> None:
139        pass
140
141    @abstractmethod
142    def get_pubkey_derivation(self, pubkey: bytes,
143                              txinout: Union['PartialTxInput', 'PartialTxOutput'],
144                              *, only_der_suffix=True) \
145            -> Union[Sequence[int], str, None]:
146        """Returns either a derivation int-list if the pubkey can be HD derived from this keystore,
147        the pubkey itself (hex) if the pubkey belongs to the keystore but not HD derived,
148        or None if the pubkey is unrelated.
149        """
150        pass
151
152    def find_my_pubkey_in_txinout(
153            self, txinout: Union['PartialTxInput', 'PartialTxOutput'],
154            *, only_der_suffix: bool = False
155    ) -> Tuple[Optional[bytes], Optional[List[int]]]:
156        # note: we assume that this cosigner only has one pubkey in this txin/txout
157        for pubkey in txinout.bip32_paths:
158            path = self.get_pubkey_derivation(pubkey, txinout, only_der_suffix=only_der_suffix)
159            if path and not isinstance(path, (str, bytes)):
160                return pubkey, list(path)
161        return None, None
162
163    def can_have_deterministic_lightning_xprv(self) -> bool:
164        return False
165
166
167class Software_KeyStore(KeyStore):
168
169    def __init__(self, d):
170        KeyStore.__init__(self)
171        self.pw_hash_version = d.get('pw_hash_version', 1)
172        if self.pw_hash_version not in SUPPORTED_PW_HASH_VERSIONS:
173            raise UnsupportedPasswordHashVersion(self.pw_hash_version)
174
175    def may_have_password(self):
176        return not self.is_watching_only()
177
178    def sign_message(self, sequence, message, password) -> bytes:
179        privkey, compressed = self.get_private_key(sequence, password)
180        key = ecc.ECPrivkey(privkey)
181        return key.sign_message(message, compressed)
182
183    def decrypt_message(self, sequence, message, password) -> bytes:
184        privkey, compressed = self.get_private_key(sequence, password)
185        ec = ecc.ECPrivkey(privkey)
186        decrypted = ec.decrypt_message(message)
187        return decrypted
188
189    def sign_transaction(self, tx, password):
190        if self.is_watching_only():
191            return
192        # Raise if password is not correct.
193        self.check_password(password)
194        # Add private keys
195        keypairs = self._get_tx_derivations(tx)
196        for k, v in keypairs.items():
197            keypairs[k] = self.get_private_key(v, password)
198        # Sign
199        if keypairs:
200            tx.sign(keypairs)
201
202    @abstractmethod
203    def update_password(self, old_password, new_password):
204        pass
205
206    @abstractmethod
207    def check_password(self, password):
208        pass
209
210    @abstractmethod
211    def get_private_key(self, sequence: 'AddressIndexGeneric', password) -> Tuple[bytes, bool]:
212        """Returns (privkey, is_compressed)"""
213        pass
214
215
216class Imported_KeyStore(Software_KeyStore):
217    # keystore for imported private keys
218
219    type = 'imported'
220
221    def __init__(self, d):
222        Software_KeyStore.__init__(self, d)
223        self.keypairs = d.get('keypairs', {})  # type: Dict[str, str]
224
225    def is_deterministic(self):
226        return False
227
228    def dump(self):
229        return {
230            'type': self.type,
231            'keypairs': self.keypairs,
232            'pw_hash_version': self.pw_hash_version,
233        }
234
235    def can_import(self):
236        return True
237
238    def check_password(self, password):
239        pubkey = list(self.keypairs.keys())[0]
240        self.get_private_key(pubkey, password)
241
242    def import_privkey(self, sec, password):
243        txin_type, privkey, compressed = deserialize_privkey(sec)
244        pubkey = ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed)
245        # re-serialize the key so the internal storage format is consistent
246        serialized_privkey = serialize_privkey(
247            privkey, compressed, txin_type, internal_use=True)
248        # NOTE: if the same pubkey is reused for multiple addresses (script types),
249        # there will only be one pubkey-privkey pair for it in self.keypairs,
250        # and the privkey will encode a txin_type but that txin_type cannot be trusted.
251        # Removing keys complicates this further.
252        self.keypairs[pubkey] = pw_encode(serialized_privkey, password, version=self.pw_hash_version)
253        return txin_type, pubkey
254
255    def delete_imported_key(self, key):
256        self.keypairs.pop(key)
257
258    def get_private_key(self, pubkey: str, password):
259        sec = pw_decode(self.keypairs[pubkey], password, version=self.pw_hash_version)
260        try:
261            txin_type, privkey, compressed = deserialize_privkey(sec)
262        except BaseDecodeError as e:
263            raise InvalidPassword() from e
264        if pubkey != ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed):
265            raise InvalidPassword()
266        return privkey, compressed
267
268    def get_pubkey_derivation(self, pubkey, txin, *, only_der_suffix=True):
269        if pubkey.hex() in self.keypairs:
270            return pubkey.hex()
271        return None
272
273    def update_password(self, old_password, new_password):
274        self.check_password(old_password)
275        if new_password == '':
276            new_password = None
277        for k, v in self.keypairs.items():
278            b = pw_decode(v, old_password, version=self.pw_hash_version)
279            c = pw_encode(b, new_password, version=PW_HASH_VERSION_LATEST)
280            self.keypairs[k] = c
281        self.pw_hash_version = PW_HASH_VERSION_LATEST
282
283
284class Deterministic_KeyStore(Software_KeyStore):
285
286    def __init__(self, d):
287        Software_KeyStore.__init__(self, d)
288        self.seed = d.get('seed', '')  # only electrum seeds
289        self.passphrase = d.get('passphrase', '')
290        self._seed_type = d.get('seed_type', None)  # only electrum seeds
291
292    def is_deterministic(self):
293        return True
294
295    def dump(self):
296        d = {
297            'type': self.type,
298            'pw_hash_version': self.pw_hash_version,
299        }
300        if self.seed:
301            d['seed'] = self.seed
302        if self.passphrase:
303            d['passphrase'] = self.passphrase
304        if self._seed_type:
305            d['seed_type'] = self._seed_type
306        return d
307
308    def has_seed(self):
309        return bool(self.seed)
310
311    def get_seed_type(self) -> Optional[str]:
312        return self._seed_type
313
314    def is_watching_only(self):
315        return not self.has_seed()
316
317    @abstractmethod
318    def format_seed(self, seed: str) -> str:
319        pass
320
321    def add_seed(self, seed):
322        if self.seed:
323            raise Exception("a seed exists")
324        self.seed = self.format_seed(seed)
325        self._seed_type = seed_type(seed) or None
326
327    def get_seed(self, password):
328        if not self.has_seed():
329            raise Exception("This wallet has no seed words")
330        return pw_decode(self.seed, password, version=self.pw_hash_version)
331
332    def get_passphrase(self, password):
333        if self.passphrase:
334            return pw_decode(self.passphrase, password, version=self.pw_hash_version)
335        else:
336            return ''
337
338
339class MasterPublicKeyMixin(ABC):
340
341    @abstractmethod
342    def get_master_public_key(self) -> str:
343        pass
344
345    @abstractmethod
346    def get_derivation_prefix(self) -> Optional[str]:
347        """Returns to bip32 path from some root node to self.xpub
348        Note that the return value might be None; if it is unknown.
349        """
350        pass
351
352    @abstractmethod
353    def get_root_fingerprint(self) -> Optional[str]:
354        """Returns the bip32 fingerprint of the top level node.
355        This top level node is the node at the beginning of the derivation prefix,
356        i.e. applying the derivation prefix to it will result self.xpub
357        Note that the return value might be None; if it is unknown.
358        """
359        pass
360
361    @abstractmethod
362    def get_fp_and_derivation_to_be_used_in_partial_tx(
363            self,
364            der_suffix: Sequence[int],
365            *,
366            only_der_suffix: bool,
367    ) -> Tuple[bytes, Sequence[int]]:
368        """Returns fingerprint and derivation path corresponding to a derivation suffix.
369        The fingerprint is either the root fp or the intermediate fp, depending on what is available
370        and 'only_der_suffix', and the derivation path is adjusted accordingly.
371        """
372        pass
373
374    @abstractmethod
375    def derive_pubkey(self, for_change: int, n: int) -> bytes:
376        """Returns pubkey at given path.
377        May raise CannotDerivePubkey.
378        """
379        pass
380
381    def get_pubkey_derivation(
382            self,
383            pubkey: bytes,
384            txinout: Union['PartialTxInput', 'PartialTxOutput'],
385            *,
386            only_der_suffix=True,
387    ) -> Union[Sequence[int], str, None]:
388        EXPECTED_DER_SUFFIX_LEN = 2
389        def test_der_suffix_against_pubkey(der_suffix: Sequence[int], pubkey: bytes) -> bool:
390            if len(der_suffix) != EXPECTED_DER_SUFFIX_LEN:
391                return False
392            try:
393                if pubkey != self.derive_pubkey(*der_suffix):
394                    return False
395            except CannotDerivePubkey:
396                return False
397            return True
398
399        if pubkey not in txinout.bip32_paths:
400            return None
401        fp_found, path_found = txinout.bip32_paths[pubkey]
402        der_suffix = None
403        full_path = None
404        # 1. try fp against our root
405        ks_root_fingerprint_hex = self.get_root_fingerprint()
406        ks_der_prefix_str = self.get_derivation_prefix()
407        ks_der_prefix = convert_bip32_path_to_list_of_uint32(ks_der_prefix_str) if ks_der_prefix_str else None
408        if (ks_root_fingerprint_hex is not None and ks_der_prefix is not None and
409                fp_found.hex() == ks_root_fingerprint_hex):
410            if path_found[:len(ks_der_prefix)] == ks_der_prefix:
411                der_suffix = path_found[len(ks_der_prefix):]
412                if not test_der_suffix_against_pubkey(der_suffix, pubkey):
413                    der_suffix = None
414        # 2. try fp against our intermediate fingerprint
415        if (der_suffix is None and isinstance(self, Xpub) and
416                fp_found == self.get_bip32_node_for_xpub().calc_fingerprint_of_this_node()):
417            der_suffix = path_found
418            if not test_der_suffix_against_pubkey(der_suffix, pubkey):
419                der_suffix = None
420        # 3. hack/bruteforce: ignore fp and check pubkey anyway
421        #    This is only to resolve the following scenario/problem:
422        #    problem: if we don't know our root fp, but tx contains root fp and full path,
423        #             we will miss the pubkey (false negative match). Though it might still work
424        #             within gap limit due to tx.add_info_from_wallet overwriting the fields.
425        #             Example: keystore has intermediate xprv without root fp; tx contains root fp and full path.
426        if der_suffix is None:
427            der_suffix = path_found[-EXPECTED_DER_SUFFIX_LEN:]
428            if not test_der_suffix_against_pubkey(der_suffix, pubkey):
429                der_suffix = None
430        # if all attempts/methods failed, we give up now:
431        if der_suffix is None:
432            return None
433        if ks_der_prefix is not None:
434            full_path = ks_der_prefix + list(der_suffix)
435        return der_suffix if only_der_suffix else full_path
436
437
438class Xpub(MasterPublicKeyMixin):
439
440    def __init__(self, *, derivation_prefix: str = None, root_fingerprint: str = None):
441        self.xpub = None
442        self.xpub_receive = None
443        self.xpub_change = None
444        self._xpub_bip32_node = None  # type: Optional[BIP32Node]
445
446        # "key origin" info (subclass should persist these):
447        self._derivation_prefix = derivation_prefix  # type: Optional[str]
448        self._root_fingerprint = root_fingerprint  # type: Optional[str]
449
450    def get_master_public_key(self):
451        return self.xpub
452
453    def get_bip32_node_for_xpub(self) -> Optional[BIP32Node]:
454        if self._xpub_bip32_node is None:
455            if self.xpub is None:
456                return None
457            self._xpub_bip32_node = BIP32Node.from_xkey(self.xpub)
458        return self._xpub_bip32_node
459
460    def get_derivation_prefix(self) -> Optional[str]:
461        return self._derivation_prefix
462
463    def get_root_fingerprint(self) -> Optional[str]:
464        return self._root_fingerprint
465
466    def get_fp_and_derivation_to_be_used_in_partial_tx(
467            self,
468            der_suffix: Sequence[int],
469            *,
470            only_der_suffix: bool,
471    ) -> Tuple[bytes, Sequence[int]]:
472        fingerprint_hex = self.get_root_fingerprint()
473        der_prefix_str = self.get_derivation_prefix()
474        if not only_der_suffix and fingerprint_hex is not None and der_prefix_str is not None:
475            # use root fp, and true full path
476            fingerprint_bytes = bfh(fingerprint_hex)
477            der_prefix_ints = convert_bip32_path_to_list_of_uint32(der_prefix_str)
478        else:
479            # use intermediate fp, and claim der suffix is the full path
480            fingerprint_bytes = self.get_bip32_node_for_xpub().calc_fingerprint_of_this_node()
481            der_prefix_ints = convert_bip32_path_to_list_of_uint32('m')
482        der_full = der_prefix_ints + list(der_suffix)
483        return fingerprint_bytes, der_full
484
485    def get_xpub_to_be_used_in_partial_tx(self, *, only_der_suffix: bool) -> str:
486        assert self.xpub
487        fp_bytes, der_full = self.get_fp_and_derivation_to_be_used_in_partial_tx(der_suffix=[],
488                                                                                 only_der_suffix=only_der_suffix)
489        bip32node = self.get_bip32_node_for_xpub()
490        depth = len(der_full)
491        child_number_int = der_full[-1] if len(der_full) >= 1 else 0
492        child_number_bytes = child_number_int.to_bytes(length=4, byteorder="big")
493        fingerprint = bytes(4) if depth == 0 else bip32node.fingerprint
494        bip32node = bip32node._replace(depth=depth,
495                                       fingerprint=fingerprint,
496                                       child_number=child_number_bytes)
497        return bip32node.to_xpub()
498
499    def add_key_origin_from_root_node(self, *, derivation_prefix: str, root_node: BIP32Node):
500        assert self.xpub
501        # try to derive ourselves from what we were given
502        child_node1 = root_node.subkey_at_private_derivation(derivation_prefix)
503        child_pubkey_bytes1 = child_node1.eckey.get_public_key_bytes(compressed=True)
504        child_node2 = self.get_bip32_node_for_xpub()
505        child_pubkey_bytes2 = child_node2.eckey.get_public_key_bytes(compressed=True)
506        if child_pubkey_bytes1 != child_pubkey_bytes2:
507            raise Exception("(xpub, derivation_prefix, root_node) inconsistency")
508        self.add_key_origin(derivation_prefix=derivation_prefix,
509                            root_fingerprint=root_node.calc_fingerprint_of_this_node().hex().lower())
510
511    def add_key_origin(self, *, derivation_prefix: str = None, root_fingerprint: str = None) -> None:
512        assert self.xpub
513        if not (root_fingerprint is None or (is_hex_str(root_fingerprint) and len(root_fingerprint) == 8)):
514            raise Exception("root fp must be 8 hex characters")
515        derivation_prefix = normalize_bip32_derivation(derivation_prefix)
516        if not is_xkey_consistent_with_key_origin_info(self.xpub,
517                                                       derivation_prefix=derivation_prefix,
518                                                       root_fingerprint=root_fingerprint):
519            raise Exception("xpub inconsistent with provided key origin info")
520        if root_fingerprint is not None:
521            self._root_fingerprint = root_fingerprint
522        if derivation_prefix is not None:
523            self._derivation_prefix = derivation_prefix
524        self.is_requesting_to_be_rewritten_to_wallet_file = True
525
526    @lru_cache(maxsize=None)
527    def derive_pubkey(self, for_change: int, n: int) -> bytes:
528        for_change = int(for_change)
529        if for_change not in (0, 1):
530            raise CannotDerivePubkey("forbidden path")
531        xpub = self.xpub_change if for_change else self.xpub_receive
532        if xpub is None:
533            rootnode = self.get_bip32_node_for_xpub()
534            xpub = rootnode.subkey_at_public_derivation((for_change,)).to_xpub()
535            if for_change:
536                self.xpub_change = xpub
537            else:
538                self.xpub_receive = xpub
539        return self.get_pubkey_from_xpub(xpub, (n,))
540
541    @classmethod
542    def get_pubkey_from_xpub(self, xpub: str, sequence) -> bytes:
543        node = BIP32Node.from_xkey(xpub).subkey_at_public_derivation(sequence)
544        return node.eckey.get_public_key_bytes(compressed=True)
545
546
547class BIP32_KeyStore(Xpub, Deterministic_KeyStore):
548
549    type = 'bip32'
550
551    def __init__(self, d):
552        Xpub.__init__(self, derivation_prefix=d.get('derivation'), root_fingerprint=d.get('root_fingerprint'))
553        Deterministic_KeyStore.__init__(self, d)
554        self.xpub = d.get('xpub')
555        self.xprv = d.get('xprv')
556
557    def format_seed(self, seed):
558        return ' '.join(seed.split())
559
560    def dump(self):
561        d = Deterministic_KeyStore.dump(self)
562        d['xpub'] = self.xpub
563        d['xprv'] = self.xprv
564        d['derivation'] = self.get_derivation_prefix()
565        d['root_fingerprint'] = self.get_root_fingerprint()
566        return d
567
568    def get_master_private_key(self, password):
569        return pw_decode(self.xprv, password, version=self.pw_hash_version)
570
571    def check_password(self, password):
572        xprv = pw_decode(self.xprv, password, version=self.pw_hash_version)
573        try:
574            bip32node = BIP32Node.from_xkey(xprv)
575        except BaseDecodeError as e:
576            raise InvalidPassword() from e
577        if bip32node.chaincode != self.get_bip32_node_for_xpub().chaincode:
578            raise InvalidPassword()
579
580    def update_password(self, old_password, new_password):
581        self.check_password(old_password)
582        if new_password == '':
583            new_password = None
584        if self.has_seed():
585            decoded = self.get_seed(old_password)
586            self.seed = pw_encode(decoded, new_password, version=PW_HASH_VERSION_LATEST)
587        if self.passphrase:
588            decoded = self.get_passphrase(old_password)
589            self.passphrase = pw_encode(decoded, new_password, version=PW_HASH_VERSION_LATEST)
590        if self.xprv is not None:
591            b = pw_decode(self.xprv, old_password, version=self.pw_hash_version)
592            self.xprv = pw_encode(b, new_password, version=PW_HASH_VERSION_LATEST)
593        self.pw_hash_version = PW_HASH_VERSION_LATEST
594
595    def is_watching_only(self):
596        return self.xprv is None
597
598    def add_xpub(self, xpub):
599        assert is_xpub(xpub)
600        self.xpub = xpub
601        root_fingerprint, derivation_prefix = bip32.root_fp_and_der_prefix_from_xkey(xpub)
602        self.add_key_origin(derivation_prefix=derivation_prefix, root_fingerprint=root_fingerprint)
603
604    def add_xprv(self, xprv):
605        assert is_xprv(xprv)
606        self.xprv = xprv
607        self.add_xpub(bip32.xpub_from_xprv(xprv))
608
609    def add_xprv_from_seed(self, bip32_seed, xtype, derivation):
610        rootnode = BIP32Node.from_rootseed(bip32_seed, xtype=xtype)
611        node = rootnode.subkey_at_private_derivation(derivation)
612        self.add_xprv(node.to_xprv())
613        self.add_key_origin_from_root_node(derivation_prefix=derivation, root_node=rootnode)
614
615    def get_private_key(self, sequence: Sequence[int], password):
616        xprv = self.get_master_private_key(password)
617        node = BIP32Node.from_xkey(xprv).subkey_at_private_derivation(sequence)
618        pk = node.eckey.get_secret_bytes()
619        return pk, True
620
621    def get_keypair(self, sequence, password):
622        k, _ = self.get_private_key(sequence, password)
623        cK = ecc.ECPrivkey(k).get_public_key_bytes()
624        return cK, k
625
626    def can_have_deterministic_lightning_xprv(self):
627        if (self.get_seed_type() == 'segwit'
628                and self.get_bip32_node_for_xpub().xtype == 'p2wpkh'):
629            return True
630        return False
631
632    def get_lightning_xprv(self, password) -> str:
633        assert self.can_have_deterministic_lightning_xprv()
634        xprv = self.get_master_private_key(password)
635        rootnode = BIP32Node.from_xkey(xprv)
636        node = rootnode.subkey_at_private_derivation("m/67'/")
637        return node.to_xprv()
638
639class Old_KeyStore(MasterPublicKeyMixin, Deterministic_KeyStore):
640
641    type = 'old'
642
643    def __init__(self, d):
644        Deterministic_KeyStore.__init__(self, d)
645        self.mpk = d.get('mpk')
646        self._root_fingerprint = None
647
648    def get_hex_seed(self, password):
649        return pw_decode(self.seed, password, version=self.pw_hash_version).encode('utf8')
650
651    def dump(self):
652        d = Deterministic_KeyStore.dump(self)
653        d['mpk'] = self.mpk
654        return d
655
656    def add_seed(self, seedphrase):
657        Deterministic_KeyStore.add_seed(self, seedphrase)
658        s = self.get_hex_seed(None)
659        self.mpk = self.mpk_from_seed(s)
660
661    def add_master_public_key(self, mpk):
662        self.mpk = mpk
663
664    def format_seed(self, seed):
665        from . import old_mnemonic, mnemonic
666        seed = mnemonic.normalize_text(seed)
667        # see if seed was entered as hex
668        if seed:
669            try:
670                bfh(seed)
671                return str(seed)
672            except Exception:
673                pass
674        words = seed.split()
675        seed = old_mnemonic.mn_decode(words)
676        if not seed:
677            raise Exception("Invalid seed")
678        return seed
679
680    def get_seed(self, password):
681        from . import old_mnemonic
682        s = self.get_hex_seed(password)
683        return ' '.join(old_mnemonic.mn_encode(s))
684
685    @classmethod
686    def mpk_from_seed(klass, seed):
687        secexp = klass.stretch_key(seed)
688        privkey = ecc.ECPrivkey.from_secret_scalar(secexp)
689        return privkey.get_public_key_hex(compressed=False)[2:]
690
691    @classmethod
692    def stretch_key(self, seed):
693        x = seed
694        for i in range(100000):
695            x = hashlib.sha256(x + seed).digest()
696        return string_to_number(x)
697
698    @classmethod
699    def get_sequence(self, mpk, for_change, n):
700        return string_to_number(sha256d(("%d:%d:"%(n, for_change)).encode('ascii') + bfh(mpk)))
701
702    @classmethod
703    def get_pubkey_from_mpk(cls, mpk, for_change, n) -> bytes:
704        z = cls.get_sequence(mpk, for_change, n)
705        master_public_key = ecc.ECPubkey(bfh('04'+mpk))
706        public_key = master_public_key + z*ecc.GENERATOR
707        return public_key.get_public_key_bytes(compressed=False)
708
709    @lru_cache(maxsize=None)
710    def derive_pubkey(self, for_change, n) -> bytes:
711        for_change = int(for_change)
712        if for_change not in (0, 1):
713            raise CannotDerivePubkey("forbidden path")
714        return self.get_pubkey_from_mpk(self.mpk, for_change, n)
715
716    def _get_private_key_from_stretched_exponent(self, for_change, n, secexp):
717        secexp = (secexp + self.get_sequence(self.mpk, for_change, n)) % ecc.CURVE_ORDER
718        pk = int.to_bytes(secexp, length=32, byteorder='big', signed=False)
719        return pk
720
721    def get_private_key(self, sequence: Sequence[int], password):
722        seed = self.get_hex_seed(password)
723        secexp = self.stretch_key(seed)
724        self._check_seed(seed, secexp=secexp)
725        for_change, n = sequence
726        pk = self._get_private_key_from_stretched_exponent(for_change, n, secexp)
727        return pk, False
728
729    def _check_seed(self, seed, *, secexp=None):
730        if secexp is None:
731            secexp = self.stretch_key(seed)
732        master_private_key = ecc.ECPrivkey.from_secret_scalar(secexp)
733        master_public_key = master_private_key.get_public_key_bytes(compressed=False)[1:]
734        if master_public_key != bfh(self.mpk):
735            raise InvalidPassword()
736
737    def check_password(self, password):
738        seed = self.get_hex_seed(password)
739        self._check_seed(seed)
740
741    def get_master_public_key(self):
742        return self.mpk
743
744    def get_derivation_prefix(self) -> str:
745        return 'm'
746
747    def get_root_fingerprint(self) -> str:
748        if self._root_fingerprint is None:
749            master_public_key = ecc.ECPubkey(bfh('04'+self.mpk))
750            xfp = hash_160(master_public_key.get_public_key_bytes(compressed=True))[0:4]
751            self._root_fingerprint = xfp.hex().lower()
752        return self._root_fingerprint
753
754    def get_fp_and_derivation_to_be_used_in_partial_tx(
755            self,
756            der_suffix: Sequence[int],
757            *,
758            only_der_suffix: bool,
759    ) -> Tuple[bytes, Sequence[int]]:
760        fingerprint_hex = self.get_root_fingerprint()
761        der_prefix_str = self.get_derivation_prefix()
762        fingerprint_bytes = bfh(fingerprint_hex)
763        der_prefix_ints = convert_bip32_path_to_list_of_uint32(der_prefix_str)
764        der_full = der_prefix_ints + list(der_suffix)
765        return fingerprint_bytes, der_full
766
767    def update_password(self, old_password, new_password):
768        self.check_password(old_password)
769        if new_password == '':
770            new_password = None
771        if self.has_seed():
772            decoded = pw_decode(self.seed, old_password, version=self.pw_hash_version)
773            self.seed = pw_encode(decoded, new_password, version=PW_HASH_VERSION_LATEST)
774        self.pw_hash_version = PW_HASH_VERSION_LATEST
775
776
777class Hardware_KeyStore(Xpub, KeyStore):
778    hw_type: str
779    device: str
780    plugin: 'HW_PluginBase'
781    thread: Optional['TaskThread'] = None
782
783    type = 'hardware'
784
785    def __init__(self, d):
786        Xpub.__init__(self, derivation_prefix=d.get('derivation'), root_fingerprint=d.get('root_fingerprint'))
787        KeyStore.__init__(self)
788        # Errors and other user interaction is done through the wallet's
789        # handler.  The handler is per-window and preserved across
790        # device reconnects
791        self.xpub = d.get('xpub')
792        self.label = d.get('label')
793        self.soft_device_id = d.get('soft_device_id')  # type: Optional[str]
794        self.handler = None  # type: Optional[HardwareHandlerBase]
795        run_hook('init_keystore', self)
796
797    def set_label(self, label):
798        self.label = label
799
800    def may_have_password(self):
801        return False
802
803    def is_deterministic(self):
804        return True
805
806    def get_type_text(self) -> str:
807        return f'hw[{self.hw_type}]'
808
809    def dump(self):
810        return {
811            'type': self.type,
812            'hw_type': self.hw_type,
813            'xpub': self.xpub,
814            'derivation': self.get_derivation_prefix(),
815            'root_fingerprint': self.get_root_fingerprint(),
816            'label':self.label,
817            'soft_device_id': self.soft_device_id,
818        }
819
820    def unpaired(self):
821        '''A device paired with the wallet was disconnected.  This can be
822        called in any thread context.'''
823        self.logger.info("unpaired")
824
825    def paired(self):
826        '''A device paired with the wallet was (re-)connected.  This can be
827        called in any thread context.'''
828        self.logger.info("paired")
829
830    def is_watching_only(self):
831        '''The wallet is not watching-only; the user will be prompted for
832        pin and passphrase as appropriate when needed.'''
833        assert not self.has_seed()
834        return False
835
836    def get_password_for_storage_encryption(self) -> str:
837        client = self.plugin.get_client(self)
838        return client.get_password_for_storage_encryption()
839
840    def has_usable_connection_with_device(self) -> bool:
841        if not hasattr(self, 'plugin'):
842            return False
843        client = self.plugin.get_client(self, force_pair=False)
844        if client is None:
845            return False
846        return client.has_usable_connection_with_device()
847
848    def ready_to_sign(self):
849        return super().ready_to_sign() and self.has_usable_connection_with_device()
850
851    def opportunistically_fill_in_missing_info_from_device(self, client: 'HardwareClientBase'):
852        assert client is not None
853        if self._root_fingerprint is None:
854            self._root_fingerprint = client.request_root_fingerprint_from_device()
855            self.is_requesting_to_be_rewritten_to_wallet_file = True
856        if self.label != client.label():
857            self.label = client.label()
858            self.is_requesting_to_be_rewritten_to_wallet_file = True
859        if self.soft_device_id != client.get_soft_device_id():
860            self.soft_device_id = client.get_soft_device_id()
861            self.is_requesting_to_be_rewritten_to_wallet_file = True
862
863
864KeyStoreWithMPK = Union[KeyStore, MasterPublicKeyMixin]  # intersection really...
865AddressIndexGeneric = Union[Sequence[int], str]  # can be hex pubkey str
866
867
868def bip39_normalize_passphrase(passphrase):
869    return normalize('NFKD', passphrase or '')
870
871def bip39_to_seed(mnemonic, passphrase):
872    import hashlib, hmac
873    PBKDF2_ROUNDS = 2048
874    mnemonic = normalize('NFKD', ' '.join(mnemonic.split()))
875    passphrase = bip39_normalize_passphrase(passphrase)
876    return hashlib.pbkdf2_hmac('sha512', mnemonic.encode('utf-8'),
877        b'mnemonic' + passphrase.encode('utf-8'), iterations = PBKDF2_ROUNDS)
878
879
880def bip39_is_checksum_valid(
881        mnemonic: str,
882        *,
883        wordlist: Wordlist = None,
884) -> Tuple[bool, bool]:
885    """Test checksum of bip39 mnemonic assuming English wordlist.
886    Returns tuple (is_checksum_valid, is_wordlist_valid)
887    """
888    words = [normalize('NFKD', word) for word in mnemonic.split()]
889    words_len = len(words)
890    if wordlist is None:
891        wordlist = Wordlist.from_file("english.txt")
892    n = len(wordlist)
893    i = 0
894    words.reverse()
895    while words:
896        w = words.pop()
897        try:
898            k = wordlist.index(w)
899        except ValueError:
900            return False, False
901        i = i*n + k
902    if words_len not in [12, 15, 18, 21, 24]:
903        return False, True
904    checksum_length = 11 * words_len // 33  # num bits
905    entropy_length = 32 * checksum_length  # num bits
906    entropy = i >> checksum_length
907    checksum = i % 2**checksum_length
908    entropy_bytes = int.to_bytes(entropy, length=entropy_length//8, byteorder="big")
909    hashed = int.from_bytes(sha256(entropy_bytes), byteorder="big")
910    calculated_checksum = hashed >> (256 - checksum_length)
911    return checksum == calculated_checksum, True
912
913
914def from_bip43_rootseed(root_seed, derivation, xtype=None):
915    k = BIP32_KeyStore({})
916    if xtype is None:
917        xtype = xtype_from_derivation(derivation)
918    k.add_xprv_from_seed(root_seed, xtype, derivation)
919    return k
920
921
922PURPOSE48_SCRIPT_TYPES = {
923    'p2wsh-p2sh': 1,  # specifically multisig
924    'p2wsh': 2,       # specifically multisig
925}
926PURPOSE48_SCRIPT_TYPES_INV = inv_dict(PURPOSE48_SCRIPT_TYPES)
927
928
929def xtype_from_derivation(derivation: str) -> str:
930    """Returns the script type to be used for this derivation."""
931    bip32_indices = convert_bip32_path_to_list_of_uint32(derivation)
932    if len(bip32_indices) >= 1:
933        if bip32_indices[0] == 84 + BIP32_PRIME:
934            return 'p2wpkh'
935        elif bip32_indices[0] == 49 + BIP32_PRIME:
936            return 'p2wpkh-p2sh'
937        elif bip32_indices[0] == 44 + BIP32_PRIME:
938            return 'standard'
939        elif bip32_indices[0] == 45 + BIP32_PRIME:
940            return 'standard'
941
942    if len(bip32_indices) >= 4:
943        if bip32_indices[0] == 48 + BIP32_PRIME:
944            # m / purpose' / coin_type' / account' / script_type' / change / address_index
945            script_type_int = bip32_indices[3] - BIP32_PRIME
946            script_type = PURPOSE48_SCRIPT_TYPES_INV.get(script_type_int)
947            if script_type is not None:
948                return script_type
949    return 'standard'
950
951
952hw_keystores = {}
953
954def register_keystore(hw_type, constructor):
955    hw_keystores[hw_type] = constructor
956
957def hardware_keystore(d) -> Hardware_KeyStore:
958    hw_type = d['hw_type']
959    if hw_type in hw_keystores:
960        constructor = hw_keystores[hw_type]
961        return constructor(d)
962    raise WalletFileException(f'unknown hardware type: {hw_type}. '
963                              f'hw_keystores: {list(hw_keystores)}')
964
965def load_keystore(db: 'WalletDB', name: str) -> KeyStore:
966    d = db.get(name, {})
967    t = d.get('type')
968    if not t:
969        raise WalletFileException(
970            'Wallet format requires update.\n'
971            'Cannot find keystore for name {}'.format(name))
972    keystore_constructors = {ks.type: ks for ks in [Old_KeyStore, Imported_KeyStore, BIP32_KeyStore]}
973    keystore_constructors['hardware'] = hardware_keystore
974    try:
975        ks_constructor = keystore_constructors[t]
976    except KeyError:
977        raise WalletFileException(f'Unknown type {t} for keystore named {name}')
978    k = ks_constructor(d)
979    return k
980
981
982def is_old_mpk(mpk: str) -> bool:
983    try:
984        int(mpk, 16)  # test if hex string
985    except:
986        return False
987    if len(mpk) != 128:
988        return False
989    try:
990        ecc.ECPubkey(bfh('04' + mpk))
991    except:
992        return False
993    return True
994
995
996def is_address_list(text):
997    parts = text.split()
998    return bool(parts) and all(bitcoin.is_address(x) for x in parts)
999
1000
1001def get_private_keys(text, *, allow_spaces_inside_key=True, raise_on_error=False):
1002    if allow_spaces_inside_key:  # see #1612
1003        parts = text.split('\n')
1004        parts = map(lambda x: ''.join(x.split()), parts)
1005        parts = list(filter(bool, parts))
1006    else:
1007        parts = text.split()
1008    if bool(parts) and all(bitcoin.is_private_key(x, raise_on_error=raise_on_error) for x in parts):
1009        return parts
1010
1011
1012def is_private_key_list(text, *, allow_spaces_inside_key=True, raise_on_error=False):
1013    return bool(get_private_keys(text,
1014                                 allow_spaces_inside_key=allow_spaces_inside_key,
1015                                 raise_on_error=raise_on_error))
1016
1017
1018def is_master_key(x):
1019    return is_old_mpk(x) or is_bip32_key(x)
1020
1021
1022def is_bip32_key(x):
1023    return is_xprv(x) or is_xpub(x)
1024
1025
1026def bip44_derivation(account_id, bip43_purpose=44):
1027    coin = constants.net.BIP44_COIN_TYPE
1028    der = "m/%d'/%d'/%d'" % (bip43_purpose, coin, int(account_id))
1029    return normalize_bip32_derivation(der)
1030
1031
1032def purpose48_derivation(account_id: int, xtype: str) -> str:
1033    # m / purpose' / coin_type' / account' / script_type' / change / address_index
1034    bip43_purpose = 48
1035    coin = constants.net.BIP44_COIN_TYPE
1036    account_id = int(account_id)
1037    script_type_int = PURPOSE48_SCRIPT_TYPES.get(xtype)
1038    if script_type_int is None:
1039        raise Exception('unknown xtype: {}'.format(xtype))
1040    der = "m/%d'/%d'/%d'/%d'" % (bip43_purpose, coin, account_id, script_type_int)
1041    return normalize_bip32_derivation(der)
1042
1043
1044def from_seed(seed, passphrase, is_p2sh=False):
1045    t = seed_type(seed)
1046    if t == 'old':
1047        keystore = Old_KeyStore({})
1048        keystore.add_seed(seed)
1049    elif t in ['standard', 'segwit']:
1050        keystore = BIP32_KeyStore({})
1051        keystore.add_seed(seed)
1052        keystore.passphrase = passphrase
1053        bip32_seed = Mnemonic.mnemonic_to_seed(seed, passphrase)
1054        if t == 'standard':
1055            der = "m/"
1056            xtype = 'standard'
1057        else:
1058            der = "m/1'/" if is_p2sh else "m/0'/"
1059            xtype = 'p2wsh' if is_p2sh else 'p2wpkh'
1060        keystore.add_xprv_from_seed(bip32_seed, xtype, der)
1061    else:
1062        raise BitcoinException('Unexpected seed type {}'.format(repr(t)))
1063    return keystore
1064
1065def from_private_key_list(text):
1066    keystore = Imported_KeyStore({})
1067    for x in get_private_keys(text):
1068        keystore.import_privkey(x, None)
1069    return keystore
1070
1071def from_old_mpk(mpk):
1072    keystore = Old_KeyStore({})
1073    keystore.add_master_public_key(mpk)
1074    return keystore
1075
1076def from_xpub(xpub):
1077    k = BIP32_KeyStore({})
1078    k.add_xpub(xpub)
1079    return k
1080
1081def from_xprv(xprv):
1082    k = BIP32_KeyStore({})
1083    k.add_xprv(xprv)
1084    return k
1085
1086def from_master_key(text):
1087    if is_xprv(text):
1088        k = from_xprv(text)
1089    elif is_old_mpk(text):
1090        k = from_old_mpk(text)
1091    elif is_xpub(text):
1092        k = from_xpub(text)
1093    else:
1094        raise BitcoinException('Invalid master key')
1095    return k
1096