1# Electrum - lightweight Bitcoin client
2# Copyright (C) 2011 Thomas Voegtlin
3#
4# Permission is hereby granted, free of charge, to any person
5# obtaining a copy of this software and associated documentation files
6# (the "Software"), to deal in the Software without restriction,
7# including without limitation the rights to use, copy, modify, merge,
8# publish, distribute, sublicense, and/or sell copies of the Software,
9# and to permit persons to whom the Software is furnished to do so,
10# subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be
13# included in all copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
19# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
20# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
21# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22# SOFTWARE.
23import binascii
24import os, sys, re, json
25from collections import defaultdict, OrderedDict
26from typing import (NamedTuple, Union, TYPE_CHECKING, Tuple, Optional, Callable, Any,
27                    Sequence, Dict, Generic, TypeVar, List, Iterable, Set)
28from datetime import datetime
29import decimal
30from decimal import Decimal
31import traceback
32import urllib
33import threading
34import hmac
35import stat
36from locale import localeconv
37import asyncio
38import urllib.request, urllib.parse, urllib.error
39import builtins
40import json
41import time
42from typing import NamedTuple, Optional
43import ssl
44import ipaddress
45from ipaddress import IPv4Address, IPv6Address
46import random
47import secrets
48import functools
49from abc import abstractmethod, ABC
50
51import attr
52import aiohttp
53from aiohttp_socks import ProxyConnector, ProxyType
54import aiorpcx
55from aiorpcx import TaskGroup
56import certifi
57import dns.resolver
58
59from .i18n import _
60from .logging import get_logger, Logger
61
62if TYPE_CHECKING:
63    from .network import Network
64    from .interface import Interface
65    from .simple_config import SimpleConfig
66
67
# Module-level logger used by the helpers in this file.
_logger = get_logger(__name__)
69
70
def inv_dict(d):
    """Return a new dict that maps every value of ``d`` back to its key.

    When several keys share the same value, the key iterated last wins.
    """
    return dict((value, key) for key, value in d.items())
73
74
def all_subclasses(cls) -> Set:
    """Collect every direct and indirect subclass of ``cls`` into a set."""
    found = set()
    for child in cls.__subclasses__():
        found.add(child)
        # recurse to pick up grandchildren and deeper descendants
        found |= all_subclasses(child)
    return found
81
82
# Location of the CA bundle, as reported by certifi.
ca_path = certifi.where()


# Base unit name -> decimal point position (e.g. "BTC" -> 8).
base_units = {'BTC':8, 'mBTC':5, 'bits':2, 'sat':0}
# Reverse lookup: decimal point position -> base unit name.
base_units_inverse = inv_dict(base_units)
base_units_list = ['BTC', 'mBTC', 'bits', 'sat']  # list(dict) does not guarantee order

DECIMAL_POINT_DEFAULT = 5  # mBTC
91
92
class UnknownBaseUnit(Exception):
    """Raised by the base-unit lookup helpers for an unrecognised unit name or decimal point."""
94
95
def decimal_point_to_base_unit_name(dp: int) -> str:
    """Translate a decimal point position into its unit name, e.g. 8 -> "BTC".

    Raises UnknownBaseUnit if no base unit uses that decimal point.
    """
    if dp not in base_units_inverse:
        raise UnknownBaseUnit(dp) from None
    return base_units_inverse[dp]
102
103
def base_unit_name_to_decimal_point(unit_name: str) -> int:
    """Translate a unit name into its decimal point position, e.g. "BTC" -> 8.

    Raises UnknownBaseUnit if the name is not a known base unit.
    """
    if unit_name not in base_units:
        raise UnknownBaseUnit(unit_name) from None
    return base_units[unit_name]
110
111
class NotEnoughFunds(Exception):
    """Error whose string form is the translated "Insufficient funds" message."""

    def __str__(self):
        message = _("Insufficient funds")
        return message
115
116
class NoDynamicFeeEstimates(Exception):
    """Error whose string form is the translated "Dynamic fee estimates not available" message."""

    def __str__(self):
        message = _('Dynamic fee estimates not available')
        return message
120
121
class MultipleSpendMaxTxOutputs(Exception):
    """Error whose string form says that at most one output may be set to spend max."""

    def __str__(self):
        message = _('At most one output can be set to spend max')
        return message
125
126
class InvalidPassword(Exception):
    """Error whose string form is the translated "Incorrect password" message."""

    def __str__(self):
        message = _("Incorrect password")
        return message
130
131
class AddTransactionException(Exception):
    """Base class for errors related to adding a transaction."""
134
135
class UnrelatedTransactionException(AddTransactionException):
    """AddTransactionException whose string form says the transaction is unrelated to this wallet."""

    def __str__(self):
        message = _("Transaction is unrelated to this wallet.")
        return message
139
140
class FileImportFailed(Exception):
    """File import error; carries a detail message appended to the generic text."""

    def __init__(self, message=''):
        # always store the detail as text, whatever was passed in
        self.message = str(message)

    def __str__(self):
        headline = _("Failed to import from file.")
        return headline + "\n" + self.message
147
148
class FileExportFailed(Exception):
    """File export error; carries a detail message appended to the generic text."""

    def __init__(self, message=''):
        # always store the detail as text, whatever was passed in
        self.message = str(message)

    def __str__(self):
        headline = _("Failed to export to file.")
        return headline + "\n" + self.message
155
156
class WalletFileException(Exception):
    """Generic wallet-file error."""
158
159
class BitcoinException(Exception):
    """Generic bitcoin-related error."""
161
162
class UserFacingException(Exception):
    """Error carrying information that is meant to be displayed to the user."""
165
166
class InvoiceError(UserFacingException):
    """User-facing error concerning an invoice."""
168
169
# Raise this to unwind the stack just as a real error would.
# Unlike other exceptions, the user is not informed about it.
class UserCancelled(Exception):
    """An exception that is suppressed from the user"""
175
176
# note: this is not a NamedTuple as then its json encoding cannot be customized
class Satoshis(object):
    """Thin wrapper around an amount held in ``value``.

    Supports equality comparison and addition with other Satoshis objects.
    """
    __slots__ = ('value',)

    def __new__(cls, value):
        self = super(Satoshis, cls).__new__(cls)
        # note: 'value' sometimes has msat precision
        self.value = value
        return self

    def __repr__(self):
        return f'Satoshis({self.value})'

    def __str__(self):
        # note: precision is truncated to satoshis here
        return format_satoshis(self.value)

    def __eq__(self, other):
        # Comparing against a foreign type must yield False rather than
        # raise AttributeError; this mirrors Fiat.__eq__.
        if not isinstance(other, Satoshis):
            return False
        return self.value == other.value

    def __ne__(self, other):
        return not (self == other)

    def __add__(self, other):
        """Return a new Satoshis holding the sum of both values."""
        return Satoshis(self.value + other.value)
202
203
# note: this is not a NamedTuple as then its json encoding cannot be customized
class Fiat(object):
    """An optional Decimal amount (``value``) tagged with a currency code (``ccy``)."""
    __slots__ = ('value', 'ccy')

    def __new__(cls, value: Optional[Decimal], ccy: str):
        if value is not None and not isinstance(value, Decimal):
            raise TypeError(f"value should be Decimal or None, not {type(value)}")
        self = super(Fiat, cls).__new__(cls)
        self.ccy = ccy
        self.value = value
        return self

    def _has_no_data(self) -> bool:
        # True when there is no usable amount: either None or a NaN Decimal
        return self.value is None or self.value.is_nan()

    def __repr__(self):
        return 'Fiat(%s)' % str(self)

    def __str__(self):
        if self._has_no_data():
            return _('No Data')
        return "{:.2f}".format(self.value)

    def to_ui_string(self):
        """Like str(), but with the currency code appended when there is an amount."""
        if self._has_no_data():
            return _('No Data')
        return "{:.2f}".format(self.value) + ' ' + self.ccy

    def __eq__(self, other):
        if not isinstance(other, Fiat) or self.ccy != other.ccy:
            return False
        mine, theirs = self.value, other.value
        # NaN never equals NaN for Decimal, so two NaN amounts are matched explicitly
        both_decimal = isinstance(mine, Decimal) and isinstance(theirs, Decimal)
        if both_decimal and mine.is_nan() and theirs.is_nan():
            return True
        return mine == theirs

    def __ne__(self, other):
        return not (self == other)

    def __add__(self, other):
        assert self.ccy == other.ccy
        total = self.value + other.value
        return Fiat(total, self.ccy)
247
248
class MyEncoder(json.JSONEncoder):
    """JSON encoder extended with the extra object types handled in default()."""

    def default(self, obj):
        # note: this does not get called for namedtuples :(  https://bugs.python.org/issue30343
        from .transaction import Transaction, TxOutput
        from .lnutil import UpdateAddHtlc
        if isinstance(obj, UpdateAddHtlc):
            return obj.to_tuple()
        if isinstance(obj, Transaction):
            return obj.serialize()
        if isinstance(obj, TxOutput):
            return obj.to_legacy_tuple()
        # these three are all encoded through their string form
        if isinstance(obj, (Satoshis, Fiat, Decimal)):
            return str(obj)
        if isinstance(obj, datetime):
            return obj.isoformat(' ')[:-3]
        if isinstance(obj, set):
            return list(obj)
        if isinstance(obj, bytes): # for nametuples in lnchannel
            return obj.hex()
        # last resort: objects that know how to serialize themselves
        if hasattr(obj, 'to_json') and callable(obj.to_json):
            return obj.to_json()
        return super().default(obj)
275
276
class ThreadJob(Logger):
    """Unit of periodic work driven from a thread's main loop.

    The owning thread calls run() repeatedly, in its own context.
    """

    def __init__(self):
        Logger.__init__(self)

    def run(self):
        """Called periodically from the thread; the base implementation does nothing."""
        return None
288
class DebugMem(ThreadJob):
    """A handy class for debugging GC memory leaks.

    Each time at least ``interval`` seconds have elapsed, run() logs how many
    live objects are instances of each of the given ``classes``.
    """

    def __init__(self, classes, interval=30):
        ThreadJob.__init__(self)
        self.next_time = 0
        self.classes = classes
        self.interval = interval

    def mem_stats(self):
        """Force a GC pass, then log the instance count of every watched class."""
        import gc
        self.logger.info("Start memscan")
        gc.collect()
        counts = defaultdict(int)
        for candidate in gc.get_objects():
            for watched in self.classes:
                if isinstance(candidate, watched):
                    counts[watched] += 1
        for watched, total in counts.items():
            self.logger.info(f"{watched.__name__}: {total}")
        self.logger.info("Finish memscan")

    def run(self):
        if time.time() <= self.next_time:
            return
        self.mem_stats()
        # schedule the next scan relative to when this one finished
        self.next_time = time.time() + self.interval
314
class DaemonThread(threading.Thread, Logger):
    """ daemon thread that terminates cleanly

    Holds a list of jobs (objects exposing run()) that run_jobs() executes,
    and a 'running' flag toggled by start()/stop().
    """

    LOGGING_SHORTCUT = 'd'

    def __init__(self):
        threading.Thread.__init__(self)
        Logger.__init__(self)
        # The thread that constructed this object. current_thread() is used
        # because the camelCase alias currentThread() is deprecated.
        self.parent_thread = threading.current_thread()
        self.running = False
        self.running_lock = threading.Lock()
        self.job_lock = threading.Lock()
        self.jobs = []
        self.stopped_event = threading.Event()  # set when fully stopped

    def add_jobs(self, jobs):
        """Append the given jobs to the job list."""
        with self.job_lock:
            self.jobs.extend(jobs)

    def run_jobs(self):
        """Call run() on every registered job, logging any exception raised."""
        # Don't let a throwing job disrupt the thread, future runs of
        # itself, or other jobs.  This is useful protection against
        # malformed or malicious server responses
        with self.job_lock:
            for job in self.jobs:
                try:
                    job.run()
                except Exception:
                    self.logger.exception('')

    def remove_jobs(self, jobs):
        """Remove the given jobs; raises ValueError if one is not registered."""
        with self.job_lock:
            for job in jobs:
                self.jobs.remove(job)

    def start(self):
        """Mark the thread as running, then start it."""
        with self.running_lock:
            self.running = True
        return threading.Thread.start(self)

    def is_running(self):
        """Return True while the running flag is set and the parent thread is alive."""
        with self.running_lock:
            return self.running and self.parent_thread.is_alive()

    def stop(self):
        """Clear the running flag."""
        with self.running_lock:
            self.running = False

    def on_stop(self):
        """Log the stop and set stopped_event (on Android, detach jnius first)."""
        if 'ANDROID_DATA' in os.environ:
            import jnius
            jnius.detach()
            self.logger.info("jnius detach")
        self.logger.info("stopped")
        self.stopped_event.set()
370
371
def print_stderr(*args):
    """Write the str() of all arguments, space-separated, as one line to stderr and flush."""
    line = " ".join(map(str, args)) + "\n"
    sys.stderr.write(line)
    sys.stderr.flush()
376
def print_msg(*args):
    """Write the str() of all arguments, space-separated, as one line to stdout and flush."""
    line = " ".join(map(str, args)) + "\n"
    sys.stdout.write(line)
    sys.stdout.flush()
382
def json_encode(obj):
    """Serialize ``obj`` to pretty-printed JSON (sorted keys, indent 4) using MyEncoder.

    Falls back to repr(obj) when serialization raises TypeError.
    """
    try:
        return json.dumps(obj, sort_keys=True, indent=4, cls=MyEncoder)
    except TypeError:
        return repr(obj)
389
def json_decode(x):
    """Parse ``x`` as JSON, reading floats as Decimal.

    Best effort: if ``x`` cannot be parsed, it is returned unchanged.
    """
    try:
        return json.loads(x, parse_float=Decimal)
    except Exception:
        # Deliberately broad (bad JSON, wrong input type, ...), but unlike a
        # bare "except:" this lets KeyboardInterrupt/SystemExit propagate.
        return x
395
def json_normalize(x):
    """Round-trip ``x`` through json_encode() and json_decode()."""
    # note: The return value of commands, when going through the JSON-RPC interface,
    #       is json-encoded. The encoder used there cannot handle some types, e.g. electrum.util.Satoshis.
    # note: We should not simply do "json_encode(x)" here, as then later x would get doubly json-encoded.
    # see #5868
    encoded = json_encode(x)
    return json_decode(encoded)
402
403
# taken from Django Source Code
def constant_time_compare(val1, val2):
    """Return True if the two strings are equal, False otherwise.

    Both values are converted to bytes and compared with hmac.compare_digest.
    """
    left = to_bytes(val1, 'utf8')
    right = to_bytes(val2, 'utf8')
    return hmac.compare_digest(left, right)
408
409
# decorator that prints execution time
# (timings are emitted at debug level on this child logger of the module logger)
_profiler_logger = _logger.getChild('profiler')
def profiler(func):
    """Decorator that logs, at debug level, how long each call to ``func`` took.

    The wrapper forwards all arguments and returns the result of ``func``.
    functools.wraps keeps the wrapped function's name, docstring and other
    metadata visible on the wrapper.
    """
    @functools.wraps(func)
    def do_profile(*args, **kw_args):
        name = func.__qualname__
        t0 = time.time()
        o = func(*args, **kw_args)
        t = time.time() - t0
        _profiler_logger.debug(f"{name} {t:,.4f}")
        return o
    return do_profile
421
422
def android_ext_dir():
    """Return the path given by android.storage.primary_external_storage_path()."""
    # imported lazily: the 'android' module is only needed when this is called
    import android.storage as android_storage
    return android_storage.primary_external_storage_path()
426
def android_backup_dir():
    """Return the 'org.electrum.electrum' folder under android_ext_dir(), creating it if absent."""
    backup_dir = os.path.join(android_ext_dir(), 'org.electrum.electrum')
    if os.path.exists(backup_dir):
        return backup_dir
    os.mkdir(backup_dir)
    return backup_dir
432
def android_data_dir():
    """Return '<files dir>/data', where the files dir comes from the Kivy PythonActivity."""
    import jnius
    activity_class = jnius.autoclass('org.kivy.android.PythonActivity')
    files_dir = activity_class.mActivity.getFilesDir()
    return files_dir.getPath() + '/data'
437
def ensure_sparse_file(filename):
    """On Windows, flag ``filename`` as sparse via 'fsutil'; elsewhere do nothing."""
    # On modern Linux, no need to do anything.
    if os.name != "nt":
        return
    # On Windows, need to explicitly mark file.
    try:
        os.system('fsutil sparse setflag "{}" 1'.format(filename))
    except Exception as e:
        _logger.info(f'error marking file {filename} as sparse: {e}')
446
447
def get_headers_dir(config):
    """Return the directory for headers, which is simply ``config.path``."""
    headers_dir = config.path
    return headers_dir
450
451
def assert_datadir_available(config_path):
    """Raise FileNotFoundError unless ``config_path`` exists on disk."""
    if not os.path.exists(config_path):
        raise FileNotFoundError(
            'Electrum datadir does not exist. Was it deleted while running?' + '\n' +
            'Should be at {}'.format(config_path))
460
461
def assert_file_in_datadir_available(path, config_path):
    """Raise FileNotFoundError unless ``path`` exists.

    When it is missing, the datadir itself is checked first, so the error
    tells apart a missing datadir from a missing file inside it.
    """
    if not os.path.exists(path):
        assert_datadir_available(config_path)
        raise FileNotFoundError(
            'Cannot find file but datadir is there.' + '\n' +
            'Should be at {}'.format(path))
470
471
def standardize_path(path):
    """Expand '~', make absolute, resolve symlinks and normalize the case of ``path``."""
    expanded = os.path.expanduser(path)
    absolute = os.path.abspath(expanded)
    resolved = os.path.realpath(absolute)
    return os.path.normcase(resolved)
479
480
def get_new_wallet_name(wallet_folder: str) -> str:
    """Return the first name "wallet_<i>" (i = 1, 2, ...) not present in ``wallet_folder``.

    Raises whatever os.listdir raises if the folder cannot be listed.
    """
    # List the folder once instead of on every iteration of the search loop.
    existing = set(os.listdir(wallet_folder))
    i = 1
    while "wallet_%d" % i in existing:
        i += 1
    return "wallet_%d" % i
490
491
def assert_bytes(*args):
    """
    porting helper: assert that every argument is bytes or bytearray
    (the argument types are printed before the failure propagates)
    """
    try:
        assert all(isinstance(arg, (bytes, bytearray)) for arg in args)
    except AssertionError:
        print('assert bytes failed', [type(arg) for arg in args])
        raise
502
503
def assert_str(*args):
    """
    porting helper: assert that every argument is a str
    """
    assert all(isinstance(arg, str) for arg in args)
510
511
def to_string(x, enc) -> str:
    """Return ``x`` as str: bytes/bytearray are decoded with ``enc``, str is returned as is.

    Raises TypeError for any other type.
    """
    if isinstance(x, str):
        return x
    if isinstance(x, (bytes, bytearray)):
        return x.decode(enc)
    raise TypeError("Not a string or bytes like object")
519
520
def to_bytes(something, encoding='utf8') -> bytes:
    """
    Return ``something`` as bytes: bytes are returned unchanged, str is
    encoded with ``encoding``, bytearray is copied into a bytes object.
    Raises TypeError for any other type.
    """
    if isinstance(something, bytes):
        return something
    if isinstance(something, str):
        return something.encode(encoding)
    if isinstance(something, bytearray):
        return bytes(something)
    raise TypeError("Not a string or bytes like object")
533
534
# "bytes from hex": short alias for bytes.fromhex
bfh = bytes.fromhex
536
537
def bh2u(x: bytes) -> str:
    """
    str with hex representation of a bytes-like object (lowercase digits)

    >>> x = bytes((1, 2, 10))
    >>> bh2u(x)
    '01020a'
    """
    return x.hex()
547
548
def xor_bytes(a: bytes, b: bytes) -> bytes:
    """XOR ``a`` and ``b`` byte by byte; the result is as long as the shorter input."""
    # zip() stops at the shorter operand, which gives the truncation
    return bytes(left ^ right for left, right in zip(a, b))
553
554
def user_dir():
    """Return the Electrum data directory chosen from the environment.

    Checked in order: $ELECTRUMDIR, the Android data dir (if $ANDROID_DATA
    is set), ~/.electrum on posix, then "Electrum" under %APPDATA% or
    %LOCALAPPDATA%. Returns None if none of these applies.
    """
    env = os.environ
    if "ELECTRUMDIR" in env:
        return env["ELECTRUMDIR"]
    if 'ANDROID_DATA' in env:
        return android_data_dir()
    if os.name == 'posix':
        return os.path.join(env["HOME"], ".electrum")
    for windows_var in ("APPDATA", "LOCALAPPDATA"):
        if windows_var in env:
            return os.path.join(env[windows_var], "Electrum")
    # no home directory found in the environment variables
    return None
569
570
def resource_path(*parts):
    """Join ``parts`` onto the package directory (pkg_dir) and return the resulting path."""
    base = pkg_dir
    return os.path.join(base, *parts)
573
574
# absolute path to python package folder of electrum ("lib"),
# i.e. the directory containing this file (symlinks resolved)
pkg_dir = os.path.dirname(os.path.realpath(__file__))
577
578
def is_valid_email(s):
    """Loose check that ``s`` starts with something shaped like "x@y.z"."""
    pattern = r"[^@]+@[^@]+\.[^@]+"
    return bool(re.match(pattern, s))
582
583
def is_hash256_str(text: Any) -> bool:
    """Return True iff ``text`` is a str of exactly 64 characters accepted by is_hex_str()."""
    return isinstance(text, str) and len(text) == 64 and is_hex_str(text)
588
589
def is_hex_str(text: Any) -> bool:
    """Return True iff ``text`` is a str of hex digit pairs without any whitespace."""
    if not isinstance(text, str): return False
    try:
        b = bytes.fromhex(text)
    except ValueError:
        # raised for non-hex characters or an odd number of digits
        return False
    # forbid whitespaces in text:
    # (bytes.fromhex skips them, so only then do the lengths disagree)
    if len(text) != 2 * len(b):
        return False
    return True
600
601
def is_integer(val: Any) -> bool:
    """Return True iff ``val`` is an int instance (bool counts, being a subclass of int)."""
    result = isinstance(val, int)
    return result
604
605
def is_non_negative_integer(val: Any) -> bool:
    """Return True iff is_integer(val) holds and ``val`` is >= 0."""
    if not is_integer(val):
        return False
    return val >= 0
610
611
def is_int_or_float(val: Any) -> bool:
    """Return True iff ``val`` is an int or float instance."""
    accepted_types = (int, float)
    return isinstance(val, accepted_types)
614
615
def is_non_negative_int_or_float(val: Any) -> bool:
    """Return True iff is_int_or_float(val) holds and ``val`` is >= 0."""
    if not is_int_or_float(val):
        return False
    return val >= 0
620
621
def chunks(items, size: int):
    """Yield consecutive slices of ``items``, each at most ``size`` long.

    ``items`` must support len() and slicing. This is a generator, so the
    ValueError for a non-positive ``size`` is raised on first iteration.
    """
    if size < 1:
        raise ValueError(f"size must be positive, not {size!r}")
    for start in range(0, len(items), size):
        stop = start + size
        yield items[start:stop]
628
629
def format_satoshis_plain(x, *, decimal_point=8) -> str:
    """Display a satoshi amount scaled.  Always uses a '.' as a decimal
    point and has no thousands separator.

    The special amount '!' is rendered as 'max'.
    """
    if x == '!':
        return 'max'
    scaled = Decimal(x) / pow(10, decimal_point)
    text = "{:.8f}".format(scaled)
    # drop trailing zeros, then a dangling decimal point
    return text.rstrip('0').rstrip('.')
637
638
# Check that Decimal precision is sufficient.
# We need at the very least ~20, as we deal with msat amounts, and
# log10(21_000_000 * 10**8 * 1000) ~= 18.3
# decimal.DefaultContext.prec == 28 by default, but it is mutable.
# We enforce that we have at least that available.
assert decimal.getcontext().prec >= 28, f"PyDecimal precision too low: {decimal.getcontext().prec}"

# Decimal separator reported by localeconv(), read once at import time.
DECIMAL_POINT = localeconv()['decimal_point']  # type: str
647
648
def format_satoshis(
        x,  # in satoshis
        *,
        num_zeros=0,
        decimal_point=8,
        precision=None,
        is_diff=False,
        whitespaces=False,
) -> str:
    """Format the amount ``x`` using DECIMAL_POINT as the decimal separator.

    x:             the amount; None gives 'unknown' and '!' gives 'max'
    num_zeros:     minimum number of fractional digits (zero-padded)
    decimal_point: the amount is divided by 10**decimal_point
    precision:     fractional digits kept before trailing zeros are
                   stripped; defaults to decimal_point
    is_diff:       always show the sign, also for positive amounts
    whitespaces:   pad with spaces on both sides for column alignment
    """
    if x is None:
        return 'unknown'
    if x == '!':
        return 'max'
    if precision is None:
        precision = decimal_point
    # build the format spec, e.g. "+.8" (sign only for diffs)
    spec = ("." + str(precision)) if precision > 0 else ""
    if is_diff:
        spec = '+' + spec
    # scale and render
    scale_factor = pow(10, decimal_point)
    amount = x if isinstance(x, Decimal) else Decimal(x).quantize(Decimal('1E-8'))
    text = ("{:" + spec + "f}").format(amount / scale_factor)
    if "." not in text:
        text += "."
    text = text.rstrip('0')
    # extra decimal places
    integer_part, fract_part = text.split(".")
    fract_part = fract_part.ljust(num_zeros, "0")
    text = integer_part + DECIMAL_POINT + fract_part
    # leading/trailing whitespaces
    if whitespaces:
        text += " " * (decimal_point - len(fract_part))
        text = text.rjust(15)
    return text
685
686
FEERATE_PRECISION = 1  # num fractional decimal places for sat/byte fee rates
# Quantization step derived from FEERATE_PRECISION, i.e. 10 ** -FEERATE_PRECISION.
_feerate_quanta = Decimal(10) ** (-FEERATE_PRECISION)
689
690
def format_fee_satoshis(fee, *, num_zeros=0, precision=None):
    """Format ``fee`` via format_satoshis() with decimal_point=0.

    ``precision`` defaults to FEERATE_PRECISION, and ``num_zeros`` is capped
    at FEERATE_PRECISION.
    """
    effective_precision = FEERATE_PRECISION if precision is None else precision
    # no more zeroes than available prec
    capped_zeros = min(num_zeros, FEERATE_PRECISION)
    return format_satoshis(fee, num_zeros=capped_zeros, decimal_point=0,
                           precision=effective_precision)
696
697
def quantize_feerate(fee) -> Union[None, Decimal, int]:
    """Strip sat/byte fee rate of excess precision.

    None is passed through; any other value is converted to Decimal and
    quantized to _feerate_quanta, rounding half down.
    """
    if fee is not None:
        return Decimal(fee).quantize(_feerate_quanta, rounding=decimal.ROUND_HALF_DOWN)
    return None
703
704
def timestamp_to_datetime(timestamp: Optional[int]) -> Optional[datetime]:
    """Convert a POSIX timestamp to a naive local datetime; None is passed through."""
    return None if timestamp is None else datetime.fromtimestamp(timestamp)
709
def format_time(timestamp):
    """Render ``timestamp`` as ISO text (space separator, last 3 characters cut off).

    Returns the translated "Unknown" when there is no timestamp.
    """
    date = timestamp_to_datetime(timestamp)
    if not date:
        return _("Unknown")
    return date.isoformat(' ')[:-3]
713
714
# Takes a timestamp and returns a string with the approximation of the age
def age(from_date, since_date = None, target_tz=None, include_seconds=False):
    """Describe how far the POSIX timestamp ``from_date`` lies from ``since_date``.

    from_date:       POSIX timestamp, or None (gives "Unknown")
    since_date:      reference datetime; defaults to datetime.now(target_tz)
    target_tz:       tzinfo used for the default reference time
    include_seconds: report seconds for distances that round to 0 minutes

    Returns "<distance> ago" for the past and "in <distance>" otherwise.
    """
    if from_date is None:
        return "Unknown"

    if since_date is None:
        since_date = datetime.now(target_tz)
    # Give from_date the same tz-awareness as since_date: subtracting or
    # comparing a naive and an aware datetime raises TypeError, which used
    # to happen whenever target_tz was passed.
    from_date = datetime.fromtimestamp(from_date, since_date.tzinfo)

    td = time_difference(from_date - since_date, include_seconds)
    return td + " ago" if from_date < since_date else "in " + td


def time_difference(distance_in_time, include_seconds):
    """Turn a timedelta into a rough human-readable distance, ignoring its sign.

    ``include_seconds`` only matters when the distance rounds to 0 minutes.
    """
    #distance_in_time = since_date - from_date
    distance_in_seconds = int(round(abs(distance_in_time.days * 86400 + distance_in_time.seconds)))
    distance_in_minutes = int(round(distance_in_seconds/60))

    if distance_in_minutes == 0:
        if include_seconds:
            return "%s seconds" % distance_in_seconds
        else:
            return "less than a minute"
    elif distance_in_minutes < 45:
        return "%s minutes" % distance_in_minutes
    elif distance_in_minutes < 90:
        return "about 1 hour"
    elif distance_in_minutes < 1440:  # 1 day
        return "about %d hours" % (round(distance_in_minutes / 60.0))
    elif distance_in_minutes < 2880:  # 2 days
        return "1 day"
    # NOTE(review): 43220 may have been meant as 43200 (30 days) - confirm
    elif distance_in_minutes < 43220:
        return "%d days" % (round(distance_in_minutes / 1440))
    elif distance_in_minutes < 86400:  # 60 days
        return "about 1 month"
    elif distance_in_minutes < 525600:  # 365 days
        return "%d months" % (round(distance_in_minutes / 43200))
    elif distance_in_minutes < 1051200:  # 730 days
        return "about 1 year"
    else:
        return "over %d years" % (round(distance_in_minutes / 525600))
756
# Hardcoded block explorers offered on mainnet.
# Maps explorer name -> (base URL, {'tx': path prefix, 'addr': path prefix}).
mainnet_block_explorers = {
    'Bitupper Explorer': ('https://bitupper.com/en/explorer/bitcoin/',
                        {'tx': 'transactions/', 'addr': 'addresses/'}),
    'Bitflyer.jp': ('https://chainflyer.bitflyer.jp/',
                        {'tx': 'Transaction/', 'addr': 'Address/'}),
    'Blockchain.info': ('https://blockchain.com/btc/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'blockchainbdgpzk.onion': ('https://blockchainbdgpzk.onion/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'Blockstream.info': ('https://blockstream.info/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'Bitaps.com': ('https://btc.bitaps.com/',
                        {'tx': '', 'addr': ''}),
    'BTC.com': ('https://btc.com/',
                        {'tx': '', 'addr': ''}),
    'Chain.so': ('https://www.chain.so/',
                        {'tx': 'tx/BTC/', 'addr': 'address/BTC/'}),
    'Insight.is': ('https://insight.bitpay.com/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'TradeBlock.com': ('https://tradeblock.com/blockchain/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'BlockCypher.com': ('https://live.blockcypher.com/btc/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'Blockchair.com': ('https://blockchair.com/bitcoin/',
                        {'tx': 'transaction/', 'addr': 'address/'}),
    'blockonomics.co': ('https://www.blockonomics.co/',
                        {'tx': 'api/tx?txid=', 'addr': '#/search?q='}),
    'mempool.space': ('https://mempool.space/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'mempool.emzy.de': ('https://mempool.emzy.de/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'OXT.me': ('https://oxt.me/',
                        {'tx': 'transaction/', 'addr': 'address/'}),
    'smartbit.com.au': ('https://www.smartbit.com.au/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'mynode.local': ('http://mynode.local:3002/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'system default': ('blockchain:/',
                        {'tx': 'tx/', 'addr': 'address/'}),
}
797
# Hardcoded block explorers offered on testnet.
# Maps explorer name -> (base URL, {'tx': path prefix, 'addr': path prefix}).
testnet_block_explorers = {
    'Bitaps.com': ('https://tbtc.bitaps.com/',
                       {'tx': '', 'addr': ''}),
    'BlockCypher.com': ('https://live.blockcypher.com/btc-testnet/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'Blockchain.info': ('https://www.blockchain.com/btc-testnet/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'Blockstream.info': ('https://blockstream.info/testnet/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'mempool.space': ('https://mempool.space/testnet/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'smartbit.com.au': ('https://testnet.smartbit.com.au/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'system default': ('blockchain://000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943/',
                       {'tx': 'tx/', 'addr': 'address/'}),
}
814
# Hardcoded block explorers offered on signet.
# Maps explorer name -> (base URL, {'tx': path prefix, 'addr': path prefix}).
signet_block_explorers = {
    'bc-2.jp': ('https://explorer.bc-2.jp/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'mempool.space': ('https://mempool.space/signet/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'bitcoinexplorer.org': ('https://signet.bitcoinexplorer.org/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'wakiyamap.dev': ('https://signet-explorer.wakiyamap.dev/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'system default': ('blockchain:/',
                       {'tx': 'tx/', 'addr': 'address/'}),
}
827
# Path prefixes paired with a custom block explorer that is configured as a bare URL string.
_block_explorer_default_api_loc = {'tx': 'tx/', 'addr': 'address/'}
829
830
def block_explorer_info():
    """Return the hardcoded block explorer table matching constants.net.NET_NAME.

    Anything other than "testnet" or "signet" gets the mainnet table.
    """
    from . import constants
    net_name = constants.net.NET_NAME
    if net_name == "testnet":
        return testnet_block_explorers
    if net_name == "signet":
        return signet_block_explorers
    return mainnet_block_explorers
838
839
def block_explorer(config: 'SimpleConfig') -> Optional[str]:
    """Returns name of selected block explorer,
    or None if a custom one (not among hardcoded ones) is configured.

    A configured name that is not in the current table falls back to
    'Blockstream.info'.
    """
    if config.get('block_explorer_custom') is not None:
        return None
    fallback = 'Blockstream.info'
    selected = config.get('block_explorer', fallback)
    if block_explorer_info().get(selected) is None:
        selected = fallback
    assert isinstance(selected, str), f"{selected!r} should be str"
    return selected
853
854
def block_explorer_tuple(config: 'SimpleConfig') -> Optional[Tuple[str, dict]]:
    """Return (base URL, path prefix dict) for the configured block explorer.

    A custom explorer may be configured as a bare URL string (paired with
    the default prefixes) or as a 2-item tuple/list. Any other custom value
    is rejected with a warning and None is returned.
    """
    custom_be = config.get('block_explorer_custom')
    if not custom_be:
        # using one of the hardcoded block explorers
        return block_explorer_info().get(block_explorer(config))
    if isinstance(custom_be, str):
        return custom_be, _block_explorer_default_api_loc
    is_pair = isinstance(custom_be, (tuple, list)) and len(custom_be) == 2
    if is_pair:
        return tuple(custom_be)
    _logger.warning(f"not using 'block_explorer_custom' from config. "
                    f"expected a str or a pair but got {custom_be!r}")
    return None
868
869
def block_explorer_URL(config: 'SimpleConfig', kind: str, item: str) -> Optional[str]:
    """Build the explorer URL for ``item``; ``kind`` is a key of the explorer's prefix dict.

    Returns None if no explorer is available or it has no entry for ``kind``.
    """
    be_tuple = block_explorer_tuple(config)
    if not be_tuple:
        return None
    base_url, path_prefixes = be_tuple
    prefix = path_prefixes.get(kind)
    if prefix is None:
        return None
    # make sure the base URL ends with exactly one separator before appending
    if base_url[-1] != "/":
        base_url += "/"
    return ''.join([base_url, prefix, item])
882
883# URL decode
884#_ud = re.compile('%([0-9a-hA-H]{2})', re.MULTILINE)
885#urldecode = lambda x: _ud.sub(lambda m: chr(int(m.group(1), 16)), x)
886
887
# URI scheme names (the part of a URI before the ':').
# note: when checking against these, use .lower() to support case-insensitivity
BITCOIN_BIP21_URI_SCHEME = 'bitcoin'
LIGHTNING_URI_SCHEME = 'lightning'
891
892
class InvalidBitcoinURI(Exception):
    """Raised by parse_URI() for a malformed URI."""
894
895
# TODO rename to parse_bip21_uri or similar
def parse_URI(uri: str, on_pr: Callable = None, *, loop=None) -> dict:
    """Parse a bare bitcoin address or a 'bitcoin:' URI into a dict.

    A string without ':' is treated as a bare address and yields
    ``{'address': uri}``. Otherwise the scheme must be 'bitcoin'
    (case-insensitive) and every query parameter becomes a key of the result.
    Some keys are post-processed:

    - 'address': the URI path, validated with ``bitcoin.is_address``
    - 'amount':  converted to an int. Either a decimal number that is
                 multiplied by COIN, or the form ``<number>X<digit>`` which is
                 read as ``number * 10**(digit - 8)``.
    - 'memo':    copy of 'message', if present
    - 'time', 'exp': converted to int
    - 'sig':     base58-decoded and passed through ``bh2u``

    If `on_pr` is given and the URI carries either 'r', or both 'name' and
    'sig', a coroutine that obtains the payment request and passes it to
    `on_pr` is scheduled on `loop` (default: ``asyncio.get_event_loop()``).
    The dict is returned immediately, without waiting for that coroutine.

    Raises InvalidBitcoinURI on malformed URI.
    """
    from . import bitcoin
    from .bitcoin import COIN, TOTAL_COIN_SUPPLY_LIMIT_IN_BTC

    if not isinstance(uri, str):
        raise InvalidBitcoinURI(f"expected string, not {repr(uri)}")

    if ':' not in uri:
        if not bitcoin.is_address(uri):
            raise InvalidBitcoinURI("Not a bitcoin address")
        return {'address': uri}

    u = urllib.parse.urlparse(uri)
    if u.scheme.lower() != BITCOIN_BIP21_URI_SCHEME:
        raise InvalidBitcoinURI("Not a bitcoin URI")
    address = u.path

    # python for android fails to parse query
    if address.find('?') > 0:
        # Split on the first '?' only: a further '?' is legal inside a query
        # string, and an unbounded split would raise a bare ValueError here.
        address, query = u.path.split('?', 1)
        pq = urllib.parse.parse_qs(query)
    else:
        pq = urllib.parse.parse_qs(u.query)

    for k, v in pq.items():
        if len(v) != 1:
            raise InvalidBitcoinURI(f'Duplicate Key: {repr(k)}')

    out = {k: v[0] for k, v in pq.items()}
    if address:
        if not bitcoin.is_address(address):
            raise InvalidBitcoinURI(f"Invalid bitcoin address: {address}")
        out['address'] = address
    if 'amount' in out:
        am = out['amount']
        try:
            m = re.match(r'([0-9.]+)X([0-9])', am)
            if m:
                k = int(m.group(2)) - 8
                amount = Decimal(m.group(1)) * pow(Decimal(10), k)
            else:
                amount = Decimal(am) * COIN
            if amount > TOTAL_COIN_SUPPLY_LIMIT_IN_BTC * COIN:
                raise InvalidBitcoinURI(f"amount is out-of-bounds: {amount!r} BTC")
            out['amount'] = int(amount)
        except Exception as e:
            # also wraps the out-of-bounds error raised just above
            raise InvalidBitcoinURI(f"failed to parse 'amount' field: {repr(e)}") from e
    if 'message' in out:
        out['memo'] = out['message']
    if 'time' in out:
        try:
            out['time'] = int(out['time'])
        except Exception as e:
            raise InvalidBitcoinURI(f"failed to parse 'time' field: {repr(e)}") from e
    if 'exp' in out:
        try:
            out['exp'] = int(out['exp'])
        except Exception as e:
            raise InvalidBitcoinURI(f"failed to parse 'exp' field: {repr(e)}") from e
    if 'sig' in out:
        try:
            out['sig'] = bh2u(bitcoin.base_decode(out['sig'], base=58))
        except Exception as e:
            raise InvalidBitcoinURI(f"failed to parse 'sig' field: {repr(e)}") from e

    r = out.get('r')
    sig = out.get('sig')
    name = out.get('name')
    if on_pr and (r or (name and sig)):
        @log_exceptions
        async def get_payment_request():
            from . import paymentrequest as pr
            if name and sig:
                # the request is built from the URI fields themselves
                s = pr.serialize_request(out).SerializeToString()
                request = pr.PaymentRequest(s)
            else:
                # the request is obtained via the 'r' parameter
                request = await pr.get_payment_request(r)
            on_pr(request)
        loop = loop or asyncio.get_event_loop()
        asyncio.run_coroutine_threadsafe(get_payment_request(), loop)

    return out
982
983
def create_bip21_uri(addr, amount_sat: Optional[int], message: Optional[str],
                     *, extra_query_params: Optional[dict] = None) -> str:
    """Build a 'bitcoin:' URI for `addr`.

    Returns an empty string if `addr` is not a valid address. A truthy
    `amount_sat` is added as 'amount' (formatted by format_satoshis_plain) and
    a truthy `message` as a percent-quoted 'message'. Entries of
    `extra_query_params` are appended afterwards; their values are
    percent-quoted, and a key that is not a str or that would be changed by
    quoting raises an Exception.
    """
    from . import bitcoin
    if not bitcoin.is_address(addr):
        return ""
    extra_params = {} if extra_query_params is None else extra_query_params
    query_parts = []
    if amount_sat:
        query_parts.append(f'amount={format_satoshis_plain(amount_sat)}')
    if message:
        query_parts.append(f'message={urllib.parse.quote(message)}')
    for key, value in extra_params.items():
        key_is_clean = isinstance(key, str) and key == urllib.parse.quote(key)
        if not key_is_clean:
            raise Exception(f"illegal key for URI: {repr(key)}")
        query_parts.append(f"{key}={urllib.parse.quote(value)}")
    uri_parts = urllib.parse.ParseResult(
        scheme=BITCOIN_BIP21_URI_SCHEME,
        netloc='',
        path=addr,
        params='',
        query='&'.join(query_parts),
        fragment='',
    )
    return str(urllib.parse.urlunparse(uri_parts))
1010
1011
def maybe_extract_bolt11_invoice(data: str) -> Optional[str]:
    """Return the lower-cased 'ln...' string found in `data`, or None.

    Surrounding whitespace is stripped and the text is lower-cased. A leading
    'lightning:' scheme is dropped when it is directly followed by 'ln'.
    The result is returned only if it starts with 'ln'.
    """
    candidate = data.strip().lower()
    scheme_prefix = LIGHTNING_URI_SCHEME + ':'
    if candidate.startswith(scheme_prefix + 'ln'):
        candidate = candidate[len(scheme_prefix):]
    return candidate if candidate.startswith('ln') else None
1020
1021
# Python bug (http://bugs.python.org/issue1927) causes raw_input
# to be redirected improperly between stdin/stderr on Unix systems
#TODO: py3
def raw_input(prompt=None):
    """Replacement for the builtin ``input``.

    A truthy `prompt` is written with ``sys.stdout.write``; the saved original
    ``input`` is then called without a prompt argument and its result returned.
    """
    if prompt:
        sys.stdout.write(prompt)
    return builtin_raw_input()

# Keep a reference to the original builtin first, then install the wrapper in
# the builtins module. The order matters: raw_input() calls the saved original.
builtin_raw_input = builtins.input
builtins.input = raw_input
1032
1033
def parse_json(message):
    """Split one newline-terminated JSON document off the front of `message`.

    `message` is a bytes buffer. If it contains no newline, nothing is
    consumed and ``(None, message)`` is returned. Otherwise the bytes before
    the first newline are decoded as UTF-8 and parsed as JSON, and
    ``(parsed, rest)`` is returned, where `rest` is everything after that
    newline. If decoding or parsing fails, `parsed` is None but the line is
    still consumed.
    """
    # TODO: check \r\n pattern
    n = message.find(b'\n')
    if n == -1:
        return None, message
    try:
        j = json.loads(message[0:n].decode('utf8'))
    except Exception:
        # Bad line: report "no object". Unlike a bare except, this lets
        # KeyboardInterrupt and SystemExit propagate.
        j = None
    return j, message[n+1:]
1044
1045
def setup_thread_excepthook():
    """
    Make exceptions escaping a thread's ``run`` reach ``sys.excepthook``.

    Workaround for the `sys.excepthook` thread bug from:
    http://bugs.python.org/issue1230540

    Replaces ``threading.Thread.__init__`` with a version that wraps each new
    thread's ``run``. Call once from the main thread before creating any threads.
    """
    original_init = threading.Thread.__init__

    def patched_init(self, *args, **kwargs):
        original_init(self, *args, **kwargs)
        original_run = self.run

        def guarded_run(*run_args, **run_kwargs):
            try:
                original_run(*run_args, **run_kwargs)
            except Exception:
                exc_type, exc_value, exc_tb = sys.exc_info()
                sys.excepthook(exc_type, exc_value, exc_tb)

        # shadow the bound method on this instance only
        self.run = guarded_run

    threading.Thread.__init__ = patched_init
1070
1071
def send_exception_to_crash_reporter(e: BaseException):
    """Pass exception `e` to the currently installed ``sys.excepthook``."""
    exc_info = (type(e), e, e.__traceback__)
    sys.excepthook(*exc_info)
1074
1075
def versiontuple(v):
    """Convert a dotted version string such as "3.3.8" into a tuple of ints."""
    return tuple(int(component) for component in v.split("."))
1078
1079
def read_json_file(path):
    """Read the UTF-8 file at `path` and return its parsed JSON content.

    Any failure is logged and re-raised as FileImportFailed: a ValueError
    (which includes JSON decode errors) gets a fixed "Invalid JSON code."
    message, anything else wraps the original exception.
    """
    try:
        with open(path, 'r', encoding='utf-8') as json_file:
            return json.load(json_file)
    # backwards compatibility for JSONDecodeError
    except ValueError:
        _logger.exception('')
        raise FileImportFailed(_("Invalid JSON code."))
    except BaseException as exc:
        _logger.exception('')
        raise FileImportFailed(exc)
1092
def write_json_file(path, data):
    """Serialise `data` as indented, key-sorted JSON into the file at `path`.

    The file is opened for writing as UTF-8 and `data` is encoded with
    MyEncoder. An OS-level error is logged and re-raised as FileExportFailed.
    """
    try:
        with open(path, 'w+', encoding='utf-8') as json_file:
            json.dump(data, json_file, indent=4, sort_keys=True, cls=MyEncoder)
    except OSError as exc:  # IOError and os.error are both aliases of OSError
        _logger.exception('')
        raise FileExportFailed(exc)
1100
1101
def make_dir(path, allow_symlink=True):
    """Create directory `path`, accessible to its owner only, unless it exists.

    If `allow_symlink` is false and `path` is a symlink whose target does not
    exist, an Exception is raised instead of creating anything.
    """
    if os.path.exists(path):
        return
    if not allow_symlink and os.path.islink(path):
        raise Exception('Dangling link: ' + path)
    os.mkdir(path)
    # S_IRWXU == S_IRUSR | S_IWUSR | S_IXUSR
    os.chmod(path, stat.S_IRWXU)
1109
1110
def log_exceptions(func):
    """Decorator for coroutine functions: log any exception, then re-raise it.

    asyncio.CancelledError is re-raised without being logged. When the first
    positional argument has a `logger` attribute, that logger is used;
    otherwise the module-level `_logger` is used.
    """
    assert asyncio.iscoroutinefunction(func), 'func needs to be a coroutine'

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except asyncio.CancelledError:
            raise
        except BaseException as exc:
            self = args[0] if args else None
            mylogger = self.logger if hasattr(self, 'logger') else _logger
            try:
                mylogger.exception(f"Exception in {func.__name__}: {repr(exc)}")
            except BaseException as logging_exc:
                # the logger itself failed; fall back to stdout
                print(f"logging exception raised: {repr(logging_exc)}... orig exc: {repr(exc)} in {func.__name__}")
            raise

    return wrapper
1129
1130
def ignore_exceptions(func):
    """Decorator for coroutine functions: swallow every Exception silently.

    The wrapped coroutine returns None when `func` raises an Exception.
    asyncio.CancelledError is always propagated.
    """
    assert asyncio.iscoroutinefunction(func), 'func needs to be a coroutine'

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            result = await func(*args, **kwargs)
        except asyncio.CancelledError:
            # note: with python 3.8, CancelledError no longer inherits Exception, so this catch is redundant
            raise
        except Exception:
            return None
        return result

    return wrapper
1144
1145
def with_lock(func):
    """Decorator to enforce a lock on a function call.

    Intended for methods: the wrapped call runs inside ``with self.lock:``, so
    the instance must provide a `lock` attribute usable as a context manager.
    The wrapper keeps the name and docstring of `func`.
    """
    @functools.wraps(func)
    def func_wrapper(self, *args, **kwargs):
        with self.lock:
            return func(self, *args, **kwargs)
    return func_wrapper
1152
1153
class TxMinedInfo(NamedTuple):
    """Information about the block that mined a transaction.

    Only `height` is required; every other field defaults to None.
    """
    height: int                        # height of block that mined tx
    conf: Optional[int] = None         # number of confirmations, SPV verified (None means unknown)
    timestamp: Optional[int] = None    # timestamp of block that mined tx
    txpos: Optional[int] = None        # position of tx in serialized block
    header_hash: Optional[str] = None  # hash of block that mined tx
1160
1161
def make_aiohttp_session(proxy: Optional[dict], headers=None, timeout=None):
    """Create an aiohttp.ClientSession, optionally routed through a SOCKS proxy.

    `headers` defaults to a 'User-Agent: Electrum' header. `timeout` may be
    None (45 seconds in total), a number of seconds, or any other value, which
    is passed to the session as-is. TLS uses a default server-auth context
    loaded from `ca_path`. A truthy `proxy` dict must provide 'mode', 'host'
    and 'port', and may provide 'user' and 'password'; any mode other than
    'socks5' is treated as SOCKS4.
    """
    if headers is None:
        headers = {'User-Agent': 'Electrum'}

    if timeout is None:
        # The default timeout is high intentionally.
        # DNS on some systems can be really slow, see e.g. #5337
        timeout = aiohttp.ClientTimeout(total=45)
    elif isinstance(timeout, (int, float)):
        timeout = aiohttp.ClientTimeout(total=timeout)

    ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)

    if not proxy:
        connector = aiohttp.TCPConnector(ssl=ssl_context)
    else:
        socks_version = ProxyType.SOCKS5 if proxy['mode'] == 'socks5' else ProxyType.SOCKS4
        connector = ProxyConnector(
            proxy_type=socks_version,
            host=proxy['host'],
            port=int(proxy['port']),
            username=proxy.get('user', None),
            password=proxy.get('password', None),
            rdns=True,
            ssl=ssl_context,
        )

    return aiohttp.ClientSession(headers=headers, timeout=timeout, connector=connector)
1187
1188
class SilentTaskGroup(TaskGroup):
    """TaskGroup whose spawn() raises CancelledError once the group is closed."""

    def spawn(self, *args, **kwargs):
        # don't complain if group is already closed.
        if not self._closed:
            return super().spawn(*args, **kwargs)
        raise asyncio.CancelledError()
1196
1197
class NetworkJobOnDefaultServer(Logger, ABC):
    """An abstract base class for a job that runs on the main network
    interface. Every time the main interface changes, the job is
    restarted, and some of its internals are reset.

    Subclasses implement `_run_tasks`, and may extend `_reset` to
    re-initialise their own per-connection state.
    """
    def __init__(self, network: 'Network'):
        """Register for 'default_server_changed' events and schedule a first restart.

        Note: this also sets `network.asyncio_loop` as the current event loop
        of the calling thread.
        """
        Logger.__init__(self)
        asyncio.set_event_loop(network.asyncio_loop)
        self.network = network
        self.interface = None  # type: Interface
        # serialises concurrent _restart() calls
        self._restart_lock = asyncio.Lock()
        # Ensure fairness between NetworkJobs. e.g. if multiple wallets
        # are open, a large wallet's Synchronizer should not starve the small wallets:
        self._network_request_semaphore = asyncio.Semaphore(100)

        self._reset()
        # every time the main interface changes, restart:
        register_callback(self._restart, ['default_server_changed'])
        # also schedule a one-off restart now, as there might already be a main interface:
        asyncio.run_coroutine_threadsafe(self._restart(), network.asyncio_loop)

    def _reset(self):
        """Initialise fields. Called every time the underlying
        server connection changes.
        """
        # a fresh task group per connection; the previous one is cancelled in stop()
        self.taskgroup = SilentTaskGroup()

    async def _start(self, interface: 'Interface'):
        """Bind to `interface` and spawn `_run_tasks` in the interface's task group."""
        self.interface = interface
        # the coroutine receives the *current* taskgroup, so _run_tasks can
        # detect if a restart has replaced it in the meantime
        await interface.taskgroup.spawn(self._run_tasks(taskgroup=self.taskgroup))

    @abstractmethod
    async def _run_tasks(self, *, taskgroup: TaskGroup) -> None:
        """Start tasks in taskgroup. Called every time the underlying
        server connection changes.

        This base implementation only performs the staleness check below;
        presumably subclasses are expected to await it before starting
        their own tasks.
        """
        # If self.taskgroup changed, don't start tasks. This can happen if we have
        # been restarted *just now*, i.e. after the _run_tasks coroutine object was created.
        if taskgroup != self.taskgroup:
            raise asyncio.CancelledError()

    async def stop(self, *, full_shutdown: bool = True):
        """Cancel the remaining tasks of the current task group.

        With `full_shutdown`, also unregister the restart callback, so that
        later server changes no longer restart this job.
        """
        if full_shutdown:
            unregister_callback(self._restart)
        await self.taskgroup.cancel_remaining()

    @log_exceptions
    async def _restart(self, *args):
        """Stop, reset and start again on the network's current interface.

        `*args` is ignored; it only absorbs the arguments passed when this
        method is invoked as a callback.
        """
        interface = self.network.interface
        if interface is None:
            return  # we should get called again soon

        async with self._restart_lock:
            await self.stop(full_shutdown=False)
            self._reset()
            await self._start(interface)

    @property
    def session(self):
        """The session of the current interface; asserts that it is not None."""
        s = self.interface.session
        assert s is not None
        return s
1260
1261
def create_and_start_event_loop() -> Tuple[asyncio.AbstractEventLoop,
                                           asyncio.Future,
                                           threading.Thread]:
    """Run the current asyncio event loop in a new thread named 'EventLoop'.

    Returns ``(loop, stopping_fut, loop_thread)``. The loop runs until
    `stopping_fut` completes. To stop it from another thread, complete the
    future through the loop, e.g.
    ``loop.call_soon_threadsafe(stopping_fut.set_result, 1)``.

    An exception handler is installed on the loop that drops a few known
    noisy SSL/socket messages and forwards everything else to the loop's
    default handler.
    """
    # Compiled once here instead of on every handler invocation.
    suppress_message_regex = re.compile('SSL handshake|Fatal read error on|'
                                        'SSL error in data received')

    def on_exception(loop, context):
        """Suppress spurious messages it appears we cannot control."""
        message = context.get('message')
        # note: match() anchors at the start of the message
        if message and suppress_message_regex.match(message):
            return
        loop.default_exception_handler(context)

    loop = asyncio.get_event_loop()
    loop.set_exception_handler(on_exception)
    # loop.set_debug(1)
    # Create the future from the loop that will run it, rather than relying on
    # asyncio.Future() looking up "the current loop" implicitly a second time.
    stopping_fut = loop.create_future()
    loop_thread = threading.Thread(target=loop.run_until_complete,
                                   args=(stopping_fut,),
                                   name='EventLoop')
    loop_thread.start()
    return loop, stopping_fut, loop_thread
1283
1284
class OrderedDictWithIndex(OrderedDict):
    """An OrderedDict that keeps track of the positions of keys.

    Two lookup tables (key -> position and position -> key) are kept in sync
    with the dict contents. Adding a new key updates them incrementally; every
    other mutation rebuilds them from scratch.

    Note: very inefficient to modify contents, except to add new items.
    """

    def __init__(self):
        super().__init__()
        self._key_to_pos = {}
        self._pos_to_key = {}

    def _recalc_index(self):
        """Rebuild both lookup tables from the current key order."""
        key_to_pos = {}
        pos_to_key = {}
        for position, key in enumerate(self.keys()):
            key_to_pos[key] = position
            pos_to_key[position] = key
        self._key_to_pos = key_to_pos
        self._pos_to_key = pos_to_key

    def pos_from_key(self, key):
        """Return the position of `key`; raises KeyError if it is absent."""
        return self._key_to_pos[key]

    def value_from_pos(self, pos):
        """Return the value stored at position `pos`; raises KeyError if out of range."""
        return self[self._pos_to_key[pos]]

    def popitem(self, *args, **kwargs):
        result = super().popitem(*args, **kwargs)
        self._recalc_index()
        return result

    def move_to_end(self, *args, **kwargs):
        result = super().move_to_end(*args, **kwargs)
        self._recalc_index()
        return result

    def clear(self):
        result = super().clear()
        self._recalc_index()
        return result

    def pop(self, *args, **kwargs):
        result = super().pop(*args, **kwargs)
        self._recalc_index()
        return result

    def update(self, *args, **kwargs):
        result = super().update(*args, **kwargs)
        self._recalc_index()
        return result

    def __delitem__(self, *args, **kwargs):
        result = super().__delitem__(*args, **kwargs)
        self._recalc_index()
        return result

    def __setitem__(self, key, *args, **kwargs):
        # must be checked before the insertion happens
        already_present = key in self
        result = super().__setitem__(key, *args, **kwargs)
        if not already_present:
            # a new key always lands at the end, so no full rebuild is needed
            new_pos = len(self) - 1
            self._key_to_pos[key] = new_pos
            self._pos_to_key[new_pos] = key
        return result
1345
1346
def multisig_type(wallet_type):
    '''If wallet_type starts with an "<m>of<n>" multi-sig pattern,
    return [m, n] as ints, otherwise return None.'''
    if not wallet_type:
        return None
    found = re.match(r'(\d+)of(\d+)', wallet_type)
    if found is None:
        return None
    m_str, n_str = found.group(1, 2)
    return [int(m_str), int(n_str)]
1356
1357
def is_ip_address(x: Union[str, bytes]) -> bool:
    """Return True if `x` is accepted by ``ipaddress.ip_address``.

    Bytes input is decoded as UTF-8 first.
    """
    candidate = x.decode("utf-8") if isinstance(x, bytes) else x
    try:
        ipaddress.ip_address(candidate)
    except ValueError:
        return False
    return True
1366
1367
def is_private_netaddress(host: str) -> bool:
    """Return True if `host` is 'localhost' or a private IP address.

    'localhost' and 'localhost.' are always private. An IPv6 literal may be
    wrapped in square brackets. Anything that is not an IP address, including
    other hostnames and the empty string, is reported as not private.
    """
    if str(host) in ('localhost', 'localhost.',):
        return True
    # Slices instead of host[0] / host[-1], so an empty host does not raise IndexError.
    if host[:1] == '[' and host[-1:] == ']':  # IPv6
        host = host[1:-1]
    try:
        ip_addr = ipaddress.ip_address(host)  # type: Union[IPv4Address, IPv6Address]
        return ip_addr.is_private
    except ValueError:
        pass  # not an IP
    return False
1379
1380
def list_enabled_bits(x: int) -> Sequence[int]:
    """Return the positions of the set bits of `x`, least significant first.

    e.g. 77 (0b1001101) --> (0, 2, 3, 6)
    """
    # bin(x)[:1:-1] walks the digits after the '0b' prefix from right to left
    digits_lsb_first = bin(x)[:1:-1]
    return tuple(position
                 for position, digit in enumerate(digits_lsb_first)
                 if digit == '1')
1386
1387
def resolve_dns_srv(host: str):
    """Resolve the SRV records of `host`.

    Returns a list of ``{'host': ..., 'port': ...}`` dicts, ordered by
    ascending priority and, within the same priority, by descending weight.
    """
    answers = dns.resolver.resolve(host, 'SRV')
    # priority: prefer lower
    # weight: tie breaker; prefer higher
    ordered = sorted(answers, key=lambda record: (record.priority, -record.weight))
    return [{'host': str(record.target), 'port': record.port} for record in ordered]
1400
1401
def randrange(bound: int) -> int:
    """Return a random integer k such that 1 <= k < bound, uniformly
    distributed across that range. Uses the `secrets` module."""
    # secrets.randbelow(m) yields 0 <= r < m; with m = bound - 1 and a shift
    # by one, the result covers exactly 1 .. bound - 1.
    zero_based = secrets.randbelow(bound - 1)
    return zero_based + 1
1408
1409
class CallbackManager:
    """Registry of per-event callbacks that are dispatched via an asyncio loop."""
    # callbacks set by the GUI or any thread
    # guarantee: the callbacks will always get triggered from the asyncio thread.

    def __init__(self):
        self.callback_lock = threading.Lock()
        self.callbacks = defaultdict(list)      # note: needs self.callback_lock
        self.asyncio_loop = None

    def register_callback(self, callback, events):
        """Subscribe `callback` to every event name in `events`."""
        with self.callback_lock:
            for event_name in events:
                self.callbacks[event_name].append(callback)

    def unregister_callback(self, callback):
        """Remove one registration of `callback` from each event it is subscribed to."""
        with self.callback_lock:
            for registered in self.callbacks.values():
                try:
                    registered.remove(callback)
                except ValueError:
                    pass  # not subscribed to this event

    def trigger_callback(self, event, *args):
        """Trigger a callback with given arguments.
        Can be called from any thread. The callback itself will get scheduled
        on the event loop.

        The loop is looked up on the first call and must be running by then.
        Each callback is invoked as ``callback(event, *args)``.
        """
        if self.asyncio_loop is None:
            self.asyncio_loop = asyncio.get_event_loop()
            assert self.asyncio_loop.is_running(), "event loop not running"
        # take a snapshot, so the lock is not held while scheduling
        with self.callback_lock:
            to_notify = list(self.callbacks[event])
        for handler in to_notify:
            # FIXME: if callback throws, we will lose the traceback
            if not asyncio.iscoroutinefunction(handler):
                self.asyncio_loop.call_soon_threadsafe(handler, event, *args)
            else:
                asyncio.run_coroutine_threadsafe(handler(event, *args), self.asyncio_loop)
1446
1447
# Module-level CallbackManager instance; its bound methods are exposed as
# plain module-level functions.
callback_mgr = CallbackManager()
trigger_callback = callback_mgr.trigger_callback
register_callback = callback_mgr.register_callback
unregister_callback = callback_mgr.unregister_callback
1452
1453
_NetAddrType = TypeVar("_NetAddrType")


class NetworkRetryManager(Generic[_NetAddrType]):
    """Truncated Exponential Backoff for network connections.

    For each address, the time of the last attempt and the number of attempts
    so far are remembered. An address may be retried once
    ``init_delay * 2 ** num_attempts`` seconds, capped at ``max_delay``, have
    passed since the last attempt. A separate (init, max) pair applies to
    "urgent" retries; it defaults to the "normal" pair.
    """

    def __init__(
            self, *,
            max_retry_delay_normal: float,
            init_retry_delay_normal: float,
            max_retry_delay_urgent: float = None,
            init_retry_delay_urgent: float = None,
    ):
        self._last_tried_addr = {}  # type: Dict[_NetAddrType, Tuple[float, int]]  # (unix ts, num_attempts)

        # note: these all use "seconds" as unit
        self._max_retry_delay_normal = max_retry_delay_normal
        self._init_retry_delay_normal = init_retry_delay_normal
        self._max_retry_delay_urgent = (
            max_retry_delay_normal if max_retry_delay_urgent is None else max_retry_delay_urgent)
        self._init_retry_delay_urgent = (
            init_retry_delay_normal if init_retry_delay_urgent is None else init_retry_delay_urgent)

    def _trying_addr_now(self, addr: _NetAddrType) -> None:
        """Record that a connection attempt to `addr` starts now."""
        _, attempts_so_far = self._last_tried_addr.get(addr, (0, 0))
        # we add up to 1 second of noise to the time, so that clients are less likely
        # to get synchronised and bombard the remote in connection waves:
        jittered_now = time.time() + random.random()
        self._last_tried_addr[addr] = (jittered_now, attempts_so_far + 1)

    def _on_connection_successfully_established(self, addr: _NetAddrType) -> None:
        """Reset the attempt counter of `addr`."""
        self._last_tried_addr[addr] = (time.time(), 0)

    def _can_retry_addr(self, addr: _NetAddrType, *,
                        now: float = None, urgent: bool = False) -> bool:
        """Return whether the backoff delay for `addr` has elapsed at time `now`."""
        if now is None:
            now = time.time()
        last_time, num_attempts = self._last_tried_addr.get(addr, (0, 0))
        delay = self.__calc_delay(
            multiplier=self._init_retry_delay_urgent if urgent else self._init_retry_delay_normal,
            max_delay=self._max_retry_delay_urgent if urgent else self._max_retry_delay_normal,
            num_attempts=num_attempts,
        )
        return last_time + delay < now

    @classmethod
    def __calc_delay(cls, *, multiplier: float, max_delay: float,
                     num_attempts: int) -> float:
        """Return ``multiplier * 2 ** num_attempts`` clamped to [0, max_delay]."""
        capped_attempts = min(num_attempts, 100_000)
        try:
            uncapped = multiplier * 2 ** capped_attempts
        except OverflowError:
            return max_delay
        return max(0, min(max_delay, uncapped))

    def _clear_addr_retry_times(self) -> None:
        """Forget all recorded attempts."""
        self._last_tried_addr.clear()
1516
1517
class MySocksProxy(aiorpcx.SOCKSProxy):
    """aiorpcx SOCKS proxy that can open asyncio stream connections."""

    async def open_connection(self, host=None, port=None, **kwargs):
        """Connect to host:port via ``self.create_connection`` and return an
        asyncio ``(StreamReader, StreamWriter)`` pair for the connection.
        """
        event_loop = asyncio.get_event_loop()
        stream_reader = asyncio.StreamReader(loop=event_loop)
        stream_protocol = asyncio.StreamReaderProtocol(stream_reader, loop=event_loop)
        transport, _ = await self.create_connection(
            lambda: stream_protocol, host, port, **kwargs)
        stream_writer = asyncio.StreamWriter(transport, stream_protocol, stream_reader, event_loop)
        return stream_reader, stream_writer

    @classmethod
    def from_proxy_dict(cls, proxy: dict = None) -> Optional['MySocksProxy']:
        """Build a proxy object from a proxy config dict.

        Returns None for a falsy `proxy`. The dict must provide 'host', 'port'
        and 'mode' ("socks4" or "socks5"). Authentication is only used when
        both 'user' and 'password' are non-empty. Any other mode raises
        NotImplementedError.
        """
        if not proxy:
            return None
        username = proxy.get('user')
        pw = proxy.get('password')
        auth = aiorpcx.socks.SOCKSUserAuth(username, pw) if (username and pw) else None
        addr = aiorpcx.NetAddress(proxy['host'], proxy['port'])
        mode = proxy['mode']
        if mode == "socks4":
            socks_protocol = aiorpcx.socks.SOCKS4a
        elif mode == "socks5":
            socks_protocol = aiorpcx.socks.SOCKS5
        else:
            raise NotImplementedError  # http proxy not available with aiorpcx
        return cls(addr, socks_protocol, auth)
1546
1547
class JsonRPCClient:
    """Minimal JSON-RPC 2.0 client that POSTs requests over an aiohttp session."""

    def __init__(self, session: aiohttp.ClientSession, url: str):
        self.session = session
        self.url = url
        self._id = 0  # incremented for every request

    async def request(self, endpoint, *args):
        """Call the remote method `endpoint` with positional params `args`.

        Returns the 'result' member of the response. Failures are not raised:
        a truthy 'error' member in the response, or a non-200 HTTP status,
        yields a string starting with 'Error: '.
        """
        self._id += 1
        # Serialise the whole payload with json.dumps, so that the method name
        # is escaped properly. The id is still sent as a string.
        data = json.dumps({
            "jsonrpc": "2.0",
            "id": str(self._id),
            "method": endpoint,
            "params": args,
        })
        async with self.session.post(self.url, data=data) as resp:
            if resp.status == 200:
                r = await resp.json()
                result = r.get('result')
                error = r.get('error')
                if error:
                    return 'Error: ' + str(error)
                else:
                    return result
            else:
                text = await resp.text()
                return 'Error: ' + str(text)

    def add_method(self, endpoint):
        """Attach a coroutine function named `endpoint` to this instance that
        forwards its positional arguments to `request(endpoint, ...)`.
        """
        async def coro(*args):
            return await self.request(endpoint, *args)
        setattr(self, endpoint, coro)
1576
1577
T = TypeVar('T')

def random_shuffled_copy(x: Iterable[T]) -> List[T]:
    """Return the items of `x` as a new list in random order; `x` is left untouched."""
    shuffled = list(x)  # copy
    random.shuffle(shuffled)  # shuffle in-place
    return shuffled
1585
1586
def test_read_write_permissions(path) -> None:
    """Check that `path` is readable and that files can be written next to it.

    If something exists at `path`, one byte is read from it. Then a scratch
    file named ``<path>.tmptest.<pid>`` is written, read back and removed.

    Raises IOError if any step fails, or if the text read back differs from
    the text written.
    """
    # note: There might already be a file at 'path'.
    #       Make sure we do NOT overwrite/corrupt that!
    temp_path = "%s.tmptest.%s" % (path, os.getpid())
    echo = "fs r/w test"
    try:
        # test READ permissions for actual path
        if os.path.exists(path):
            with open(path, "rb") as f:
                f.read(1)  # read 1 byte
        # test R/W sanity for "similar" path
        try:
            with open(temp_path, "w", encoding='utf-8') as f:
                f.write(echo)
            with open(temp_path, "r", encoding='utf-8') as f:
                echo2 = f.read()
        except Exception:
            # Best-effort cleanup, so a failed check does not leave the
            # scratch file behind. The original error is what gets reported.
            try:
                os.remove(temp_path)
            except OSError:
                pass
            raise
        os.remove(temp_path)
    except Exception as e:
        raise IOError(e) from e
    if echo != echo2:
        raise IOError('echo sanity-check failed')
1607
1608
class nullcontext:
    """Context manager that does no additional processing.

    Works with both ``with`` and ``async with``: entering yields
    `enter_result`, and exiting never suppresses an exception.
    This is a ~backport of contextlib.nullcontext from Python 3.10
    """

    def __init__(self, enter_result=None):
        self.enter_result = enter_result

    def __enter__(self):
        return self.enter_result

    def __exit__(self, *exc_details):
        return None

    async def __aenter__(self):
        return self.enter_result

    async def __aexit__(self, *exc_details):
        return None
1628
def get_running_loop():
    """Return the running event loop of the current thread, or None if there is none.

    Mimics the convenient behaviour of asyncio._get_running_loop on all python
    versions; meant for sanity checks.
    """
    if sys.version_info >= (3, 7):
        try:
            running = asyncio.get_running_loop()
        except RuntimeError:
            running = None
        return running
    return asyncio._get_running_loop()
1637