1#!/usr/local/bin/python3.8
2# -*- mode: python -*-
3#
4# Electrum - lightweight Bitcoin client
5# Copyright (C) 2016  The Electrum developers
6#
7# Permission is hereby granted, free of charge, to any person
8# obtaining a copy of this software and associated documentation files
9# (the "Software"), to deal in the Software without restriction,
10# including without limitation the rights to use, copy, modify, merge,
11# publish, distribute, sublicense, and/or sell copies of the Software,
12# and to permit persons to whom the Software is furnished to do so,
13# subject to the following conditions:
14#
15# The above copyright notice and this permission notice shall be
16# included in all copies or substantial portions of the Software.
17#
18# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
19# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
20# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
21# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
22# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
23# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
24# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25# SOFTWARE.
26
27from typing import TYPE_CHECKING, Dict, List, Union, Tuple, Sequence, Optional, Type, Iterable, Any
28from functools import partial
29
30from electrum.plugin import (BasePlugin, hook, Device, DeviceMgr, DeviceInfo,
31                             assert_runs_in_hwd_thread, runs_in_hwd_thread)
32from electrum.i18n import _
33from electrum.bitcoin import is_address, opcodes
34from electrum.util import bfh, versiontuple, UserFacingException
35from electrum.transaction import TxOutput, Transaction, PartialTransaction, PartialTxInput, PartialTxOutput
36from electrum.bip32 import BIP32Node
37from electrum.storage import get_derivation_used_for_hw_device_encryption
38from electrum.keystore import Xpub, Hardware_KeyStore
39
40if TYPE_CHECKING:
41    import threading
42    from electrum.wallet import Abstract_Wallet
43    from electrum.base_wizard import BaseWizard
44
45
46class HW_PluginBase(BasePlugin):
47    keystore_class: Type['Hardware_KeyStore']
48    libraries_available: bool
49
50    # define supported library versions:  minimum_library <= x < maximum_library
51    minimum_library = (0,)
52    maximum_library = (float('inf'),)
53
54    DEVICE_IDS: Iterable[Any]
55
56    def __init__(self, parent, config, name):
57        BasePlugin.__init__(self, parent, config, name)
58        self.device = self.keystore_class.device
59        self.keystore_class.plugin = self
60        self._ignore_outdated_fw = False
61
62    def is_enabled(self):
63        return True
64
65    def device_manager(self) -> 'DeviceMgr':
66        return self.parent.device_manager
67
68    def create_device_from_hid_enumeration(self, d: dict, *, product_key) -> Optional['Device']:
69        # Older versions of hid don't provide interface_number
70        interface_number = d.get('interface_number', -1)
71        usage_page = d['usage_page']
72        id_ = d['serial_number']
73        if len(id_) == 0:
74            id_ = str(d['path'])
75        id_ += str(interface_number) + str(usage_page)
76        device = Device(path=d['path'],
77                        interface_number=interface_number,
78                        id_=id_,
79                        product_key=product_key,
80                        usage_page=usage_page,
81                        transport_ui_string='hid')
82        return device
83
84    @hook
85    def close_wallet(self, wallet: 'Abstract_Wallet'):
86        for keystore in wallet.get_keystores():
87            if isinstance(keystore, self.keystore_class):
88                self.device_manager().unpair_xpub(keystore.xpub)
89                if keystore.thread:
90                    keystore.thread.stop()
91
92    def scan_and_create_client_for_device(self, *, device_id: str, wizard: 'BaseWizard') -> 'HardwareClientBase':
93        devmgr = self.device_manager()
94        client = wizard.run_task_without_blocking_gui(
95            task=partial(devmgr.client_by_id, device_id))
96        if client is None:
97            raise UserFacingException(_('Failed to create a client for this device.') + '\n' +
98                                      _('Make sure it is in the correct state.'))
99        client.handler = self.create_handler(wizard)
100        return client
101
102    def setup_device(self, device_info: DeviceInfo, wizard: 'BaseWizard', purpose) -> 'HardwareClientBase':
103        """Called when creating a new wallet or when using the device to decrypt
104        an existing wallet. Select the device to use.  If the device is
105        uninitialized, go through the initialization process.
106
107        Runs in GUI thread.
108        """
109        raise NotImplementedError()
110
111    def get_client(self, keystore: 'Hardware_KeyStore', force_pair: bool = True, *,
112                   devices: Sequence['Device'] = None,
113                   allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
114        devmgr = self.device_manager()
115        handler = keystore.handler
116        client = devmgr.client_for_keystore(self, handler, keystore, force_pair,
117                                            devices=devices,
118                                            allow_user_interaction=allow_user_interaction)
119        return client
120
121    def show_address(self, wallet: 'Abstract_Wallet', address, keystore: 'Hardware_KeyStore' = None):
122        pass  # implemented in child classes
123
124    def show_address_helper(self, wallet, address, keystore=None):
125        if keystore is None:
126            keystore = wallet.get_keystore()
127        if not is_address(address):
128            keystore.handler.show_error(_('Invalid Bitcoin Address'))
129            return False
130        if not wallet.is_mine(address):
131            keystore.handler.show_error(_('Address not in wallet.'))
132            return False
133        if type(keystore) != self.keystore_class:
134            return False
135        return True
136
137    def get_library_version(self) -> str:
138        """Returns the version of the 3rd party python library
139        for the hw wallet. For example '0.9.0'
140
141        Returns 'unknown' if library is found but cannot determine version.
142        Raises 'ImportError' if library is not found.
143        Raises 'LibraryFoundButUnusable' if found but there was some problem (includes version num).
144        """
145        raise NotImplementedError()
146
147    def check_libraries_available(self) -> bool:
148        def version_str(t):
149            return ".".join(str(i) for i in t)
150
151        try:
152            # this might raise ImportError or LibraryFoundButUnusable
153            library_version = self.get_library_version()
154            # if no exception so far, we might still raise LibraryFoundButUnusable
155            if (library_version == 'unknown'
156                    or versiontuple(library_version) < self.minimum_library
157                    or versiontuple(library_version) >= self.maximum_library):
158                raise LibraryFoundButUnusable(library_version=library_version)
159        except ImportError:
160            return False
161        except LibraryFoundButUnusable as e:
162            library_version = e.library_version
163            self.libraries_available_message = (
164                    _("Library version for '{}' is incompatible.").format(self.name)
165                    + '\nInstalled: {}, Needed: {} <= x < {}'
166                    .format(library_version, version_str(self.minimum_library), version_str(self.maximum_library)))
167            self.logger.warning(self.libraries_available_message)
168            return False
169
170        return True
171
172    def get_library_not_available_message(self) -> str:
173        if hasattr(self, 'libraries_available_message'):
174            message = self.libraries_available_message
175        else:
176            message = _("Missing libraries for {}.").format(self.name)
177        message += '\n' + _("Make sure you install it with python3")
178        return message
179
180    def set_ignore_outdated_fw(self):
181        self._ignore_outdated_fw = True
182
183    def is_outdated_fw_ignored(self) -> bool:
184        return self._ignore_outdated_fw
185
186    def create_client(self, device: 'Device',
187                      handler: Optional['HardwareHandlerBase']) -> Optional['HardwareClientBase']:
188        raise NotImplementedError()
189
190    def get_xpub(self, device_id: str, derivation: str, xtype, wizard: 'BaseWizard') -> str:
191        raise NotImplementedError()
192
193    def create_handler(self, window) -> 'HardwareHandlerBase':
194        # note: in Qt GUI, 'window' is either an ElectrumWindow or an InstallWizard
195        raise NotImplementedError()
196
197    def can_recognize_device(self, device: Device) -> bool:
198        """Whether the plugin thinks it can handle the given device.
199        Used for filtering all connected hardware devices to only those by this vendor.
200        """
201        return device.product_key in self.DEVICE_IDS
202
203
204class HardwareClientBase:
205
206    handler = None  # type: Optional['HardwareHandlerBase']
207
208    def __init__(self, *, plugin: 'HW_PluginBase'):
209        assert_runs_in_hwd_thread()
210        self.plugin = plugin
211
212    def device_manager(self) -> 'DeviceMgr':
213        return self.plugin.device_manager()
214
215    def is_pairable(self) -> bool:
216        raise NotImplementedError()
217
218    def close(self):
219        raise NotImplementedError()
220
221    def timeout(self, cutoff) -> None:
222        pass
223
224    def is_initialized(self) -> bool:
225        """True if initialized, False if wiped."""
226        raise NotImplementedError()
227
228    def label(self) -> Optional[str]:
229        """The name given by the user to the device.
230
231        Note: labels are shown to the user to help distinguish their devices,
232        and they are also used as a fallback to distinguish devices programmatically.
233        So ideally, different devices would have different labels.
234        """
235        # When returning a constant here (i.e. not implementing the method in the way
236        # it is supposed to work), make sure the return value is in electrum.plugin.PLACEHOLDER_HW_CLIENT_LABELS
237        return " "
238
239    def get_soft_device_id(self) -> Optional[str]:
240        """An id-like string that is used to distinguish devices programmatically.
241        This is a long term id for the device, that does not change between reconnects.
242        This method should not prompt the user, i.e. no user interaction, as it is used
243        during USB device enumeration (called for each unpaired device).
244        Stored in the wallet file.
245        """
246        # This functionality is optional. If not implemented just return None:
247        return None
248
249    def has_usable_connection_with_device(self) -> bool:
250        raise NotImplementedError()
251
252    def get_xpub(self, bip32_path: str, xtype) -> str:
253        raise NotImplementedError()
254
255    @runs_in_hwd_thread
256    def request_root_fingerprint_from_device(self) -> str:
257        # digitalbitbox (at least) does not reveal xpubs corresponding to unhardened paths
258        # so ask for a direct child, and read out fingerprint from that:
259        child_of_root_xpub = self.get_xpub("m/0'", xtype='standard')
260        root_fingerprint = BIP32Node.from_xkey(child_of_root_xpub).fingerprint.hex().lower()
261        return root_fingerprint
262
263    @runs_in_hwd_thread
264    def get_password_for_storage_encryption(self) -> str:
265        # note: using a different password based on hw device type is highly undesirable! see #5993
266        derivation = get_derivation_used_for_hw_device_encryption()
267        xpub = self.get_xpub(derivation, "standard")
268        password = Xpub.get_pubkey_from_xpub(xpub, ()).hex()
269        return password
270
271    def device_model_name(self) -> Optional[str]:
272        """Return the name of the model of this device, which might be displayed in the UI.
273        E.g. for Trezor, "Trezor One" or "Trezor T".
274        """
275        return None
276
277    def manipulate_keystore_dict_during_wizard_setup(self, d: dict) -> None:
278        """Called during wallet creation in the wizard, before the keystore
279        is constructed for the first time. 'd' is the dict that will be
280        passed to the keystore constructor.
281        """
282        pass
283
284
285class HardwareHandlerBase:
286    """An interface between the GUI and the device handling logic for handling I/O."""
287    win = None
288    device: str
289
290    def get_wallet(self) -> Optional['Abstract_Wallet']:
291        if self.win is not None:
292            if hasattr(self.win, 'wallet'):
293                return self.win.wallet
294
295    def get_gui_thread(self) -> Optional['threading.Thread']:
296        if self.win is not None:
297            if hasattr(self.win, 'gui_thread'):
298                return self.win.gui_thread
299
300    def update_status(self, paired: bool) -> None:
301        pass
302
303    def query_choice(self, msg: str, labels: Sequence[str]) -> Optional[int]:
304        raise NotImplementedError()
305
306    def yes_no_question(self, msg: str) -> bool:
307        raise NotImplementedError()
308
309    def show_message(self, msg: str, on_cancel=None) -> None:
310        raise NotImplementedError()
311
312    def show_error(self, msg: str, blocking: bool = False) -> None:
313        raise NotImplementedError()
314
315    def finished(self) -> None:
316        pass
317
318    def get_word(self, msg: str) -> str:
319        raise NotImplementedError()
320
321    def get_passphrase(self, msg: str, confirm: bool) -> Optional[str]:
322        raise NotImplementedError()
323
324    def get_pin(self, msg: str, *, show_strength: bool = True) -> str:
325        raise NotImplementedError()
326
327
328def is_any_tx_output_on_change_branch(tx: PartialTransaction) -> bool:
329    return any([txout.is_change for txout in tx.outputs()])
330
331
332def trezor_validate_op_return_output_and_get_data(output: TxOutput) -> bytes:
333    validate_op_return_output(output)
334    script = output.scriptpubkey
335    if not (script[0] == opcodes.OP_RETURN and
336            script[1] == len(script) - 2 and script[1] <= 75):
337        raise UserFacingException(_("Only OP_RETURN scripts, with one constant push, are supported."))
338    return script[2:]
339
340
341def validate_op_return_output(output: TxOutput, *, max_size: int = None) -> None:
342    script = output.scriptpubkey
343    if script[0] != opcodes.OP_RETURN:
344        raise UserFacingException(_("Only OP_RETURN scripts are supported."))
345    if max_size is not None and len(script) > max_size:
346        raise UserFacingException(_("OP_RETURN payload too large." + "\n"
347                                  + f"(scriptpubkey size {len(script)} > {max_size})"))
348    if output.value != 0:
349        raise UserFacingException(_("Amount for OP_RETURN output must be zero."))
350
351
352def get_xpubs_and_der_suffixes_from_txinout(tx: PartialTransaction,
353                                            txinout: Union[PartialTxInput, PartialTxOutput]) \
354        -> List[Tuple[str, List[int]]]:
355    xfp_to_xpub_map = {xfp: bip32node for bip32node, (xfp, path)
356                       in tx.xpubs.items()}  # type: Dict[bytes, BIP32Node]
357    xfps = [txinout.bip32_paths[pubkey][0] for pubkey in txinout.pubkeys]
358    try:
359        xpubs = [xfp_to_xpub_map[xfp] for xfp in xfps]
360    except KeyError as e:
361        raise Exception(f"Partial transaction is missing global xpub for "
362                        f"fingerprint ({str(e)}) in input/output") from e
363    xpubs_and_deriv_suffixes = []
364    for bip32node, pubkey in zip(xpubs, txinout.pubkeys):
365        xfp, path = txinout.bip32_paths[pubkey]
366        der_suffix = list(path)[bip32node.depth:]
367        xpubs_and_deriv_suffixes.append((bip32node.to_xpub(), der_suffix))
368    return xpubs_and_deriv_suffixes
369
370
371def only_hook_if_libraries_available(func):
372    # note: this decorator must wrap @hook, not the other way around,
373    # as 'hook' uses the name of the function it wraps
374    def wrapper(self: 'HW_PluginBase', *args, **kwargs):
375        if not self.libraries_available: return None
376        return func(self, *args, **kwargs)
377    return wrapper
378
379
380class LibraryFoundButUnusable(Exception):
381    def __init__(self, library_version='unknown'):
382        self.library_version = library_version
383
384
385class OutdatedHwFirmwareException(UserFacingException):
386
387    def text_ignore_old_fw_and_continue(self) -> str:
388        suffix = (_("The firmware of your hardware device is too old. "
389                    "If possible, you should upgrade it. "
390                    "You can ignore this error and try to continue, however things are likely to break.") + "\n\n" +
391                  _("Ignore and continue?"))
392        if str(self):
393            return str(self) + "\n\n" + suffix
394        else:
395            return suffix
396