# Electrum - lightweight Bitcoin client
# Copyright (C) 2011 Thomas Voegtlin
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import binascii
import os, sys, re, json
from collections import defaultdict, OrderedDict
from typing import (NamedTuple, Union, TYPE_CHECKING, Tuple, Optional, Callable, Any,
                    Sequence, Dict, Generic, TypeVar, List, Iterable, Set)
from datetime import datetime
import decimal
from decimal import Decimal
import traceback
import urllib
import threading
import hmac
import stat
from locale import localeconv
import asyncio
import urllib.request, urllib.parse, urllib.error
import builtins
import json
import time
from typing import NamedTuple, Optional
import ssl
import ipaddress
from ipaddress import IPv4Address, IPv6Address
import random
import secrets
import functools
from abc import abstractmethod, ABC

import attr
import aiohttp
from aiohttp_socks import ProxyConnector, ProxyType
import aiorpcx
from aiorpcx import TaskGroup
import certifi
import dns.resolver

from .i18n import _
from .logging import get_logger, Logger

if TYPE_CHECKING:
    from .network import Network
    from .interface import Interface
    from .simple_config import SimpleConfig


_logger = get_logger(__name__)


def inv_dict(d):
    """Return a new dict that maps each value of `d` back to its key.

    If several keys share a value, the key seen last during iteration wins.
    """
    return {v: k for k, v in d.items()}


def all_subclasses(cls) -> Set:
    """Return all (transitive) subclasses of cls."""
    res = set(cls.__subclasses__())
    for sub in res.copy():
        res |= all_subclasses(sub)
    return res


ca_path = certifi.where()


# unit name -> number of decimal places relative to one satoshi
base_units = {'BTC':8, 'mBTC':5, 'bits':2, 'sat':0}
base_units_inverse = inv_dict(base_units)
base_units_list = ['BTC', 'mBTC', 'bits', 'sat']  # list(dict) does not guarantee order

DECIMAL_POINT_DEFAULT = 5  # mBTC


class UnknownBaseUnit(Exception): pass


def decimal_point_to_base_unit_name(dp: int) -> str:
    """Map a decimal point to its unit name, e.g. 8 -> "BTC".

    Raises UnknownBaseUnit if `dp` is not a value of `base_units`.
    """
    try:
        return base_units_inverse[dp]
    except KeyError:
        raise UnknownBaseUnit(dp) from None


def base_unit_name_to_decimal_point(unit_name: str) -> int:
    """Map a unit name to its decimal point, e.g. "BTC" -> 8.

    Raises UnknownBaseUnit if `unit_name` is not a key of `base_units`.
    """
    try:
        return base_units[unit_name]
    except KeyError:
        raise UnknownBaseUnit(unit_name) from None


class NotEnoughFunds(Exception):
    def __str__(self):
        return _("Insufficient funds")


class NoDynamicFeeEstimates(Exception):
    def __str__(self):
        return _('Dynamic fee estimates not available')


class MultipleSpendMaxTxOutputs(Exception):
    def __str__(self):
        return _('At most one output can be set to spend max')


class InvalidPassword(Exception):
    def __str__(self):
        return _("Incorrect password")


class AddTransactionException(Exception):
    pass


class UnrelatedTransactionException(AddTransactionException):
    def __str__(self):
        return _("Transaction is unrelated to this wallet.")


class FileImportFailed(Exception):
    """Import error; str() is a translated prefix followed by `message`."""

    def __init__(self, message=''):
        self.message = str(message)

    def __str__(self):
        return _("Failed to import from file.") + "\n" + self.message


class FileExportFailed(Exception):
    """Export error; str() is a translated prefix followed by `message`."""

    def __init__(self, message=''):
        self.message = str(message)

    def __str__(self):
        return _("Failed to export to file.") + "\n" + self.message


class WalletFileException(Exception): pass


class BitcoinException(Exception): pass


class UserFacingException(Exception):
    """Exception that contains information intended to be shown to the user."""


class InvoiceError(UserFacingException): pass


# Throw this exception to unwind the stack like when an error occurs.
# However unlike other exceptions the user won't be informed.
class UserCancelled(Exception):
    '''An exception that is suppressed from the user'''
    pass


# note: this is not a NamedTuple as then its json encoding cannot be customized
class Satoshis(object):
    """Thin wrapper around an amount; str() renders it via format_satoshis()."""
    __slots__ = ('value',)

    def __new__(cls, value):
        self = super(Satoshis, cls).__new__(cls)
        # note: 'value' sometimes has msat precision
        self.value = value
        return self

    def __repr__(self):
        return f'Satoshis({self.value})'

    def __str__(self):
        # note: precision is truncated to satoshis here
        return format_satoshis(self.value)

    def __eq__(self, other):
        # A non-Satoshis operand compares as "not equal" instead of raising
        # AttributeError on `.value` (same convention as Fiat.__eq__).
        if not isinstance(other, Satoshis):
            return False
        return self.value == other.value

    def __ne__(self, other):
        return not (self == other)

    def __add__(self, other):
        return Satoshis(self.value + other.value)


# note: this is not a NamedTuple as then its json encoding cannot be customized
class Fiat(object):
    """A fiat amount (Decimal or None) tagged with a currency code."""
    __slots__ = ('value', 'ccy')

    def __new__(cls, value: Optional[Decimal], ccy: str):
        self = super(Fiat, cls).__new__(cls)
        self.ccy = ccy
        if not isinstance(value, (Decimal, type(None))):
            raise TypeError(f"value should be Decimal or None, not {type(value)}")
        self.value = value
        return self

    def __repr__(self):
        return 'Fiat(%s)'% self.__str__()

    def __str__(self):
        if self.value is None or self.value.is_nan():
            return _('No Data')
        else:
            return "{:.2f}".format(self.value)

    def to_ui_string(self):
        """Like str(), but with the currency code appended when there is data."""
        if self.value is None or self.value.is_nan():
            return _('No Data')
        else:
            return "{:.2f}".format(self.value) + ' ' + self.ccy

    def __eq__(self, other):
        if not isinstance(other, Fiat):
            return False
        if self.ccy != other.ccy:
            return False
        # NaN != NaN for Decimal, but two "no data" amounts should be equal
        if isinstance(self.value, Decimal) and isinstance(other.value, Decimal) \
                and self.value.is_nan() and other.value.is_nan():
            return True
        return self.value == other.value

    def __ne__(self, other):
        return not (self == other)

    def __add__(self, other):
        assert self.ccy == other.ccy
        return Fiat(self.value + other.value, self.ccy)


class MyEncoder(json.JSONEncoder):
    """JSON encoder that knows how to serialize the project's own types."""

    def default(self, obj):
        # note: this does not get called for namedtuples :(  https://bugs.python.org/issue30343
        from .transaction import Transaction, TxOutput
        from .lnutil import UpdateAddHtlc
        if isinstance(obj, UpdateAddHtlc):
            return obj.to_tuple()
        if isinstance(obj, Transaction):
            return obj.serialize()
        if isinstance(obj, TxOutput):
            return obj.to_legacy_tuple()
        if isinstance(obj, Satoshis):
            return str(obj)
        if isinstance(obj, Fiat):
            return str(obj)
        if isinstance(obj, Decimal):
            return str(obj)
        if isinstance(obj, datetime):
            return obj.isoformat(' ')[:-3]
        if isinstance(obj, set):
            return list(obj)
        if isinstance(obj, bytes): # for nametuples in lnchannel
            return obj.hex()
        if hasattr(obj, 'to_json') and callable(obj.to_json):
            return obj.to_json()
        return super(MyEncoder, self).default(obj)


class ThreadJob(Logger):
    """A job that is run periodically from a thread's main loop.  run() is
    called from that thread's context.
    """

    def __init__(self):
        Logger.__init__(self)

    def run(self):
        """Called periodically from the thread"""
        pass

class DebugMem(ThreadJob):
    '''A handy class for debugging GC memory leaks'''
    def __init__(self, classes, interval=30):
        ThreadJob.__init__(self)
        self.next_time = 0
        self.classes = classes
        self.interval = interval

    def mem_stats(self):
        """Log how many live objects exist for each class in self.classes."""
        import gc
        self.logger.info("Start memscan")
        gc.collect()
        objmap = defaultdict(list)
        for obj in gc.get_objects():
            for class_ in self.classes:
                if isinstance(obj, class_):
                    objmap[class_].append(obj)
        for class_, objs in objmap.items():
            self.logger.info(f"{class_.__name__}: {len(objs)}")
        self.logger.info("Finish memscan")

    def run(self):
        # rate-limit the scan to once per `interval` seconds
        if time.time() > self.next_time:
            self.mem_stats()
            self.next_time = time.time() + self.interval

class DaemonThread(threading.Thread, Logger):
    """ daemon thread that terminates cleanly """

    LOGGING_SHORTCUT = 'd'

    def __init__(self):
        threading.Thread.__init__(self)
        Logger.__init__(self)
        # current_thread() replaces the deprecated currentThread() alias
        self.parent_thread = threading.current_thread()
        self.running = False
        self.running_lock = threading.Lock()
        self.job_lock = threading.Lock()
        self.jobs = []
        self.stopped_event = threading.Event()  # set when fully stopped

    def add_jobs(self, jobs):
        with self.job_lock:
            self.jobs.extend(jobs)

    def run_jobs(self):
        # Don't let a throwing job disrupt the thread, future runs of
        # itself, or other jobs.  This is useful protection against
        # malformed or malicious server responses
        with self.job_lock:
            for job in self.jobs:
                try:
                    job.run()
                except Exception:
                    self.logger.exception('')

    def remove_jobs(self, jobs):
        with self.job_lock:
            for job in jobs:
                self.jobs.remove(job)

    def start(self):
        with self.running_lock:
            self.running = True
        return threading.Thread.start(self)

    def is_running(self):
        """True while not stopped and the thread that created us is alive."""
        with self.running_lock:
            return self.running and self.parent_thread.is_alive()

    def stop(self):
        with self.running_lock:
            self.running = False

    def on_stop(self):
        if 'ANDROID_DATA' in os.environ:
            import jnius
            jnius.detach()
            self.logger.info("jnius detach")
        self.logger.info("stopped")
        self.stopped_event.set()


def print_stderr(*args):
    """Write the space-joined str() of `args` plus a newline to stderr."""
    args = [str(item) for item in args]
    sys.stderr.write(" ".join(args) + "\n")
    sys.stderr.flush()

def print_msg(*args):
    """Write the space-joined str() of `args` plus a newline to stdout."""
    # Stringify args
    args = [str(item) for item in args]
    sys.stdout.write(" ".join(args) + "\n")
    sys.stdout.flush()

def json_encode(obj):
    """Pretty-print `obj` as JSON using MyEncoder; fall back to repr(obj)
    if it contains a type that cannot be encoded."""
    try:
        s = json.dumps(obj, sort_keys = True, indent = 4, cls=MyEncoder)
    except TypeError:
        s = repr(obj)
    return s

def json_decode(x):
    """Parse `x` as JSON (floats become Decimal); return `x` unchanged
    if it cannot be parsed."""
    try:
        return json.loads(x, parse_float=Decimal)
    except Exception:
        # not `except:` -- KeyboardInterrupt/SystemExit must propagate
        return x

def json_normalize(x):
    # note: The return value of commands, when going through the JSON-RPC interface,
    #       is json-encoded. The encoder used there cannot handle some types, e.g. electrum.util.Satoshis.
    # note: We should not simply do "json_encode(x)" here, as then later x would get doubly json-encoded.
    # see #5868
    return json_decode(json_encode(x))


# taken from Django Source Code
def constant_time_compare(val1, val2):
    """Return True if the two strings are equal, False otherwise."""
    return hmac.compare_digest(to_bytes(val1, 'utf8'), to_bytes(val2, 'utf8'))


# decorator that prints execution time
_profiler_logger = _logger.getChild('profiler')
def profiler(func):
    """Decorator: log the wall-clock duration of each call at debug level."""
    @functools.wraps(func)  # keep __name__/__doc__ of the decorated function
    def do_profile(*args, **kw_args):
        name = func.__qualname__
        t0 = time.time()
        o = func(*args, **kw_args)
        t = time.time() - t0
        _profiler_logger.debug(f"{name} {t:,.4f}")
        return o
    return do_profile


def android_ext_dir():
    from android.storage import primary_external_storage_path
    return primary_external_storage_path()

def android_backup_dir():
    """Return the backup directory under external storage, creating it if missing."""
    d = os.path.join(android_ext_dir(), 'org.electrum.electrum')
    if not os.path.exists(d):
        os.mkdir(d)
    return d

def android_data_dir():
    import jnius
    PythonActivity = jnius.autoclass('org.kivy.android.PythonActivity')
    return PythonActivity.mActivity.getFilesDir().getPath() + '/data'

def ensure_sparse_file(filename):
    """Best effort: mark `filename` as a sparse file on Windows; no-op elsewhere."""
    # On modern Linux, no need to do anything.
    # On Windows, need to explicitly mark file.
    if os.name == "nt":
        import subprocess  # local import: only needed on this code path
        try:
            # Argument list instead of a shell string, so quotes or shell
            # metacharacters in the filename cannot alter the command.
            subprocess.run(['fsutil', 'sparse', 'setflag', str(filename), '1'], check=False)
        except Exception as e:
            _logger.info(f'error marking file {filename} as sparse: {e}')


def get_headers_dir(config):
    return config.path


def assert_datadir_available(config_path):
    """Raise FileNotFoundError if `config_path` does not exist."""
    path = config_path
    if os.path.exists(path):
        return
    else:
        raise FileNotFoundError(
            'Electrum datadir does not exist. Was it deleted while running?' + '\n' +
            'Should be at {}'.format(path))


def assert_file_in_datadir_available(path, config_path):
    """Raise FileNotFoundError if `path` does not exist.

    The datadir itself is checked first, so the error message tells apart
    "whole datadir is gone" from "only this file is missing".
    """
    if os.path.exists(path):
        return
    else:
        assert_datadir_available(config_path)
        raise FileNotFoundError(
            'Cannot find file but datadir is there.' + '\n' +
            'Should be at {}'.format(path))


def standardize_path(path):
    """Expand `~`, make absolute, resolve symlinks and normalize case."""
    return os.path.normcase(
            os.path.realpath(
                os.path.abspath(
                    os.path.expanduser(
                        path
    ))))


def get_new_wallet_name(wallet_folder: str) -> str:
    """Return the first "wallet_<i>" (i = 1, 2, ...) not present in `wallet_folder`."""
    # list the directory once instead of once per candidate name
    existing = set(os.listdir(wallet_folder))
    i = 1
    while True:
        filename = "wallet_%d" % i
        if filename not in existing:
            return filename
        i += 1


def assert_bytes(*args):
    """
    porting helper, assert args type
    """
    try:
        for x in args:
            assert isinstance(x, (bytes, bytearray))
    except AssertionError:
        print('assert bytes failed', list(map(type, args)))
        raise


def assert_str(*args):
    """
    porting helper, assert args type
    """
    for x in args:
        assert isinstance(x, str)


def to_string(x, enc) -> str:
    """Decode bytes/bytearray with `enc`, pass str through; TypeError otherwise."""
    if isinstance(x, (bytes, bytearray)):
        return x.decode(enc)
    if isinstance(x, str):
        return x
    else:
        raise TypeError("Not a string or bytes like object")


def to_bytes(something, encoding='utf8') -> bytes:
    """
    Return `something` as bytes: bytes are returned as-is, str is encoded
    with `encoding`, bytearray is copied. Raises TypeError for other types.
    """
    if isinstance(something, bytes):
        return something
    if isinstance(something, str):
        return something.encode(encoding)
    elif isinstance(something, bytearray):
        return bytes(something)
    else:
        raise TypeError("Not a string or bytes like object")


bfh = bytes.fromhex


def bh2u(x: bytes) -> str:
    """
    str with (lowercase) hex representation of a bytes-like object

    >>> x = bytes((1, 2, 10))
    >>> bh2u(x)
    '01020a'
    """
    return x.hex()


def xor_bytes(a: bytes, b: bytes) -> bytes:
    """XOR `a` and `b`, truncated to the length of the shorter input."""
    size = min(len(a), len(b))
    return ((int.from_bytes(a[:size], "big") ^ int.from_bytes(b[:size], "big"))
            .to_bytes(size, "big"))


def user_dir():
    """Return the default data directory for this platform, or None if no
    suitable environment variable is set."""
    if "ELECTRUMDIR" in os.environ:
        return os.environ["ELECTRUMDIR"]
    elif 'ANDROID_DATA' in os.environ:
        return android_data_dir()
    elif os.name == 'posix':
        return os.path.join(os.environ["HOME"], ".electrum")
    elif "APPDATA" in os.environ:
        return os.path.join(os.environ["APPDATA"], "Electrum")
    elif "LOCALAPPDATA" in os.environ:
        return os.path.join(os.environ["LOCALAPPDATA"], "Electrum")
    else:
        #raise Exception("No home directory found in environment variables.")
        return


def resource_path(*parts):
    return os.path.join(pkg_dir, *parts)


# absolute path to python package folder of electrum ("lib")
pkg_dir = os.path.split(os.path.realpath(__file__))[0]


def is_valid_email(s):
    regexp = r"[^@]+@[^@]+\.[^@]+"
    return re.match(regexp, s) is not None


def is_hash256_str(text: Any) -> bool:
    """True iff `text` is a str of exactly 64 hex characters."""
    if not isinstance(text, str): return False
    if len(text) != 64: return False
    return is_hex_str(text)


def is_hex_str(text: Any) -> bool:
    """True iff `text` is a str of hex digit pairs with no whitespace."""
    if not isinstance(text, str): return False
    try:
        b = bytes.fromhex(text)
    except ValueError:
        return False
    # forbid whitespaces in text:
    if len(text) != 2 * len(b):
        return False
    return True


def is_integer(val: Any) -> bool:
    return isinstance(val, int)


def is_non_negative_integer(val: Any) -> bool:
    if is_integer(val):
        return val >= 0
    return False


def is_int_or_float(val: Any) -> bool:
    return isinstance(val, (int, float))


def is_non_negative_int_or_float(val: Any) -> bool:
    if is_int_or_float(val):
        return val >= 0
    return False


def chunks(items, size: int):
    """Break up items, a sequence (must support len() and slicing),
    into chunks of length size. The last chunk may be shorter."""
    if size < 1:
        raise ValueError(f"size must be positive, not {repr(size)}")
    for i in range(0, len(items), size):
        yield items[i: i + size]


def format_satoshis_plain(x, *, decimal_point=8) -> str:
    """Display a satoshi amount scaled.  Always uses a '.' as a decimal
    point and has no thousands separator"""
    if x == '!':
        return 'max'
    scale_factor = pow(10, decimal_point)
    return "{:.8f}".format(Decimal(x) / scale_factor).rstrip('0').rstrip('.')


# Check that Decimal precision is sufficient.
# We need at the very least ~20, as we deal with msat amounts, and
# log10(21_000_000 * 10**8 * 1000) ~= 18.3
# decimal.DefaultContext.prec == 28 by default, but it is mutable.
# We enforce that we have at least that available.
assert decimal.getcontext().prec >= 28, f"PyDecimal precision too low: {decimal.getcontext().prec}"

DECIMAL_POINT = localeconv()['decimal_point']  # type: str


def format_satoshis(
        x,  # in satoshis
        *,
        num_zeros=0,
        decimal_point=8,
        precision=None,
        is_diff=False,
        whitespaces=False,
) -> str:
    """Format amount `x` scaled down by 10**decimal_point.

    Returns 'unknown' for None and 'max' for '!'. The locale's decimal
    separator is used. `num_zeros` is the minimum number of fractional
    digits, `precision` the maximum (defaults to `decimal_point`),
    `is_diff` forces an explicit sign, `whitespaces` pads the result.
    """
    if x is None:
        return 'unknown'
    if x == '!':
        return 'max'
    if precision is None:
        precision = decimal_point
    # format string
    decimal_format = "." + str(precision) if precision > 0 else ""
    if is_diff:
        decimal_format = '+' + decimal_format
    # initial result
    scale_factor = pow(10, decimal_point)
    if not isinstance(x, Decimal):
        x = Decimal(x).quantize(Decimal('1E-8'))
    result = ("{:" + decimal_format + "f}").format(x / scale_factor)
    if "." not in result: result += "."
    result = result.rstrip('0')
    # extra decimal places
    integer_part, fract_part = result.split(".")
    if len(fract_part) < num_zeros:
        fract_part += "0" * (num_zeros - len(fract_part))
    result = integer_part + DECIMAL_POINT + fract_part
    # leading/trailing whitespaces
    if whitespaces:
        result += " " * (decimal_point - len(fract_part))
        result = " " * (15 - len(result)) + result
    return result


FEERATE_PRECISION = 1  # num fractional decimal places for sat/byte fee rates
_feerate_quanta = Decimal(10) ** (-FEERATE_PRECISION)


def format_fee_satoshis(fee, *, num_zeros=0, precision=None):
    if precision is None:
        precision = FEERATE_PRECISION
    num_zeros = min(num_zeros, FEERATE_PRECISION)  # no more zeroes than available prec
    return format_satoshis(fee, num_zeros=num_zeros, decimal_point=0, precision=precision)


def quantize_feerate(fee) -> Union[None, Decimal, int]:
    """Strip sat/byte fee rate of excess precision."""
    if fee is None:
        return None
    return Decimal(fee).quantize(_feerate_quanta, rounding=decimal.ROUND_HALF_DOWN)


def timestamp_to_datetime(timestamp: Optional[int]) -> Optional[datetime]:
    if timestamp is None:
        return None
    return datetime.fromtimestamp(timestamp)

def format_time(timestamp):
    date = timestamp_to_datetime(timestamp)
    return date.isoformat(' ')[:-3] if date else _("Unknown")


# Takes a timestamp and returns a string with the approximation of the age
def age(from_date, since_date = None, target_tz=None, include_seconds=False):
    """Describe how far timestamp `from_date` lies from `since_date`.

    `since_date` defaults to now (in `target_tz`). Returns "Unknown" if
    `from_date` is None, else "<distance> ago" or "in <distance>".
    """
    if from_date is None:
        return "Unknown"

    if since_date is None:
        since_date = datetime.now(target_tz)
    # Give from_date the same tz-awareness as since_date: subtracting or
    # comparing a naive and an aware datetime raises TypeError.
    from_date = datetime.fromtimestamp(from_date, since_date.tzinfo)

    td = time_difference(from_date - since_date, include_seconds)
    return td + " ago" if from_date < since_date else "in " + td


def time_difference(distance_in_time, include_seconds):
    """Render the magnitude of timedelta `distance_in_time` in rough words."""
    #distance_in_time = since_date - from_date
    distance_in_seconds = int(round(abs(distance_in_time.days * 86400 + distance_in_time.seconds)))
    distance_in_minutes = int(round(distance_in_seconds/60))

    if distance_in_minutes == 0:
        if include_seconds:
            return "%s seconds" % distance_in_seconds
        else:
            return "less than a minute"
    elif distance_in_minutes < 45:
        return "%s minutes" % distance_in_minutes
    elif distance_in_minutes < 90:
        return "about 1 hour"
    elif distance_in_minutes < 1440:
        return "about %d hours" % (round(distance_in_minutes / 60.0))
    elif distance_in_minutes < 2880:
        return "1 day"
    elif distance_in_minutes < 43220:
        return "%d days" % (round(distance_in_minutes / 1440))
    elif distance_in_minutes < 86400:
        return "about 1 month"
    elif distance_in_minutes < 525600:
        return "%d months" % (round(distance_in_minutes / 43200))
    elif distance_in_minutes < 1051200:
        return "about 1 year"
    else:
        return "over %d years" % (round(distance_in_minutes / 525600))

# name -> (base url, {'tx': path prefix for txids, 'addr': path prefix for addresses})
mainnet_block_explorers = {
    'Bitupper Explorer': ('https://bitupper.com/en/explorer/bitcoin/',
                        {'tx': 'transactions/', 'addr': 'addresses/'}),
    'Bitflyer.jp': ('https://chainflyer.bitflyer.jp/',
                        {'tx': 'Transaction/', 'addr': 'Address/'}),
    'Blockchain.info': ('https://blockchain.com/btc/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'blockchainbdgpzk.onion': ('https://blockchainbdgpzk.onion/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'Blockstream.info': ('https://blockstream.info/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'Bitaps.com': ('https://btc.bitaps.com/',
                        {'tx': '', 'addr': ''}),
    'BTC.com': ('https://btc.com/',
                        {'tx': '', 'addr': ''}),
    'Chain.so': ('https://www.chain.so/',
                        {'tx': 'tx/BTC/', 'addr': 'address/BTC/'}),
    'Insight.is': ('https://insight.bitpay.com/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'TradeBlock.com': ('https://tradeblock.com/blockchain/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'BlockCypher.com': ('https://live.blockcypher.com/btc/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'Blockchair.com': ('https://blockchair.com/bitcoin/',
                        {'tx': 'transaction/', 'addr': 'address/'}),
    'blockonomics.co': ('https://www.blockonomics.co/',
                        {'tx': 'api/tx?txid=', 'addr': '#/search?q='}),
    'mempool.space': ('https://mempool.space/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'mempool.emzy.de': ('https://mempool.emzy.de/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'OXT.me': ('https://oxt.me/',
                        {'tx': 'transaction/', 'addr': 'address/'}),
    'smartbit.com.au': ('https://www.smartbit.com.au/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'mynode.local': ('http://mynode.local:3002/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'system default': ('blockchain:/',
                        {'tx': 'tx/', 'addr': 'address/'}),
}

testnet_block_explorers = {
    'Bitaps.com': ('https://tbtc.bitaps.com/',
                       {'tx': '', 'addr': ''}),
    'BlockCypher.com': ('https://live.blockcypher.com/btc-testnet/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'Blockchain.info': ('https://www.blockchain.com/btc-testnet/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'Blockstream.info': ('https://blockstream.info/testnet/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'mempool.space': ('https://mempool.space/testnet/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'smartbit.com.au': ('https://testnet.smartbit.com.au/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'system default': ('blockchain://000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943/',
                       {'tx': 'tx/', 'addr': 'address/'}),
}

signet_block_explorers = {
    'bc-2.jp': ('https://explorer.bc-2.jp/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'mempool.space': ('https://mempool.space/signet/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'bitcoinexplorer.org': ('https://signet.bitcoinexplorer.org/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'wakiyamap.dev': ('https://signet-explorer.wakiyamap.dev/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'system default': ('blockchain:/',
                       {'tx': 'tx/', 'addr': 'address/'}),
}

_block_explorer_default_api_loc = {'tx': 'tx/', 'addr': 'address/'}


def block_explorer_info():
    """Return the hardcoded explorer table for the currently selected network."""
    from . import constants
    if constants.net.NET_NAME == "testnet":
        return testnet_block_explorers
    elif constants.net.NET_NAME == "signet":
        return signet_block_explorers
    return mainnet_block_explorers


def block_explorer(config: 'SimpleConfig') -> Optional[str]:
    """Returns name of selected block explorer,
    or None if a custom one (not among hardcoded ones) is configured.
    """
    if config.get('block_explorer_custom') is not None:
        return None
    default_ = 'Blockstream.info'
    be_key = config.get('block_explorer', default_)
    be_tuple = block_explorer_info().get(be_key)
    if be_tuple is None:
        # configured name is not known for this network: fall back
        be_key = default_
    assert isinstance(be_key, str), f"{be_key!r} should be str"
    return be_key


def block_explorer_tuple(config: 'SimpleConfig') -> Optional[Tuple[str, dict]]:
    """Return (base_url, {'tx': ..., 'addr': ...}) for the configured explorer.

    A custom explorer may be configured as a str (default path layout is
    assumed) or as a pair; any other custom value is ignored with a
    warning and None is returned.
    """
    custom_be = config.get('block_explorer_custom')
    if custom_be:
        if isinstance(custom_be, str):
            return custom_be, _block_explorer_default_api_loc
        if isinstance(custom_be, (tuple, list)) and len(custom_be) == 2:
            return tuple(custom_be)
        _logger.warning(f"not using 'block_explorer_custom' from config. "
                        f"expected a str or a pair but got {custom_be!r}")
        return None
    else:
        # using one of the hardcoded block explorers
        return block_explorer_info().get(block_explorer(config))


def block_explorer_URL(config: 'SimpleConfig', kind: str, item: str) -> Optional[str]:
    """Build the explorer URL for `item` of the given `kind` ('tx' or 'addr').

    Returns None if no explorer is available or it does not support `kind`.
    """
    be_tuple = block_explorer_tuple(config)
    if not be_tuple:
        return
    explorer_url, explorer_dict = be_tuple
    kind_str = explorer_dict.get(kind)
    if kind_str is None:
        return
    # endswith() instead of [-1]: also safe for an empty base url
    if not explorer_url.endswith("/"):
        explorer_url += "/"
    url_parts = [explorer_url, kind_str, item]
    return ''.join(url_parts)

# URL decode
#_ud = re.compile('%([0-9a-hA-H]{2})', re.MULTILINE)
#urldecode = lambda x: _ud.sub(lambda m: chr(int(m.group(1), 16)), x)


# note: when checking against these, use .lower() to support case-insensitivity
BITCOIN_BIP21_URI_SCHEME = 'bitcoin'
LIGHTNING_URI_SCHEME = 'lightning'


class InvalidBitcoinURI(Exception): pass


# TODO rename to parse_bip21_uri or similar
def parse_URI(uri: str, on_pr: Optional[Callable] = None, *, loop=None) -> dict:
    """Raises InvalidBitcoinURI on malformed URI.

    A bare address (no ':') is accepted and returned as {'address': uri}.
    Otherwise the query parameters are returned as a dict, with 'amount'
    converted to an int number of satoshis, 'time'/'exp' converted to int,
    'sig' converted to hex and 'message' duplicated as 'memo'.

    If `on_pr` is given and the URI carries a payment request ('r', or
    'name' together with 'sig'), a coroutine is scheduled on `loop` that
    obtains the request and passes it to `on_pr`.
    """
    from . import bitcoin
    from .bitcoin import COIN, TOTAL_COIN_SUPPLY_LIMIT_IN_BTC

    if not isinstance(uri, str):
        raise InvalidBitcoinURI(f"expected string, not {repr(uri)}")

    if ':' not in uri:
        if not bitcoin.is_address(uri):
            raise InvalidBitcoinURI("Not a bitcoin address")
        return {'address': uri}

    u = urllib.parse.urlparse(uri)
    if u.scheme.lower() != BITCOIN_BIP21_URI_SCHEME:
        raise InvalidBitcoinURI("Not a bitcoin URI")
    address = u.path

    # python for android fails to parse query
    if address.find('?') > 0:
        # split at the first '?' only; a second '?' must not surface as a
        # bare ValueError from tuple unpacking
        address, query = u.path.split('?', 1)
        pq = urllib.parse.parse_qs(query)
    else:
        pq = urllib.parse.parse_qs(u.query)

    for k, v in pq.items():
        if len(v) != 1:
            raise InvalidBitcoinURI(f'Duplicate Key: {repr(k)}')

    out = {k: v[0] for k, v in pq.items()}
    if address:
        if not bitcoin.is_address(address):
            raise InvalidBitcoinURI(f"Invalid bitcoin address: {address}")
        out['address'] = address
    if 'amount' in out:
        am = out['amount']
        try:
            # "<mantissa>X<digit>" form: mantissa * 10**(digit - 8)
            m = re.match(r'([0-9.]+)X([0-9])', am)
            if m:
                k = int(m.group(2)) - 8
                amount = Decimal(m.group(1)) * pow(Decimal(10), k)
            else:
                amount = Decimal(am) * COIN
            if amount > TOTAL_COIN_SUPPLY_LIMIT_IN_BTC * COIN:
                raise InvalidBitcoinURI(f"amount is out-of-bounds: {amount!r} BTC")
            out['amount'] = int(amount)
        except Exception as e:
            raise InvalidBitcoinURI(f"failed to parse 'amount' field: {repr(e)}") from e
    if 'message' in out:
        out['memo'] = out['message']
    if 'time' in out:
        try:
            out['time'] = int(out['time'])
        except Exception as e:
            raise InvalidBitcoinURI(f"failed to parse 'time' field: {repr(e)}") from e
    if 'exp' in out:
        try:
            out['exp'] = int(out['exp'])
        except Exception as e:
            raise InvalidBitcoinURI(f"failed to parse 'exp' field: {repr(e)}") from e
    if 'sig' in out:
        try:
            out['sig'] = bh2u(bitcoin.base_decode(out['sig'], base=58))
        except Exception as e:
            raise InvalidBitcoinURI(f"failed to parse 'sig' field: {repr(e)}") from e

    r = out.get('r')
    sig = out.get('sig')
    name = out.get('name')
    if on_pr and (r or (name and sig)):
        @log_exceptions
        async def get_payment_request():
            from . import paymentrequest as pr
            if name and sig:
                s = pr.serialize_request(out).SerializeToString()
                request = pr.PaymentRequest(s)
            else:
                request = await pr.get_payment_request(r)
            if on_pr:
                on_pr(request)
        loop = loop or asyncio.get_event_loop()
        asyncio.run_coroutine_threadsafe(get_payment_request(), loop)

    return out


def create_bip21_uri(addr, amount_sat: Optional[int], message: Optional[str],
                     *, extra_query_params: Optional[dict] = None) -> str:
    """Build a bitcoin: URI for `addr`; returns "" if `addr` is not a valid address.

    Raises Exception if a key of `extra_query_params` is not a str or
    would need URL-quoting.
    """
    from . import bitcoin
    if not bitcoin.is_address(addr):
        return ""
    if extra_query_params is None:
        extra_query_params = {}
    query = []
    if amount_sat:
        query.append('amount=%s'%format_satoshis_plain(amount_sat))
    if message:
        query.append('message=%s'%urllib.parse.quote(message))
    for k, v in extra_query_params.items():
        if not isinstance(k, str) or k != urllib.parse.quote(k):
            raise Exception(f"illegal key for URI: {repr(k)}")
        v = urllib.parse.quote(v)
        query.append(f"{k}={v}")
    p = urllib.parse.ParseResult(
        scheme=BITCOIN_BIP21_URI_SCHEME,
        netloc='',
        path=addr,
        params='',
        query='&'.join(query),
        fragment='',
    )
    return str(urllib.parse.urlunparse(p))


def maybe_extract_bolt11_invoice(data: str) -> Optional[str]:
    """Return the lowercased invoice if `data` (optionally prefixed with
    "lightning:") starts with "ln", else None."""
    data = data.strip()  # whitespaces
    data = data.lower()
    if data.startswith(LIGHTNING_URI_SCHEME + ':ln'):
        # drop the "lightning:" prefix
        data = data[len(LIGHTNING_URI_SCHEME) + 1:]
    if data.startswith('ln'):
        return data
    return None


# Python bug (http://bugs.python.org/issue1927) causes raw_input
# to be redirected improperly between
stdin/stderr on Unix systems 1024#TODO: py3 1025def raw_input(prompt=None): 1026 if prompt: 1027 sys.stdout.write(prompt) 1028 return builtin_raw_input() 1029 1030builtin_raw_input = builtins.input 1031builtins.input = raw_input 1032 1033 1034def parse_json(message): 1035 # TODO: check \r\n pattern 1036 n = message.find(b'\n') 1037 if n==-1: 1038 return None, message 1039 try: 1040 j = json.loads(message[0:n].decode('utf8')) 1041 except: 1042 j = None 1043 return j, message[n+1:] 1044 1045 1046def setup_thread_excepthook(): 1047 """ 1048 Workaround for `sys.excepthook` thread bug from: 1049 http://bugs.python.org/issue1230540 1050 1051 Call once from the main thread before creating any threads. 1052 """ 1053 1054 init_original = threading.Thread.__init__ 1055 1056 def init(self, *args, **kwargs): 1057 1058 init_original(self, *args, **kwargs) 1059 run_original = self.run 1060 1061 def run_with_except_hook(*args2, **kwargs2): 1062 try: 1063 run_original(*args2, **kwargs2) 1064 except Exception: 1065 sys.excepthook(*sys.exc_info()) 1066 1067 self.run = run_with_except_hook 1068 1069 threading.Thread.__init__ = init 1070 1071 1072def send_exception_to_crash_reporter(e: BaseException): 1073 sys.excepthook(type(e), e, e.__traceback__) 1074 1075 1076def versiontuple(v): 1077 return tuple(map(int, (v.split(".")))) 1078 1079 1080def read_json_file(path): 1081 try: 1082 with open(path, 'r', encoding='utf-8') as f: 1083 data = json.loads(f.read()) 1084 #backwards compatibility for JSONDecodeError 1085 except ValueError: 1086 _logger.exception('') 1087 raise FileImportFailed(_("Invalid JSON code.")) 1088 except BaseException as e: 1089 _logger.exception('') 1090 raise FileImportFailed(e) 1091 return data 1092 1093def write_json_file(path, data): 1094 try: 1095 with open(path, 'w+', encoding='utf-8') as f: 1096 json.dump(data, f, indent=4, sort_keys=True, cls=MyEncoder) 1097 except (IOError, os.error) as e: 1098 _logger.exception('') 1099 raise FileExportFailed(e) 1100 1101 
def make_dir(path, allow_symlink=True):
    """Make directory if it does not yet exist.

    The new directory is chmod-ed to owner-only rwx. If `allow_symlink` is
    False and `path` is a symlink whose target does not exist, raise Exception.
    """
    if not os.path.exists(path):
        # exists() is False for a dangling symlink, islink() is still True
        if not allow_symlink and os.path.islink(path):
            raise Exception('Dangling link: ' + path)
        os.mkdir(path)
        os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)


def log_exceptions(func):
    """Decorator to log AND re-raise exceptions.

    `func` must be a coroutine function. If the first positional argument has
    a `logger` attribute, that logger is used; otherwise the module logger.
    asyncio.CancelledError is re-raised without logging.
    """
    assert asyncio.iscoroutinefunction(func), 'func needs to be a coroutine'
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        self = args[0] if len(args) > 0 else None
        try:
            return await func(*args, **kwargs)
        except asyncio.CancelledError:
            raise
        except BaseException as e:
            mylogger = self.logger if hasattr(self, 'logger') else _logger
            try:
                mylogger.exception(f"Exception in {func.__name__}: {repr(e)}")
            except BaseException as e2:
                # logging itself failed; fall back to stdout so that nothing is lost
                print(f"logging exception raised: {repr(e2)}... orig exc: {repr(e)} in {func.__name__}")
            raise
    return wrapper


def ignore_exceptions(func):
    """Decorator to silently swallow all exceptions.

    `func` must be a coroutine function. asyncio.CancelledError still propagates.
    """
    assert asyncio.iscoroutinefunction(func), 'func needs to be a coroutine'
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except asyncio.CancelledError:
            # note: with python 3.8, CancelledError no longer inherits Exception, so this catch is redundant
            raise
        except Exception:
            # swallowing is the whole point of this decorator
            pass
    return wrapper


def with_lock(func):
    """Decorator to enforce a lock on a function call.

    The decorated method runs while holding `self.lock`.
    """
    @functools.wraps(func)  # keep the wrapped method's name and docstring
    def func_wrapper(self, *args, **kwargs):
        with self.lock:
            return func(self, *args, **kwargs)
    return func_wrapper


class TxMinedInfo(NamedTuple):
    height: int                        # height of block that mined tx
    conf: Optional[int] = None         # number of confirmations, SPV verified (None means unknown)
    timestamp: Optional[int] = None    # timestamp of block that mined tx
    txpos: Optional[int] = None        # position of tx in serialized block
    header_hash: Optional[str] = None  # hash of block that mined tx


def make_aiohttp_session(proxy: Optional[dict], headers=None, timeout=None):
    """Create an aiohttp.ClientSession that verifies TLS against `ca_path`.

    `proxy`: if truthy, a dict with 'mode', 'host', 'port' and optionally
             'user'/'password'; connections then go through a SOCKS proxy
             (SOCKS5 if mode == 'socks5', SOCKS4 otherwise).
    `headers`: defaults to a 'User-Agent: Electrum' header.
    `timeout`: an aiohttp.ClientTimeout, or a number of seconds used as
               total timeout; defaults to 45 seconds.
    """
    if headers is None:
        headers = {'User-Agent': 'Electrum'}
    if timeout is None:
        # The default timeout is high intentionally.
        # DNS on some systems can be really slow, see e.g. #5337
        timeout = aiohttp.ClientTimeout(total=45)
    elif isinstance(timeout, (int, float)):
        timeout = aiohttp.ClientTimeout(total=timeout)
    ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)

    if proxy:
        connector = ProxyConnector(
            proxy_type=ProxyType.SOCKS5 if proxy['mode'] == 'socks5' else ProxyType.SOCKS4,
            host=proxy['host'],
            port=int(proxy['port']),
            username=proxy.get('user', None),
            password=proxy.get('password', None),
            rdns=True,
            ssl=ssl_context,
        )
    else:
        connector = aiohttp.TCPConnector(ssl=ssl_context)

    return aiohttp.ClientSession(headers=headers, timeout=timeout, connector=connector)


class SilentTaskGroup(TaskGroup):
    """TaskGroup whose spawn() raises CancelledError once the group is closed."""

    def spawn(self, *args, **kwargs):
        # don't complain if group is already closed.
        if self._closed:
            raise asyncio.CancelledError()
        return super().spawn(*args, **kwargs)


class NetworkJobOnDefaultServer(Logger, ABC):
    """An abstract base class for a job that runs on the main network
    interface. Every time the main interface changes, the job is
    restarted, and some of its internals are reset.
    """
    def __init__(self, network: 'Network'):
        Logger.__init__(self)
        asyncio.set_event_loop(network.asyncio_loop)
        self.network = network
        self.interface = None  # type: Interface
        self._restart_lock = asyncio.Lock()
        # Ensure fairness between NetworkJobs. e.g. if multiple wallets
        # are open, a large wallet's Synchronizer should not starve the small wallets:
        self._network_request_semaphore = asyncio.Semaphore(100)

        self._reset()
        # every time the main interface changes, restart:
        register_callback(self._restart, ['default_server_changed'])
        # also schedule a one-off restart now, as there might already be a main interface:
        asyncio.run_coroutine_threadsafe(self._restart(), network.asyncio_loop)

    def _reset(self):
        """Initialise fields. Called every time the underlying
        server connection changes.
        """
        self.taskgroup = SilentTaskGroup()

    async def _start(self, interface: 'Interface'):
        """Bind to `interface` and spawn _run_tasks in the interface's taskgroup."""
        self.interface = interface
        await interface.taskgroup.spawn(self._run_tasks(taskgroup=self.taskgroup))

    @abstractmethod
    async def _run_tasks(self, *, taskgroup: TaskGroup) -> None:
        """Start tasks in taskgroup. Called every time the underlying
        server connection changes.
        """
        # If self.taskgroup changed, don't start tasks. This can happen if we have
        # been restarted *just now*, i.e. after the _run_tasks coroutine object was created.
        if taskgroup != self.taskgroup:
            raise asyncio.CancelledError()

    async def stop(self, *, full_shutdown: bool = True):
        """Cancel the running tasks; on full shutdown also stop listening for restarts."""
        if full_shutdown:
            unregister_callback(self._restart)
        await self.taskgroup.cancel_remaining()

    @log_exceptions
    async def _restart(self, *args):
        """Stop, reset and start again on the network's current main interface."""
        interface = self.network.interface
        if interface is None:
            return  # we should get called again soon

        # serialise concurrent restarts
        async with self._restart_lock:
            await self.stop(full_shutdown=False)
            self._reset()
            await self._start(interface)

    @property
    def session(self):
        """The session of the current interface (asserted to be not None)."""
        s = self.interface.session
        assert s is not None
        return s


def create_and_start_event_loop() -> Tuple[asyncio.AbstractEventLoop,
                                           asyncio.Future,
                                           threading.Thread]:
    """Run the event loop in a new thread named 'EventLoop'.

    Returns (loop, stopping_fut, loop_thread); the thread runs the loop
    until `stopping_fut` completes.
    """
    # compiled once here, rather than on every exception the handler sees
    suppress_message_regex = re.compile('SSL handshake|Fatal read error on|'
                                        'SSL error in data received')

    def on_exception(loop, context):
        """Suppress spurious messages it appears we cannot control."""
        message = context.get('message')
        if message and suppress_message_regex.match(message):
            return
        loop.default_exception_handler(context)

    loop = asyncio.get_event_loop()
    loop.set_exception_handler(on_exception)
    # loop.set_debug(1)
    # create the future on the loop explicitly, instead of relying on
    # asyncio.Future() looking up "the current event loop" implicitly
    stopping_fut = loop.create_future()
    loop_thread = threading.Thread(target=loop.run_until_complete,
                                   args=(stopping_fut,),
                                   name='EventLoop')
    loop_thread.start()
    return loop, stopping_fut, loop_thread


class OrderedDictWithIndex(OrderedDict):
    """An OrderedDict that keeps track of the positions of keys.

    Note: very inefficient to modify contents, except to add new items.
    """

    def __init__(self):
        super().__init__()
        self._key_to_pos = {}
        self._pos_to_key = {}

    def _recalc_index(self):
        """Rebuild both index maps from scratch (linear in the number of keys)."""
        self._key_to_pos = {key: pos for (pos, key) in enumerate(self.keys())}
        self._pos_to_key = {pos: key for (pos, key) in enumerate(self.keys())}

    def pos_from_key(self, key):
        return self._key_to_pos[key]

    def value_from_pos(self, pos):
        key = self._pos_to_key[pos]
        return self[key]

    def popitem(self, *args, **kwargs):
        ret = super().popitem(*args, **kwargs)
        self._recalc_index()
        return ret

    def move_to_end(self, *args, **kwargs):
        ret = super().move_to_end(*args, **kwargs)
        self._recalc_index()
        return ret

    def clear(self):
        ret = super().clear()
        self._recalc_index()
        return ret

    def pop(self, *args, **kwargs):
        ret = super().pop(*args, **kwargs)
        self._recalc_index()
        return ret

    def update(self, *args, **kwargs):
        ret = super().update(*args, **kwargs)
        self._recalc_index()
        return ret

    def __delitem__(self, *args, **kwargs):
        ret = super().__delitem__(*args, **kwargs)
        self._recalc_index()
        return ret

    def __setitem__(self, key, *args, **kwargs):
        is_new_key = key not in self
        ret = super().__setitem__(key, *args, **kwargs)
        if is_new_key:
            # a new key is appended at the end, so the index can be
            # extended in place; overwriting a key keeps its position
            pos = len(self) - 1
            self._key_to_pos[key] = pos
            self._pos_to_key[pos] = key
        return ret


def multisig_type(wallet_type):
    '''If wallet_type is mofn multi-sig, return [m, n],
    otherwise return None.'''
    if not wallet_type:
        return None
    match = re.match(r'(\d+)of(\d+)', wallet_type)
    if match:
        match = [int(x) for x in match.group(1, 2)]
    return match


def is_ip_address(x: Union[str, bytes]) -> bool:
    """Return whether `x` (bytes are decoded as UTF-8) parses as an IPv4/IPv6 address."""
    if isinstance(x, bytes):
        x = x.decode("utf-8")
    try:
        ipaddress.ip_address(x)
        return True
    except ValueError:
        return False


def is_private_netaddress(host: str) -> bool:
    """Return True for 'localhost' and for IP addresses in a private range.

    IPv6 addresses may be wrapped in square brackets. Anything that is not
    an IP address (including the empty string) yields False.
    """
    if str(host) in ('localhost', 'localhost.',):
        return True
    # slices instead of host[0]/host[-1], so that an empty host does not raise IndexError
    if host[:1] == '[' and host[-1:] == ']':  # IPv6
        host = host[1:-1]
    try:
        ip_addr = ipaddress.ip_address(host)  # type: Union[IPv4Address, IPv6Address]
        return ip_addr.is_private
    except ValueError:
        pass  # not an IP
    return False


def list_enabled_bits(x: int) -> Sequence[int]:
    """e.g. 77 (0b1001101) --> (0, 2, 3, 6)"""
    binary = bin(x)[2:]
    rev_bin = reversed(binary)
    return tuple(i for i, b in enumerate(rev_bin) if b == '1')


def resolve_dns_srv(host: str):
    """Resolve the SRV records of `host`.

    Returns a list of {'host': ..., 'port': ...} dicts, most preferred first.
    """
    srv_records = dns.resolver.resolve(host, 'SRV')
    # priority: prefer lower
    # weight: tie breaker; prefer higher
    srv_records = sorted(srv_records, key=lambda x: (x.priority, -x.weight))

    def dict_from_srv_record(srv):
        return {
            'host': str(srv.target),
            'port': srv.port,
        }
    return [dict_from_srv_record(srv) for srv in srv_records]


def randrange(bound: int) -> int:
    """Return a random integer k such that 1 <= k < bound, uniformly
    distributed across that range."""
    # secrets.randbelow(bound) returns a random int: 0 <= r < bound,
    # hence transformations:
    return secrets.randbelow(bound - 1) + 1


class CallbackManager:
    # callbacks set by the GUI or any thread
    # guarantee: the callbacks will always get triggered from the asyncio thread.

    def __init__(self):
        self.callback_lock = threading.Lock()
        self.callbacks = defaultdict(list)  # note: needs self.callback_lock
        self.asyncio_loop = None

    def register_callback(self, callback, events):
        """Subscribe `callback` to each event name in `events`."""
        with self.callback_lock:
            for event in events:
                self.callbacks[event].append(callback)

    def unregister_callback(self, callback):
        """Remove one registration of `callback` from every event it is subscribed to."""
        with self.callback_lock:
            for callbacks in self.callbacks.values():
                if callback in callbacks:
                    callbacks.remove(callback)

    def trigger_callback(self, event, *args):
        """Trigger a callback with given arguments.
        Can be called from any thread. The callback itself will get scheduled
        on the event loop.
        """
        if self.asyncio_loop is None:
            self.asyncio_loop = asyncio.get_event_loop()
            assert self.asyncio_loop.is_running(), "event loop not running"
        with self.callback_lock:
            # copy, so that callbacks are invoked without holding the lock
            callbacks = self.callbacks[event][:]
        for callback in callbacks:
            # FIXME: if callback throws, we will lose the traceback
            if asyncio.iscoroutinefunction(callback):
                asyncio.run_coroutine_threadsafe(callback(event, *args), self.asyncio_loop)
            else:
                self.asyncio_loop.call_soon_threadsafe(callback, event, *args)


callback_mgr = CallbackManager()
trigger_callback = callback_mgr.trigger_callback
register_callback = callback_mgr.register_callback
unregister_callback = callback_mgr.unregister_callback


_NetAddrType = TypeVar("_NetAddrType")


class NetworkRetryManager(Generic[_NetAddrType]):
    """Truncated Exponential Backoff for network connections."""

    def __init__(
            self, *,
            max_retry_delay_normal: float,
            init_retry_delay_normal: float,
            max_retry_delay_urgent: float = None,
            init_retry_delay_urgent: float = None,
    ):
        self._last_tried_addr = {}  # type: Dict[_NetAddrType, Tuple[float, int]]  # (unix ts, num_attempts)

        # note: these all use "seconds" as unit
        # the "urgent" delays default to the "normal" ones
        if max_retry_delay_urgent is None:
            max_retry_delay_urgent = max_retry_delay_normal
        if init_retry_delay_urgent is None:
            init_retry_delay_urgent = init_retry_delay_normal
        self._max_retry_delay_normal = max_retry_delay_normal
        self._init_retry_delay_normal = init_retry_delay_normal
        self._max_retry_delay_urgent = max_retry_delay_urgent
        self._init_retry_delay_urgent = init_retry_delay_urgent

    def _trying_addr_now(self, addr: _NetAddrType) -> None:
        """Record a connection attempt to `addr` (increments its attempt counter)."""
        last_time, num_attempts = self._last_tried_addr.get(addr, (0, 0))
        # we add up to 1 second of noise to the time, so that clients are less likely
        # to get synchronised and bombard the remote in connection waves:
        cur_time = time.time() + random.random()
        self._last_tried_addr[addr] = cur_time, num_attempts + 1

    def _on_connection_successfully_established(self, addr: _NetAddrType) -> None:
        """Reset the attempt counter of `addr`."""
        self._last_tried_addr[addr] = time.time(), 0

    def _can_retry_addr(self, addr: _NetAddrType, *,
                        now: float = None, urgent: bool = False) -> bool:
        """Return whether the backoff delay for `addr` has elapsed at time `now`
        (default: the current time).
        """
        if now is None:
            now = time.time()
        last_time, num_attempts = self._last_tried_addr.get(addr, (0, 0))
        if urgent:
            max_delay = self._max_retry_delay_urgent
            init_delay = self._init_retry_delay_urgent
        else:
            max_delay = self._max_retry_delay_normal
            init_delay = self._init_retry_delay_normal
        delay = self.__calc_delay(multiplier=init_delay, max_delay=max_delay, num_attempts=num_attempts)
        next_time = last_time + delay
        return next_time < now

    @classmethod
    def __calc_delay(cls, *, multiplier: float, max_delay: float,
                     num_attempts: int) -> float:
        """Return multiplier * 2**num_attempts, clamped to [0, max_delay]."""
        num_attempts = min(num_attempts, 100_000)
        try:
            res = multiplier * 2 ** num_attempts
        except OverflowError:
            return max_delay
        return max(0, min(max_delay, res))

    def _clear_addr_retry_times(self) -> None:
        """Forget all recorded attempts."""
        self._last_tried_addr.clear()
class MySocksProxy(aiorpcx.SOCKSProxy):
    """aiorpcx SOCKS proxy that can additionally hand out asyncio stream pairs."""

    async def open_connection(self, host=None, port=None, **kwargs):
        """Connect to host:port through the proxy; return (StreamReader, StreamWriter)."""
        loop = asyncio.get_event_loop()
        reader = asyncio.StreamReader(loop=loop)
        protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
        # note: the second item is named so that it does not shadow the i18n function "_"
        transport, _protocol = await self.create_connection(
            lambda: protocol, host, port, **kwargs)
        writer = asyncio.StreamWriter(transport, protocol, reader, loop)
        return reader, writer

    @classmethod
    def from_proxy_dict(cls, proxy: dict = None) -> Optional['MySocksProxy']:
        """Build a proxy from a dict with keys 'mode', 'host', 'port' and
        optionally 'user'/'password'.

        Returns None if `proxy` is empty/None. Authentication is only used
        if both user and password are set.
        Raises NotImplementedError for modes other than "socks4"/"socks5".
        """
        if not proxy:
            return None
        username, pw = proxy.get('user'), proxy.get('password')
        if not username or not pw:
            auth = None
        else:
            auth = aiorpcx.socks.SOCKSUserAuth(username, pw)
        addr = aiorpcx.NetAddress(proxy['host'], proxy['port'])
        if proxy['mode'] == "socks4":
            ret = cls(addr, aiorpcx.socks.SOCKS4a, auth)
        elif proxy['mode'] == "socks5":
            ret = cls(addr, aiorpcx.socks.SOCKS5, auth)
        else:
            raise NotImplementedError  # http proxy not available with aiorpcx
        return ret


class JsonRPCClient:
    """Minimal JSON-RPC 2.0 client that POSTs requests over an aiohttp session."""

    def __init__(self, session: aiohttp.ClientSession, url: str):
        self.session = session
        self.url = url
        self._id = 0  # id of the most recently sent request

    async def request(self, endpoint, *args):
        """Call the remote method `endpoint` with positional params `args`.

        Returns the 'result' field of the response. Failures are not raised:
        a JSON-RPC error, or a non-200 HTTP status, is returned as a string
        starting with 'Error: '.
        """
        self._id += 1
        # Serialise the whole request with json.dumps, so that the method name
        # is escaped properly, instead of being pasted into a JSON template.
        data = json.dumps({
            "jsonrpc": "2.0",
            "id": str(self._id),  # the id is sent as a string
            "method": endpoint,
            "params": args,
        })
        async with self.session.post(self.url, data=data) as resp:
            if resp.status == 200:
                r = await resp.json()
                result = r.get('result')
                error = r.get('error')
                if error:
                    return 'Error: ' + str(error)
                else:
                    return result
            else:
                text = await resp.text()
                return 'Error: ' + str(text)

    def add_method(self, endpoint):
        """Expose the remote method `endpoint` as a coroutine attribute of the same name."""
        async def coro(*args):
            return await self.request(endpoint, *args)
        setattr(self, endpoint, coro)


T = TypeVar('T')
def random_shuffled_copy(x: 'Iterable[T]') -> 'List[T]':
    """Returns a shuffled copy of the input."""
    x_copy = list(x)  # copy
    random.shuffle(x_copy)  # shuffle in-place
    return x_copy


def test_read_write_permissions(path) -> None:
    """Sanity-check that `path` is readable and that its directory is writable.

    If a file exists at `path`, one byte is read from it. Then a probe file
    next to it ("<path>.tmptest.<pid>") is written, read back and removed.

    Raises IOError if any step fails or the read-back content differs.
    """
    # note: There might already be a file at 'path'.
    #       Make sure we do NOT overwrite/corrupt that!
    temp_path = "%s.tmptest.%s" % (path, os.getpid())
    echo = "fs r/w test"
    try:
        # test READ permissions for actual path
        if os.path.exists(path):
            with open(path, "rb") as f:
                f.read(1)  # read 1 byte
        # test R/W sanity for "similar" path
        try:
            with open(temp_path, "w", encoding='utf-8') as f:
                f.write(echo)
            with open(temp_path, "r", encoding='utf-8') as f:
                echo2 = f.read()
        finally:
            # do not leave the probe file behind if writing/reading failed half-way
            if os.path.exists(temp_path):
                os.remove(temp_path)
    except Exception as e:
        raise IOError(e) from e
    if echo != echo2:
        raise IOError('echo sanity-check failed')


# The name starts with "test_", so pytest would try to collect this function
# as a test (and fail on the "path" argument) in any test module that imports it.
test_read_write_permissions.__test__ = False


class nullcontext:
    """Context manager that does no additional processing.
    This is a ~backport of contextlib.nullcontext from Python 3.10

    Usable both with "with" and "async with"; entering yields `enter_result`.
    """

    def __init__(self, enter_result=None):
        self.enter_result = enter_result

    def __enter__(self):
        return self.enter_result

    def __exit__(self, *excinfo):
        pass

    async def __aenter__(self):
        return self.enter_result

    async def __aexit__(self, *excinfo):
        pass

def get_running_loop():
    """Mimics _get_running_loop convenient functionality for sanity checks on all python versions

    Returns the running event loop of the current thread, or None if there is none.
    """
    if sys.version_info < (3, 7):
        return asyncio._get_running_loop()
    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        return None