# Electrum - Lightweight Bitcoin Client
# Copyright (c) 2011-2016 Thomas Voegtlin
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import asyncio
import time
import queue
import os
import random
import re
from collections import defaultdict
import threading
import socket
import json
import sys
import asyncio
from typing import NamedTuple, Optional, Sequence, List, Dict, Tuple, TYPE_CHECKING, Iterable, Set, Any
import traceback
import concurrent
from concurrent import futures
import copy
import functools

import aiorpcx
from aiorpcx import TaskGroup, ignore_after
from aiohttp import ClientResponse

from . import util
from .util import (log_exceptions, ignore_exceptions,
                   bfh, SilentTaskGroup, make_aiohttp_session, send_exception_to_crash_reporter,
                   is_hash256_str, is_non_negative_integer, MyEncoder, NetworkRetryManager,
                   nullcontext)
from .bitcoin import COIN
from . import constants
from . import blockchain
from . import bitcoin
from . import dns_hacks
from .transaction import Transaction
from .blockchain import Blockchain, HEADER_SIZE
from .interface import (Interface, PREFERRED_NETWORK_PROTOCOL,
                        RequestTimedOut, NetworkTimeout, BUCKET_NAME_OF_ONION_SERVERS,
                        NetworkException, RequestCorrupted, ServerAddr)
from .version import PROTOCOL_VERSION
from .simple_config import SimpleConfig
from .i18n import _
from .logging import get_logger, Logger

if TYPE_CHECKING:
    from .channel_db import ChannelDB
    from .lnrouter import LNPathFinder
    from .lnworker import LNGossip
    from .lnwatcher import WatchTower
    from .daemon import Daemon


_logger = get_logger(__name__)


NUM_TARGET_CONNECTED_SERVERS = 10
NUM_STICKY_SERVERS = 4
NUM_RECENT_SERVERS = 20


def parse_servers(result: Sequence[Tuple[str, str, List[str]]]) -> Dict[str, dict]:
    """Turn a "server.peers.subscribe" response into a ``{host: info}`` dict.

    Each item is indexed as ``item[1]`` (host) and, when present, ``item[2]``
    (a list of feature strings).  Feature strings are interpreted by prefix:

    * ``s``/``t`` followed by digits: a protocol letter and its port; an
      empty port falls back to ``constants.net.DEFAULT_PORTS[protocol]``;
    * ``v...``: the server's protocol version;
    * ``p`` followed by digits: the pruning level (empty means ``'0'``).

    Hosts that advertise no ``s``/``t`` port are left out of the result.
    A ``ServerAddr`` is constructed for every host/port pair purely so that
    any exception it raises on bad input propagates to the caller.
    """
    parsed = {}
    for entry in result:
        hostname = entry[1]
        info = {}
        server_version = None
        pruning = '-'
        features = entry[2] if len(entry) > 2 else ()
        for feature in features:
            if re.match(r"[st]\d*", feature):
                proto = feature[0]
                port = feature[1:] or constants.net.DEFAULT_PORTS[proto]
                ServerAddr(hostname, port, protocol=proto)  # check if raises
                info[proto] = port
            elif re.match("v(.?)+", feature):
                server_version = feature[1:]
            elif re.match(r"p\d*", feature):
                # a bare "p" means pruning level zero
                pruning = feature[1:] or '0'
        if not info:
            continue
        info['pruning'] = pruning
        info['version'] = server_version
        parsed[hostname] = info
    return parsed
def filter_version(servers):
    """Return the entries of *servers* whose 'version' is >= PROTOCOL_VERSION.

    Entries whose version is missing or cannot be compared are dropped.
    """
    def _is_supported(version):
        try:
            theirs = util.versiontuple(version)
            return theirs >= util.versiontuple(PROTOCOL_VERSION)
        except Exception:
            return False

    kept = {}
    for host, info in servers.items():
        if _is_supported(info.get('version')):
            kept[host] = info
    return kept


def filter_noonion(servers):
    """Return the entries of *servers* whose host does not end in '.onion'."""
    kept = {}
    for host, info in servers.items():
        if host.endswith('.onion'):
            continue
        kept[host] = info
    return kept


def filter_protocol(hostmap, *, allowed_protocols: Iterable[str] = None) -> Sequence['ServerAddr']:
    """Build a ServerAddr for every (host, protocol) pair that has a port.

    *hostmap* maps host -> {protocol: port, ...}.  When *allowed_protocols*
    is None, only PREFERRED_NETWORK_PROTOCOL is considered.
    """
    if allowed_protocols is None:
        allowed_protocols = {PREFERRED_NETWORK_PROTOCOL}
    matches = []
    for host, portmap in hostmap.items():
        candidates = ((proto, portmap.get(proto)) for proto in allowed_protocols)
        matches.extend(ServerAddr(host, port, protocol=proto)
                       for proto, port in candidates if port)
    return matches


def pick_random_server(hostmap=None, *, allowed_protocols: Iterable[str],
                       exclude_set: Set['ServerAddr'] = None) -> Optional['ServerAddr']:
    """Pick a random server from *hostmap* that is not in *exclude_set*.

    *hostmap* defaults to ``constants.net.DEFAULT_SERVERS``.  Returns None
    when no eligible server remains.
    """
    hostmap = constants.net.DEFAULT_SERVERS if hostmap is None else hostmap
    excluded = set() if exclude_set is None else exclude_set
    known = set(filter_protocol(hostmap, allowed_protocols=allowed_protocols))
    candidates = list(known - excluded)
    if not candidates:
        return None
    return random.choice(candidates)
class NetworkParameters(NamedTuple):
    """User-configurable network settings, passed around as one value."""
    server: ServerAddr
    proxy: Optional[dict]
    auto_connect: bool
    oneserver: bool = False


proxy_modes = ['socks4', 'socks5']


def serialize_proxy(p):
    """Join a proxy dict into "mode:host:port:user:password".

    Returns None when *p* is not a dict.  'user' and 'password' default to
    the empty string; the other three fields are joined as-is.
    """
    if isinstance(p, dict):
        required = [p.get(key) for key in ('mode', 'host', 'port')]
        optional = [p.get(key, '') for key in ('user', 'password')]
        return ':'.join(required + optional)
    return None


def deserialize_proxy(s: str) -> Optional[dict]:
    """Parse a "[mode:]host[:port[:user[:password]]]" string into a proxy dict.

    Returns None for non-strings and for the (case-insensitive) string
    'none'.  Mode defaults to "socks5", host to "localhost", and a missing
    port is filled in with a default.
    """
    if not isinstance(s, str) or s.lower() == 'none':
        return None
    proxy = {"mode": "socks5", "host": "localhost"}
    # FIXME raw IPv6 address fails here
    fields = s.split(':')
    # the leading field is optional: it is only consumed if it names a known mode
    if proxy_modes.count(fields[0]) == 1:
        proxy["mode"] = fields.pop(0)
    if fields:
        proxy["host"] = fields.pop(0)
    if fields:
        proxy["port"] = fields.pop(0)
    else:
        proxy["port"] = "8080" if proxy["mode"] == "http" else "1080"
    if fields:
        proxy["user"] = fields.pop(0)
    if fields:
        proxy["password"] = fields.pop(0)
    return proxy


class BestEffortRequestFailed(NetworkException):
    """Raised when a best-effort request ran out of retries."""


class TxBroadcastError(NetworkException):
    """Base class for failures while broadcasting a transaction."""

    def get_message_for_gui(self):
        raise NotImplementedError()


class TxBroadcastHashMismatch(TxBroadcastError):
    """The server answered a broadcast with an unexpected transaction ID."""

    def get_message_for_gui(self):
        headline = _("The server returned an unexpected transaction ID when broadcasting the transaction.")
        advice = _("Consider trying to connect to a different server, or updating Electrum.")
        return "{}\n{}\n\n{}".format(headline, advice, str(self))


class TxBroadcastServerReturnedError(TxBroadcastError):
    """The server answered a broadcast with an error."""

    def get_message_for_gui(self):
        headline = _("The server returned an error when broadcasting the transaction.")
        advice = _("Consider trying to connect to a different server, or updating Electrum.")
        return "{}\n{}\n\n{}".format(headline, advice, str(self))


class TxBroadcastUnknownError(TxBroadcastError):
    """The broadcast failed for a reason that could not be classified."""

    def get_message_for_gui(self):
        headline = _("Unknown error when broadcasting the transaction.")
        advice = _("Consider trying to connect to a different server, or updating Electrum.")
        return "{}\n{}".format(headline, advice)
class UntrustedServerReturnedError(NetworkException):
    """Wraps an error reported by a server, whose message must not be trusted.

    The wrapped exception is kept on ``original_exception``; ``str()`` only
    yields a generic translated message, while ``repr()`` includes the
    original exception together with a warning marker.
    """

    def __init__(self, *, original_exception):
        self.original_exception = original_exception

    def get_message_for_gui(self) -> str:
        return str(self)

    def __str__(self):
        return _("The server returned an error.")

    def __repr__(self):
        return (f"<UntrustedServerReturnedError "
                f"[DO NOT TRUST THIS MESSAGE] original_exception: {repr(self.original_exception)}>")


_INSTANCE = None


class Network(Logger, NetworkRetryManager[ServerAddr]):
    """The Network class manages a set of connections to remote electrum
    servers, each connected socket is handled by an Interface() object.
    """

    LOGGING_SHORTCUT = 'n'

    taskgroup: Optional[TaskGroup]
    interface: Optional[Interface]
    interfaces: Dict[ServerAddr, Interface]
    _connecting_ifaces: Set[ServerAddr]
    _closing_ifaces: Set[ServerAddr]
    default_server: ServerAddr
    _recent_servers: List[ServerAddr]

    channel_db: Optional['ChannelDB'] = None
    lngossip: Optional['LNGossip'] = None
    local_watchtower: Optional['WatchTower'] = None
    path_finder: Optional['LNPathFinder'] = None

    def __init__(self, config: SimpleConfig, *, daemon: 'Daemon' = None):
        """Set up state from *config*; must be called while the event loop runs.

        Only one instance may exist (enforced through the module-level
        ``_INSTANCE``).
        """
        global _INSTANCE
        assert _INSTANCE is None, "Network is a singleton!"
        _INSTANCE = self

        Logger.__init__(self)
        NetworkRetryManager.__init__(
            self,
            max_retry_delay_normal=600,
            init_retry_delay_normal=15,
            max_retry_delay_urgent=10,
            init_retry_delay_urgent=1,
        )

        self.asyncio_loop = asyncio.get_event_loop()
        assert self.asyncio_loop.is_running(), "event loop not running"

        assert isinstance(config, SimpleConfig), f"config should be a SimpleConfig instead of {type(config)}"
        self.config = config

        self.daemon = daemon

        blockchain.read_blockchains(self.config)
        blockchain.init_headers_file_for_best_chain()
        self.logger.info(f"blockchains {list(map(lambda b: b.forkpoint, blockchain.blockchains.values()))}")
        self._blockchain_preferred_block = self.config.get('blockchain_preferred_block', None)  # type: Dict[str, Any]
        if self._blockchain_preferred_block is None:
            self._set_preferred_chain(None)
        self._blockchain = blockchain.get_best_chain()

        self._allowed_protocols = {PREFERRED_NETWORK_PROTOCOL}

        # Server for addresses and transactions
        self.default_server = self.config.get('server', None)
        # Sanitize default server
        if self.default_server:
            try:
                self.default_server = ServerAddr.from_str(self.default_server)
            except Exception:
                # was a bare "except:", which would also have swallowed
                # KeyboardInterrupt/SystemExit
                self.logger.warning('failed to parse server-string; falling back to localhost:1:s.')
                self.default_server = ServerAddr.from_str("localhost:1:s")
        else:
            self.default_server = pick_random_server(allowed_protocols=self._allowed_protocols)
        assert isinstance(self.default_server, ServerAddr), f"invalid type for default_server: {self.default_server!r}"

        self.taskgroup = None

        # locks
        self.restart_lock = asyncio.Lock()
        self.bhi_lock = asyncio.Lock()
        self.recent_servers_lock = threading.RLock()  # <- re-entrant
        self.interfaces_lock = threading.Lock()  # for mutating/iterating self.interfaces

        self.server_peers = {}  # returned by interface (servers that the main interface knows about)
        self._recent_servers = self._read_recent_servers()  # note: needs self.recent_servers_lock

        self.banner = ''
        self.donation_address = ''
        self.relay_fee = None  # type: Optional[int]

        dir_path = os.path.join(self.config.path, 'certs')
        util.make_dir(dir_path)

        # the main server we are currently communicating with
        self.interface = None
        self.default_server_changed_event = asyncio.Event()
        # Set of servers we have an ongoing connection with.
        # For any ServerAddr, at most one corresponding Interface object
        # can exist at any given time. Depending on the state of that Interface,
        # the ServerAddr can be found in one of the following sets.
        # Note: during a transition, the ServerAddr can appear in two sets momentarily.
        self._connecting_ifaces = set()
        self.interfaces = {}  # these are the ifaces in "initialised and usable" state
        self._closing_ifaces = set()

        self.auto_connect = self.config.get('auto_connect', True)
        self.proxy = None
        self._maybe_set_oneserver()

        # Dump network messages (all interfaces).  Set at runtime from the console.
        self.debug = False

        self._set_status('disconnected')
        self._has_ever_managed_to_connect_to_server = False

        # lightning network
        if self.config.get('run_watchtower', False):
            from . import lnwatcher
            self.local_watchtower = lnwatcher.WatchTower(self)
            self.local_watchtower.start_network(self)
            asyncio.ensure_future(self.local_watchtower.start_watching())

    def has_internet_connection(self) -> bool:
        """Our guess whether the device has Internet-connectivity."""
        return self._has_ever_managed_to_connect_to_server

    def has_channel_db(self):
        """Return True if a channel database is currently attached."""
        return self.channel_db is not None

    def start_gossip(self):
        """Create the channel db, path finder and gossip worker.

        Does nothing unless the 'use_gossip' config key is set, or if gossip
        is already running.
        """
        from . import lnrouter
        from . import channel_db
        from . import lnworker
        if not self.config.get('use_gossip'):
            return
        if self.lngossip is None:
            self.channel_db = channel_db.ChannelDB(self)
            self.path_finder = lnrouter.LNPathFinder(self.channel_db)
            self.channel_db.load_data()
            self.lngossip = lnworker.LNGossip()
            self.lngossip.start_network(self)

    async def stop_gossip(self, *, full_shutdown: bool = False):
        """Stop gossip and drop the channel db and path finder.

        With *full_shutdown*, also waits for the channel db's stopped_event.
        """
        if self.lngossip:
            await self.lngossip.stop()
            self.lngossip = None
            self.channel_db.stop()
            if full_shutdown:
                await self.channel_db.stopped_event.wait()
            self.channel_db = None
            self.path_finder = None

    def run_from_another_thread(self, coro, *, timeout=None):
        """Schedule *coro* on the network's event loop and block for its result.

        Must not be called from the thread running that loop.
        """
        assert util.get_running_loop() != self.asyncio_loop, 'must not be called from network thread'
        fut = asyncio.run_coroutine_threadsafe(coro, self.asyncio_loop)
        return fut.result(timeout)

    @staticmethod
    def get_instance() -> Optional["Network"]:
        """Return the singleton Network, or None if none has been created."""
        return _INSTANCE

    def with_recent_servers_lock(func):
        """Decorator: run the method while holding self.recent_servers_lock."""
        @functools.wraps(func)  # keep the wrapped method's name/docstring, like the other decorators here
        def func_wrapper(self, *args, **kwargs):
            with self.recent_servers_lock:
                return func(self, *args, **kwargs)
        return func_wrapper

    def _read_recent_servers(self) -> List[ServerAddr]:
        """Load the persisted recent-servers list; returns [] on any failure."""
        if not self.config.path:
            return []
        path = os.path.join(self.config.path, "recent_servers")
        try:
            with open(path, "r", encoding='utf-8') as f:
                data = f.read()
                servers_list = json.loads(data)
            return [ServerAddr.from_str(s) for s in servers_list]
        except Exception:
            # missing/corrupt file or unparsable entries: start from scratch
            return []

    @with_recent_servers_lock
    def _save_recent_servers(self):
        """Persist self._recent_servers as JSON (best effort)."""
        if not self.config.path:
            return
        path = os.path.join(self.config.path, "recent_servers")
        s = json.dumps(self._recent_servers, indent=4, sort_keys=True, cls=MyEncoder)
        try:
            with open(path, "w", encoding='utf-8') as f:
                f.write(s)
        except Exception as e:
            # still best-effort, but no longer silent
            self.logger.info(f"failed to save recent servers: {repr(e)}")

    async def _server_is_lagging(self) -> bool:
        """Return True if the main server has no height or is >1 block behind us."""
        sh = self.get_server_height()
        if not sh:
            self.logger.info('no height for main interface')
            return True
        lh = self.get_local_height()
        result = (lh - sh) > 1
        if result:
            self.logger.info(f'{self.default_server} is lagging ({sh} vs {lh})')
        return result

    def _set_status(self, status):
        """Record the connection status and fire the 'status' callback."""
        self.connection_status = status
        self.notify('status')

    def is_connected(self):
        """Return True if there is a main interface and it is ready."""
        interface = self.interface
        return interface is not None and interface.ready.done()

    def is_connecting(self):
        return self.connection_status == 'connecting'

    async def _request_server_info(self, interface: 'Interface'):
        """Once *interface* is ready, fetch banner, donation address, peers,
        relay fee and fee estimates from it concurrently."""
        await interface.ready
        session = interface.session

        async def get_banner():
            self.banner = await interface.get_server_banner()
            self.notify('banner')

        async def get_donation_address():
            self.donation_address = await interface.get_donation_address()

        async def get_server_peers():
            server_peers = await session.send_request('server.peers.subscribe')
            random.shuffle(server_peers)
            max_accepted_peers = len(constants.net.DEFAULT_SERVERS) + NUM_RECENT_SERVERS
            server_peers = server_peers[:max_accepted_peers]
            # note that 'parse_servers' also validates the data (which is untrusted input!)
            self.server_peers = parse_servers(server_peers)
            self.notify('servers')

        async def get_relay_fee():
            self.relay_fee = await interface.get_relay_fee()

        async with TaskGroup() as group:
            await group.spawn(get_banner)
            await group.spawn(get_donation_address)
            await group.spawn(get_server_peers)
            await group.spawn(get_relay_fee)
            await group.spawn(self._request_fee_estimates(interface))

    async def _request_fee_estimates(self, interface):
        """Fetch the fee histogram from *interface* and store it in the config."""
        self.config.requested_fee_estimates()
        histogram = await interface.get_fee_histogram()
        self.config.mempool_fees = histogram
        self.logger.info(f'fee_histogram {histogram}')
        self.notify('fee_histogram')

    def get_status_value(self, key):
        """Return the value passed along with the callback named *key*.

        Raises Exception for an unknown key.
        """
        if key == 'status':
            value = self.connection_status
        elif key == 'banner':
            value = self.banner
        elif key == 'fee':
            value = self.config.fee_estimates
        elif key == 'fee_histogram':
            value = self.config.mempool_fees
        elif key == 'servers':
            value = self.get_servers()
        else:
            raise Exception('unexpected trigger key {}'.format(key))
        return value

    def notify(self, key):
        """Trigger the callback *key*; all but 'status'/'updated' carry a value."""
        if key in ['status', 'updated']:
            util.trigger_callback(key)
        else:
            util.trigger_callback(key, self.get_status_value(key))

    def get_parameters(self) -> NetworkParameters:
        """Snapshot the current server/proxy/auto_connect/oneserver settings."""
        return NetworkParameters(server=self.default_server,
                                 proxy=self.proxy,
                                 auto_connect=self.auto_connect,
                                 oneserver=self.oneserver)

    def get_donation_address(self):
        """Return the server's donation address, or None when not connected."""
        if self.is_connected():
            return self.donation_address

    def get_interfaces(self) -> List[ServerAddr]:
        """The list of servers for the connected interfaces."""
        with self.interfaces_lock:
            return list(self.interfaces)

    def get_fee_estimates(self):
        """Return fee estimates keyed by ETA target.

        With auto_connect, each target gets the median over all connected
        interfaces (targets without usable data are omitted); otherwise the
        main interface's estimates are returned, or {} without one.
        """
        from statistics import median
        from .simple_config import FEE_ETA_TARGETS
        if self.auto_connect:
            with self.interfaces_lock:
                out = {}
                for n in FEE_ETA_TARGETS:
                    try:
                        out[n] = int(median(filter(None, [i.fee_estimates_eta.get(n) for i in self.interfaces.values()])))
                    except Exception:
                        # e.g. no interface has an estimate for this target
                        continue
                return out
        else:
            if not self.interface:
                return {}
            return self.interface.fee_estimates_eta

    def update_fee_estimates(self, *, fee_est: Dict[int, int] = None):
        """Push fee estimates into the config and fire the 'fee' callback.

        *fee_est* defaults to get_fee_estimates().  The estimates are only
        logged when they differ from the previous call.
        """
        if fee_est is None:
            fee_est = self.get_fee_estimates()
        for nblock_target, fee in fee_est.items():
            self.config.update_fee_estimates(nblock_target, fee)
        if not hasattr(self, "_prev_fee_est") or self._prev_fee_est != fee_est:
            self._prev_fee_est = copy.copy(fee_est)
            self.logger.info(f'fee_estimates {fee_est}')
        self.notify('fee')

    @with_recent_servers_lock
    def get_servers(self):
        """Merge server peers, hardcoded servers and recent servers into one hostmap."""
        # note: order of sources when adding servers here is crucial!
        # don't let "server_peers" overwrite anything,
        # otherwise main server can eclipse the client
        out = dict()
        # add servers received from main interface
        server_peers = self.server_peers
        if server_peers:
            out.update(filter_version(server_peers.copy()))
        # hardcoded servers
        out.update(constants.net.DEFAULT_SERVERS)
        # add recent servers
        for server in self._recent_servers:
            port = str(server.port)
            if server.host in out:
                out[server.host].update({server.protocol: port})
            else:
                out[server.host] = {server.protocol: port}
        # potentially filter out some
        if self.config.get('noonion'):
            out = filter_noonion(out)
        return out

    def _get_next_server_to_try(self) -> Optional[ServerAddr]:
        """Pick a server we are not yet connected to and may retry now, or None."""
        now = time.time()
        with self.interfaces_lock:
            connected_servers = set(self.interfaces) | self._connecting_ifaces | self._closing_ifaces
        # First try from recent servers. (which are persisted)
        # As these are servers we successfully connected to recently, they are
        # most likely to work. This also makes servers "sticky".
        # Note: with sticky servers, it is more difficult for an attacker to eclipse the client,
        #       however if they succeed, the eclipsing would persist. To try to balance this,
        #       we only give priority to recent_servers up to NUM_STICKY_SERVERS.
        with self.recent_servers_lock:
            recent_servers = list(self._recent_servers)
        recent_servers = [s for s in recent_servers if s.protocol in self._allowed_protocols]
        if len(connected_servers & set(recent_servers)) < NUM_STICKY_SERVERS:
            for server in recent_servers:
                if server in connected_servers:
                    continue
                if not self._can_retry_addr(server, now=now):
                    continue
                return server
        # try all servers we know about, pick one at random
        hostmap = self.get_servers()
        servers = list(set(filter_protocol(hostmap, allowed_protocols=self._allowed_protocols)) - connected_servers)
        random.shuffle(servers)
        for server in servers:
            if not self._can_retry_addr(server, now=now):
                continue
            return server
        return None

    def _set_proxy(self, proxy: Optional[dict]):
        """Store *proxy*, reconfigure DNS accordingly and fire 'proxy_set'."""
        self.proxy = proxy
        dns_hacks.configure_dns_depending_on_proxy(bool(proxy))
        self.logger.info(f'setting proxy {proxy}')
        util.trigger_callback('proxy_set', self.proxy)

    @log_exceptions
    async def set_parameters(self, net_params: NetworkParameters):
        """Persist *net_params* to the config and apply them.

        Returns without applying anything if the proxy settings are invalid
        or if the config did not accept the new values.
        """
        proxy = net_params.proxy
        proxy_str = serialize_proxy(proxy)
        server = net_params.server
        # sanitize parameters
        try:
            if proxy:
                proxy_modes.index(proxy['mode'])  # raises ValueError for an unknown mode
                int(proxy['port'])
        except Exception:
            return
        self.config.set_key('auto_connect', net_params.auto_connect, False)
        self.config.set_key('oneserver', net_params.oneserver, False)
        self.config.set_key('proxy', proxy_str, False)
        self.config.set_key('server', str(server), True)
        # abort if changes were not allowed by config
        if self.config.get('server') != str(server) \
                or self.config.get('proxy') != proxy_str \
                or self.config.get('oneserver') != net_params.oneserver:
            return

        async with self.restart_lock:
            self.auto_connect = net_params.auto_connect
            if self.proxy != proxy or self.oneserver != net_params.oneserver:
                # Restart the network defaulting to the given server
                await self.stop(full_shutdown=False)
                self.default_server = server
                await self._start()
            elif self.default_server != server:
                await self.switch_to_interface(server)
            else:
                await self.switch_lagging_interface()
                # NOTE(review): placement inside this branch was reconstructed
                # from a listing without indentation -- confirm against history
                util.trigger_callback('network_updated')

    def _maybe_set_oneserver(self) -> None:
        """Read 'oneserver' from the config and derive self.num_server from it."""
        oneserver = bool(self.config.get('oneserver', False))
        self.oneserver = oneserver
        self.num_server = NUM_TARGET_CONNECTED_SERVERS if not oneserver else 0

    async def _switch_to_random_interface(self):
        '''Switch to a random connected server other than the current one'''
        servers = self.get_interfaces()    # Those in connected state
        if self.default_server in servers:
            servers.remove(self.default_server)
        if servers:
            await self.switch_to_interface(random.choice(servers))

    async def switch_lagging_interface(self):
        """If auto_connect and lagging, switch interface (only within fork)."""
        if self.auto_connect and await self._server_is_lagging():
            # switch to one that has the correct header (not height)
            best_header = self.blockchain().header_at_tip()
            with self.interfaces_lock:
                interfaces = list(self.interfaces.values())
            filtered = list(filter(lambda iface: iface.tip_header == best_header, interfaces))
            if filtered:
                chosen_iface = random.choice(filtered)
                await self.switch_to_interface(chosen_iface.server)

    async def switch_unwanted_fork_interface(self) -> None:
        """If auto_connect, maybe switch to another fork/chain."""
        if not self.auto_connect or not self.interface:
            return
        with self.interfaces_lock:
            interfaces = list(self.interfaces.values())
        pref_height = self._blockchain_preferred_block['height']
        pref_hash = self._blockchain_preferred_block['hash']
        # shortcut for common case
        if pref_height == 0:
            return
        # maybe try switching chains; starting with most desirable first
        matching_chains = blockchain.get_chains_that_contain_header(pref_height, pref_hash)
        chains_to_try = list(matching_chains) + [blockchain.get_best_chain()]
        for rank, chain in enumerate(chains_to_try):
            # check if main interface is already on this fork
            if self.interface.blockchain == chain:
                return
            # switch to another random interface that is on this fork, if any
            filtered = [iface for iface in interfaces
                        if iface.blockchain == chain]
            if filtered:
                self.logger.info(f"switching to (more) preferred fork (rank {rank})")
                chosen_iface = random.choice(filtered)
                await self.switch_to_interface(chosen_iface.server)
                return
        self.logger.info("tried to switch to (more) preferred fork but no interfaces are on any")

    async def switch_to_interface(self, server: ServerAddr):
        """Switch to server as our main interface. If no connection exists,
        queue interface to be started. The actual switch will
        happen when the interface becomes ready.
        """
        self.default_server = server
        old_interface = self.interface
        old_server = old_interface.server if old_interface else None

        # Stop any current interface in order to terminate subscriptions,
        # and to cancel tasks in interface.taskgroup.
        if old_server and old_server != server:
            # don't wait for old_interface to close as that might be slow:
            await self.taskgroup.spawn(self._close_interface(old_interface))

        if server not in self.interfaces:
            self.interface = None
            await self.taskgroup.spawn(self._run_new_interface(server))
            return

        i = self.interfaces[server]
        if old_interface != i:
            self.logger.info(f"switching to {server}")
            assert i.ready.done(), "interface we are switching to is not ready yet"
            blockchain_updated = i.blockchain != self.blockchain()
            self.interface = i
            await i.taskgroup.spawn(self._request_server_info(i))
            util.trigger_callback('default_server_changed')
            self.default_server_changed_event.set()
            self.default_server_changed_event.clear()
            self._set_status('connected')
            util.trigger_callback('network_updated')
            if blockchain_updated:
                util.trigger_callback('blockchain_updated')

    async def _close_interface(self, interface: Optional[Interface]):
        """Unregister *interface* and wait until it has disconnected.

        No-op for None or for an interface whose server is already closing.
        """
        if not interface:
            return
        if interface.server in self._closing_ifaces:
            return
        self._closing_ifaces.add(interface.server)
        with self.interfaces_lock:
            if self.interfaces.get(interface.server) == interface:
                self.interfaces.pop(interface.server)
        if interface == self.interface:
            self.interface = None
        try:
            # this can take some time if server/connection is slow:
            await interface.close()
            await interface.got_disconnected.wait()
        finally:
            self._closing_ifaces.discard(interface.server)

    @with_recent_servers_lock
    def _add_recent_server(self, server: ServerAddr) -> None:
        """Move *server* to the front of the recent-servers list and persist it."""
        self._on_connection_successfully_established(server)
        # list is ordered
        if server in self._recent_servers:
            self._recent_servers.remove(server)
        self._recent_servers.insert(0, server)
        self._recent_servers = self._recent_servers[:NUM_RECENT_SERVERS]
        self._save_recent_servers()

    async def connection_down(self, interface: Interface):
        '''A connection to server either went down, or was never made.
        We distinguish by whether it is in self.interfaces.'''
        if not interface:
            return
        if interface.server == self.default_server:
            self._set_status('disconnected')
        await self._close_interface(interface)
        util.trigger_callback('network_updated')

    def get_network_timeout_seconds(self, request_type=NetworkTimeout.Generic) -> int:
        """Pick the timeout tier of *request_type* matching the current setup."""
        if self.oneserver and not self.auto_connect:
            return request_type.MOST_RELAXED
        if self.proxy:
            return request_type.RELAXED
        return request_type.NORMAL

    @ignore_exceptions  # do not kill outer taskgroup
    @log_exceptions
    async def _run_new_interface(self, server: ServerAddr):
        """Create an Interface for *server* and register it once it is ready.

        Does nothing if *server* is already connected, connecting or closing.
        """
        if (server in self.interfaces
                or server in self._connecting_ifaces
                or server in self._closing_ifaces):
            return
        self._connecting_ifaces.add(server)
        if server == self.default_server:
            self.logger.info(f"connecting to {server} as new interface")
            self._set_status('connecting')
        self._trying_addr_now(server)

        interface = Interface(network=self, server=server, proxy=self.proxy)
        # note: using longer timeouts here as DNS can sometimes be slow!
        timeout = self.get_network_timeout_seconds(NetworkTimeout.Generic)
        try:
            await asyncio.wait_for(interface.ready, timeout)
        except BaseException as e:
            self.logger.info(f"couldn't launch iface {server} -- {repr(e)}")
            await interface.close()
            return
        else:
            with self.interfaces_lock:
                assert server not in self.interfaces
                self.interfaces[server] = interface
        finally:
            self._connecting_ifaces.discard(server)

        if server == self.default_server:
            await self.switch_to_interface(server)

        self._has_ever_managed_to_connect_to_server = True
        self._add_recent_server(server)
        util.trigger_callback('network_updated')

    def check_interface_against_healthy_spread_of_connected_servers(self, iface_to_check: Interface) -> bool:
        """Return False if keeping *iface_to_check* would crowd one bucket.

        Buckets come from Interface.bucket_based_on_ipaddress(); the main
        server and interfaces without a bucket are always accepted.
        """
        # main interface is exempt. this makes switching servers easier
        if iface_to_check.is_main_server():
            return True
        if not iface_to_check.bucket_based_on_ipaddress():
            return True
        # bucket connected interfaces
        with self.interfaces_lock:
            interfaces = list(self.interfaces.values())
        if iface_to_check in interfaces:
            interfaces.remove(iface_to_check)
        buckets = defaultdict(list)
        for iface in interfaces:
            buckets[iface.bucket_based_on_ipaddress()].append(iface)
        # check proposed server against buckets
        onion_servers = buckets[BUCKET_NAME_OF_ONION_SERVERS]
        if iface_to_check.is_tor():
            # keep number of onion servers below half of all connected servers
            if len(onion_servers) > NUM_TARGET_CONNECTED_SERVERS // 2:
                return False
        else:
            bucket = iface_to_check.bucket_based_on_ipaddress()
            if len(buckets[bucket]) > 0:
                return False
        return True

    def best_effort_reliable(func):
        """Decorator: retry a request coroutine (up to 10 attempts) across
        missing, timed-out or corrupted main interfaces.

        Raises BestEffortRequestFailed when all attempts are used up.
        """
        @functools.wraps(func)
        async def make_reliable_wrapper(self: 'Network', *args, **kwargs):
            for i in range(10):
                iface = self.interface
                # retry until there is a main interface
                if not iface:
                    async with ignore_after(1):
                        await self.default_server_changed_event.wait()
                    continue  # try again
                assert iface.ready.done(), "interface not ready yet"
                # try actual request
                try:
                    async with TaskGroup(wait=any) as group:
                        task = await group.spawn(func(self, *args, **kwargs))
                        await group.spawn(iface.got_disconnected.wait())
                except RequestTimedOut:
                    await iface.close()
                    await iface.got_disconnected.wait()
                    continue  # try again
                except RequestCorrupted as e:
                    # TODO ban server?
                    iface.logger.exception(f"RequestCorrupted: {e}")
                    await iface.close()
                    await iface.got_disconnected.wait()
                    continue  # try again
                if task.done() and not task.cancelled():
                    return task.result()
                # otherwise; try again
            raise BestEffortRequestFailed('no interface to do request on... gave up.')
        return make_reliable_wrapper

    def catch_server_exceptions(func):
        """Decorator: re-raise aiorpcx CodeMessageError as UntrustedServerReturnedError."""
        @functools.wraps(func)
        async def wrapper(self, *args, **kwargs):
            try:
                return await func(self, *args, **kwargs)
            except aiorpcx.jsonrpc.CodeMessageError as e:
                raise UntrustedServerReturnedError(original_exception=e) from e
        return wrapper

    @best_effort_reliable
    @catch_server_exceptions
    async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
        """Ask the main interface for the merkle proof of *tx_hash* at *tx_height*."""
        return await self.interface.get_merkle_for_transaction(tx_hash=tx_hash, tx_height=tx_height)

    @best_effort_reliable
    async def broadcast_transaction(self, tx: 'Transaction', *, timeout=None) -> None:
        """Broadcast *tx* through the main interface.

        Raises a TxBroadcastError subclass if the server reports an error,
        an unexpected exception occurs, or the returned txid does not match.
        Timeouts and cancellations are passed through unchanged.
        """
        if timeout is None:
            timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent)
        try:
            out = await self.interface.session.send_request('blockchain.transaction.broadcast', [tx.serialize()], timeout=timeout)
            # note: both 'out' and exception messages are untrusted input from the server
        except (RequestTimedOut, asyncio.CancelledError, asyncio.TimeoutError):
            raise  # pass-through
        except aiorpcx.jsonrpc.CodeMessageError as e:
            self.logger.info(f"broadcast_transaction error [DO NOT TRUST THIS MESSAGE]: {repr(e)}")
            raise TxBroadcastServerReturnedError(self.sanitize_tx_broadcast_response(e.message)) from e
        except BaseException as e:  # intentional BaseException for sanity!
            self.logger.info(f"broadcast_transaction error2 [DO NOT TRUST THIS MESSAGE]: {repr(e)}")
            send_exception_to_crash_reporter(e)
            raise TxBroadcastUnknownError() from e
        if out != tx.txid():
            self.logger.info(f"unexpected txid for broadcast_transaction [DO NOT TRUST THIS MESSAGE]: {out} != {tx.txid()}")
            raise TxBroadcastHashMismatch(_("Server returned unexpected transaction ID."))
self.logger.info(f"broadcast_transaction error [DO NOT TRUST THIS MESSAGE]: {repr(e)}") 886 raise TxBroadcastServerReturnedError(self.sanitize_tx_broadcast_response(e.message)) from e 887 except BaseException as e: # intentional BaseException for sanity! 888 self.logger.info(f"broadcast_transaction error2 [DO NOT TRUST THIS MESSAGE]: {repr(e)}") 889 send_exception_to_crash_reporter(e) 890 raise TxBroadcastUnknownError() from e 891 if out != tx.txid(): 892 self.logger.info(f"unexpected txid for broadcast_transaction [DO NOT TRUST THIS MESSAGE]: {out} != {tx.txid()}") 893 raise TxBroadcastHashMismatch(_("Server returned unexpected transaction ID.")) 894 895 async def try_broadcasting(self, tx, name): 896 try: 897 await self.broadcast_transaction(tx) 898 except Exception as e: 899 self.logger.info(f'error: could not broadcast {name} {tx.txid()}, {str(e)}') 900 else: 901 self.logger.info(f'success: broadcasting {name} {tx.txid()}') 902 903 @staticmethod 904 def sanitize_tx_broadcast_response(server_msg) -> str: 905 # Unfortunately, bitcoind and hence the Electrum protocol doesn't return a useful error code. 906 # So, we use substring matching to grok the error message. 907 # server_msg is untrusted input so it should not be shown to the user. 
see #4968 908 server_msg = str(server_msg) 909 server_msg = server_msg.replace("\n", r"\n") 910 # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/policy/policy.cpp 911 # grep "reason =" 912 policy_error_messages = { 913 r"version": _("Transaction uses non-standard version."), 914 r"tx-size": _("The transaction was rejected because it is too large (in bytes)."), 915 r"scriptsig-size": None, 916 r"scriptsig-not-pushonly": None, 917 r"scriptpubkey": 918 ("scriptpubkey\n" + 919 _("Some of the outputs pay to a non-standard script.")), 920 r"bare-multisig": None, 921 r"dust": 922 (_("Transaction could not be broadcast due to dust outputs.\n" 923 "Some of the outputs are too small in value, probably lower than 1000 satoshis.\n" 924 "Check the units, make sure you haven't confused e.g. mBTC and BTC.")), 925 r"multi-op-return": _("The transaction was rejected because it contains multiple OP_RETURN outputs."), 926 } 927 for substring in policy_error_messages: 928 if substring in server_msg: 929 msg = policy_error_messages[substring] 930 return msg if msg else substring 931 # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/script/script_error.cpp 932 script_error_messages = { 933 r"Script evaluated without error but finished with a false/empty top stack element", 934 r"Script failed an OP_VERIFY operation", 935 r"Script failed an OP_EQUALVERIFY operation", 936 r"Script failed an OP_CHECKMULTISIGVERIFY operation", 937 r"Script failed an OP_CHECKSIGVERIFY operation", 938 r"Script failed an OP_NUMEQUALVERIFY operation", 939 r"Script is too big", 940 r"Push value size limit exceeded", 941 r"Operation limit exceeded", 942 r"Stack size limit exceeded", 943 r"Signature count negative or greater than pubkey count", 944 r"Pubkey count negative or limit exceeded", 945 r"Opcode missing or not understood", 946 r"Attempted to use a disabled opcode", 947 r"Operation not valid with the current stack size", 948 
r"Operation not valid with the current altstack size", 949 r"OP_RETURN was encountered", 950 r"Invalid OP_IF construction", 951 r"Negative locktime", 952 r"Locktime requirement not satisfied", 953 r"Signature hash type missing or not understood", 954 r"Non-canonical DER signature", 955 r"Data push larger than necessary", 956 r"Only push operators allowed in signatures", 957 r"Non-canonical signature: S value is unnecessarily high", 958 r"Dummy CHECKMULTISIG argument must be zero", 959 r"OP_IF/NOTIF argument must be minimal", 960 r"Signature must be zero for failed CHECK(MULTI)SIG operation", 961 r"NOPx reserved for soft-fork upgrades", 962 r"Witness version reserved for soft-fork upgrades", 963 r"Taproot version reserved for soft-fork upgrades", 964 r"OP_SUCCESSx reserved for soft-fork upgrades", 965 r"Public key version reserved for soft-fork upgrades", 966 r"Public key is neither compressed or uncompressed", 967 r"Stack size must be exactly one after execution", 968 r"Extra items left on stack after execution", 969 r"Witness program has incorrect length", 970 r"Witness program was passed an empty witness", 971 r"Witness program hash mismatch", 972 r"Witness requires empty scriptSig", 973 r"Witness requires only-redeemscript scriptSig", 974 r"Witness provided for non-witness script", 975 r"Using non-compressed keys in segwit", 976 r"Invalid Schnorr signature size", 977 r"Invalid Schnorr signature hash type", 978 r"Invalid Schnorr signature", 979 r"Invalid Taproot control block size", 980 r"Too much signature validation relative to witness weight", 981 r"OP_CHECKMULTISIG(VERIFY) is not available in tapscript", 982 r"OP_IF/NOTIF argument must be minimal in tapscript", 983 r"Using OP_CODESEPARATOR in non-witness script", 984 r"Signature is found in scriptCode", 985 } 986 for substring in script_error_messages: 987 if substring in server_msg: 988 return substring 989 # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/validation.cpp 
990 # grep "REJECT_" 991 # grep "TxValidationResult" 992 # should come after script_error.cpp (due to e.g. non-mandatory-script-verify-flag) 993 validation_error_messages = { 994 r"coinbase": None, 995 r"tx-size-small": None, 996 r"non-final": None, 997 r"txn-already-in-mempool": None, 998 r"txn-mempool-conflict": None, 999 r"txn-already-known": None, 1000 r"non-BIP68-final": None, 1001 r"bad-txns-nonstandard-inputs": None, 1002 r"bad-witness-nonstandard": None, 1003 r"bad-txns-too-many-sigops": None, 1004 r"mempool min fee not met": 1005 ("mempool min fee not met\n" + 1006 _("Your transaction is paying a fee that is so low that the bitcoin node cannot " 1007 "fit it into its mempool. The mempool is already full of hundreds of megabytes " 1008 "of transactions that all pay higher fees. Try to increase the fee.")), 1009 r"min relay fee not met": None, 1010 r"absurdly-high-fee": None, 1011 r"max-fee-exceeded": None, 1012 r"too-long-mempool-chain": None, 1013 r"bad-txns-spends-conflicting-tx": None, 1014 r"insufficient fee": ("insufficient fee\n" + 1015 _("Your transaction is trying to replace another one in the mempool but it " 1016 "does not meet the rules to do so. 
Try to increase the fee.")), 1017 r"too many potential replacements": None, 1018 r"replacement-adds-unconfirmed": None, 1019 r"mempool full": None, 1020 r"non-mandatory-script-verify-flag": None, 1021 r"mandatory-script-verify-flag-failed": None, 1022 r"Transaction check failed": None, 1023 } 1024 for substring in validation_error_messages: 1025 if substring in server_msg: 1026 msg = validation_error_messages[substring] 1027 return msg if msg else substring 1028 # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/rpc/rawtransaction.cpp 1029 # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/util/error.cpp 1030 # grep "RPC_TRANSACTION" 1031 # grep "RPC_DESERIALIZATION_ERROR" 1032 rawtransaction_error_messages = { 1033 r"Missing inputs": None, 1034 r"Inputs missing or spent": None, 1035 r"transaction already in block chain": None, 1036 r"Transaction already in block chain": None, 1037 r"TX decode failed": None, 1038 r"Peer-to-peer functionality missing or disabled": None, 1039 r"Transaction rejected by AcceptToMemoryPool": None, 1040 r"AcceptToMemoryPool failed": None, 1041 r"Fee exceeds maximum configured by user": None, 1042 } 1043 for substring in rawtransaction_error_messages: 1044 if substring in server_msg: 1045 msg = rawtransaction_error_messages[substring] 1046 return msg if msg else substring 1047 # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/consensus/tx_verify.cpp 1048 # https://github.com/bitcoin/bitcoin/blob/c7ad94428ab6f54661d7a5441e1fdd0ebf034903/src/consensus/tx_check.cpp 1049 # grep "REJECT_" 1050 # grep "TxValidationResult" 1051 tx_verify_error_messages = { 1052 r"bad-txns-vin-empty": None, 1053 r"bad-txns-vout-empty": None, 1054 r"bad-txns-oversize": None, 1055 r"bad-txns-vout-negative": None, 1056 r"bad-txns-vout-toolarge": None, 1057 r"bad-txns-txouttotal-toolarge": None, 1058 r"bad-txns-inputs-duplicate": None, 1059 r"bad-cb-length": 
None, 1060 r"bad-txns-prevout-null": None, 1061 r"bad-txns-inputs-missingorspent": 1062 ("bad-txns-inputs-missingorspent\n" + 1063 _("You might have a local transaction in your wallet that this transaction " 1064 "builds on top. You need to either broadcast or remove the local tx.")), 1065 r"bad-txns-premature-spend-of-coinbase": None, 1066 r"bad-txns-inputvalues-outofrange": None, 1067 r"bad-txns-in-belowout": None, 1068 r"bad-txns-fee-outofrange": None, 1069 } 1070 for substring in tx_verify_error_messages: 1071 if substring in server_msg: 1072 msg = tx_verify_error_messages[substring] 1073 return msg if msg else substring 1074 # otherwise: 1075 return _("Unknown error") 1076 1077 @best_effort_reliable 1078 @catch_server_exceptions 1079 async def request_chunk(self, height: int, tip=None, *, can_return_early=False): 1080 return await self.interface.request_chunk(height, tip=tip, can_return_early=can_return_early) 1081 1082 @best_effort_reliable 1083 @catch_server_exceptions 1084 async def get_transaction(self, tx_hash: str, *, timeout=None) -> str: 1085 return await self.interface.get_transaction(tx_hash=tx_hash, timeout=timeout) 1086 1087 @best_effort_reliable 1088 @catch_server_exceptions 1089 async def get_history_for_scripthash(self, sh: str) -> List[dict]: 1090 return await self.interface.get_history_for_scripthash(sh) 1091 1092 @best_effort_reliable 1093 @catch_server_exceptions 1094 async def listunspent_for_scripthash(self, sh: str) -> List[dict]: 1095 return await self.interface.listunspent_for_scripthash(sh) 1096 1097 @best_effort_reliable 1098 @catch_server_exceptions 1099 async def get_balance_for_scripthash(self, sh: str) -> dict: 1100 return await self.interface.get_balance_for_scripthash(sh) 1101 1102 @best_effort_reliable 1103 @catch_server_exceptions 1104 async def get_txid_from_txpos(self, tx_height, tx_pos, merkle): 1105 return await self.interface.get_txid_from_txpos(tx_height, tx_pos, merkle) 1106 1107 def blockchain(self) -> Blockchain: 1108 
interface = self.interface 1109 if interface and interface.blockchain is not None: 1110 self._blockchain = interface.blockchain 1111 return self._blockchain 1112 1113 def get_blockchains(self): 1114 out = {} # blockchain_id -> list(interfaces) 1115 with blockchain.blockchains_lock: blockchain_items = list(blockchain.blockchains.items()) 1116 with self.interfaces_lock: interfaces_values = list(self.interfaces.values()) 1117 for chain_id, bc in blockchain_items: 1118 r = list(filter(lambda i: i.blockchain==bc, interfaces_values)) 1119 if r: 1120 out[chain_id] = r 1121 return out 1122 1123 def _set_preferred_chain(self, chain: Optional[Blockchain]): 1124 if chain: 1125 height = chain.get_max_forkpoint() 1126 header_hash = chain.get_hash(height) 1127 else: 1128 height = 0 1129 header_hash = constants.net.GENESIS 1130 self._blockchain_preferred_block = { 1131 'height': height, 1132 'hash': header_hash, 1133 } 1134 self.config.set_key('blockchain_preferred_block', self._blockchain_preferred_block) 1135 1136 async def follow_chain_given_id(self, chain_id: str) -> None: 1137 bc = blockchain.blockchains.get(chain_id) 1138 if not bc: 1139 raise Exception('blockchain {} not found'.format(chain_id)) 1140 self._set_preferred_chain(bc) 1141 # select server on this chain 1142 with self.interfaces_lock: interfaces = list(self.interfaces.values()) 1143 interfaces_on_selected_chain = list(filter(lambda iface: iface.blockchain == bc, interfaces)) 1144 if len(interfaces_on_selected_chain) == 0: return 1145 chosen_iface = random.choice(interfaces_on_selected_chain) # type: Interface 1146 # switch to server (and save to config) 1147 net_params = self.get_parameters() 1148 net_params = net_params._replace(server=chosen_iface.server) 1149 await self.set_parameters(net_params) 1150 1151 async def follow_chain_given_server(self, server: ServerAddr) -> None: 1152 # note that server_str should correspond to a connected interface 1153 iface = self.interfaces.get(server) 1154 if iface is None: 
1155 return 1156 self._set_preferred_chain(iface.blockchain) 1157 # switch to server (and save to config) 1158 net_params = self.get_parameters() 1159 net_params = net_params._replace(server=server) 1160 await self.set_parameters(net_params) 1161 1162 def get_server_height(self) -> int: 1163 """Length of header chain, as claimed by main interface.""" 1164 interface = self.interface 1165 return interface.tip if interface else 0 1166 1167 def get_local_height(self): 1168 """Length of header chain, POW-verified. 1169 In case of a chain split, this is for the branch the main interface is on, 1170 but it is the tip of that branch (even if main interface is behind). 1171 """ 1172 return self.blockchain().height() 1173 1174 def export_checkpoints(self, path): 1175 """Run manually to generate blockchain checkpoints. 1176 Kept for console use only. 1177 """ 1178 cp = self.blockchain().get_checkpoints() 1179 with open(path, 'w', encoding='utf-8') as f: 1180 f.write(json.dumps(cp, indent=4)) 1181 1182 async def _start(self): 1183 assert not self.taskgroup 1184 self.taskgroup = taskgroup = SilentTaskGroup() 1185 assert not self.interface and not self.interfaces 1186 assert not self._connecting_ifaces 1187 assert not self._closing_ifaces 1188 self.logger.info('starting network') 1189 self._clear_addr_retry_times() 1190 self._set_proxy(deserialize_proxy(self.config.get('proxy'))) 1191 self._maybe_set_oneserver() 1192 await self.taskgroup.spawn(self._run_new_interface(self.default_server)) 1193 1194 async def main(): 1195 self.logger.info("starting taskgroup.") 1196 try: 1197 # note: if a task finishes with CancelledError, that 1198 # will NOT raise, and the group will keep the other tasks running 1199 async with taskgroup as group: 1200 await group.spawn(self._maintain_sessions()) 1201 [await group.spawn(job) for job in self._jobs] 1202 except asyncio.CancelledError: 1203 raise 1204 except Exception as e: 1205 self.logger.exception("taskgroup died.") 1206 finally: 1207 
self.logger.info("taskgroup stopped.") 1208 asyncio.run_coroutine_threadsafe(main(), self.asyncio_loop) 1209 1210 util.trigger_callback('network_updated') 1211 1212 def start(self, jobs: Iterable = None): 1213 """Schedule starting the network, along with the given job co-routines. 1214 1215 Note: the jobs will *restart* every time the network restarts, e.g. on proxy 1216 setting changes. 1217 """ 1218 self._jobs = jobs or [] 1219 asyncio.run_coroutine_threadsafe(self._start(), self.asyncio_loop) 1220 1221 @log_exceptions 1222 async def stop(self, *, full_shutdown: bool = True): 1223 self.logger.info("stopping network") 1224 # timeout: if full_shutdown, it is up to the caller to time us out, 1225 # otherwise if e.g. restarting due to proxy changes, we time out fast 1226 async with (nullcontext() if full_shutdown else ignore_after(1)): 1227 async with TaskGroup() as group: 1228 await group.spawn(self.taskgroup.cancel_remaining()) 1229 if full_shutdown: 1230 await group.spawn(self.stop_gossip(full_shutdown=full_shutdown)) 1231 self.taskgroup = None 1232 self.interface = None 1233 self.interfaces = {} 1234 self._connecting_ifaces.clear() 1235 self._closing_ifaces.clear() 1236 if not full_shutdown: 1237 util.trigger_callback('network_updated') 1238 1239 async def _ensure_there_is_a_main_interface(self): 1240 if self.is_connected(): 1241 return 1242 # if auto_connect is set, try a different server 1243 if self.auto_connect and not self.is_connecting(): 1244 await self._switch_to_random_interface() 1245 # if auto_connect is not set, or still no main interface, retry current 1246 if not self.is_connected() and not self.is_connecting(): 1247 if self._can_retry_addr(self.default_server, urgent=True): 1248 await self.switch_to_interface(self.default_server) 1249 1250 async def _maintain_sessions(self): 1251 async def maybe_start_new_interfaces(): 1252 num_existing_ifaces = len(self.interfaces) + len(self._connecting_ifaces) + len(self._closing_ifaces) 1253 for i in 
range(self.num_server - num_existing_ifaces): 1254 # FIXME this should try to honour "healthy spread of connected servers" 1255 server = self._get_next_server_to_try() 1256 if server: 1257 await self.taskgroup.spawn(self._run_new_interface(server)) 1258 async def maintain_healthy_spread_of_connected_servers(): 1259 with self.interfaces_lock: interfaces = list(self.interfaces.values()) 1260 random.shuffle(interfaces) 1261 for iface in interfaces: 1262 if not self.check_interface_against_healthy_spread_of_connected_servers(iface): 1263 self.logger.info(f"disconnecting from {iface.server}. too many connected " 1264 f"servers already in bucket {iface.bucket_based_on_ipaddress()}") 1265 await self._close_interface(iface) 1266 async def maintain_main_interface(): 1267 await self._ensure_there_is_a_main_interface() 1268 if self.is_connected(): 1269 if self.config.is_fee_estimates_update_required(): 1270 await self.interface.taskgroup.spawn(self._request_fee_estimates, self.interface) 1271 1272 while True: 1273 try: 1274 await maybe_start_new_interfaces() 1275 await maintain_healthy_spread_of_connected_servers() 1276 await maintain_main_interface() 1277 except asyncio.CancelledError: 1278 # suppress spurious cancellations 1279 group = self.taskgroup 1280 if not group or group.closed(): 1281 raise 1282 await asyncio.sleep(0.1) 1283 1284 @classmethod 1285 async def _send_http_on_proxy(cls, method: str, url: str, params: str = None, 1286 body: bytes = None, json: dict = None, headers=None, 1287 on_finish=None, timeout=None): 1288 async def default_on_finish(resp: ClientResponse): 1289 resp.raise_for_status() 1290 return await resp.text() 1291 if headers is None: 1292 headers = {} 1293 if on_finish is None: 1294 on_finish = default_on_finish 1295 network = cls.get_instance() 1296 proxy = network.proxy if network else None 1297 async with make_aiohttp_session(proxy, timeout=timeout) as session: 1298 if method == 'get': 1299 async with session.get(url, params=params, 
headers=headers) as resp: 1300 return await on_finish(resp) 1301 elif method == 'post': 1302 assert body is not None or json is not None, 'body or json must be supplied if method is post' 1303 if body is not None: 1304 async with session.post(url, data=body, headers=headers) as resp: 1305 return await on_finish(resp) 1306 elif json is not None: 1307 async with session.post(url, json=json, headers=headers) as resp: 1308 return await on_finish(resp) 1309 else: 1310 assert False 1311 1312 @classmethod 1313 def send_http_on_proxy(cls, method, url, **kwargs): 1314 network = cls.get_instance() 1315 if network: 1316 assert util.get_running_loop() != network.asyncio_loop 1317 loop = network.asyncio_loop 1318 else: 1319 loop = asyncio.get_event_loop() 1320 coro = asyncio.run_coroutine_threadsafe(cls._send_http_on_proxy(method, url, **kwargs), loop) 1321 # note: _send_http_on_proxy has its own timeout, so no timeout here: 1322 return coro.result() 1323 1324 # methods used in scripts 1325 async def get_peers(self): 1326 while not self.is_connected(): 1327 await asyncio.sleep(1) 1328 session = self.interface.session 1329 return parse_servers(await session.send_request('server.peers.subscribe')) 1330 1331 async def send_multiple_requests(self, servers: Sequence[ServerAddr], method: str, params: Sequence): 1332 responses = dict() 1333 async def get_response(server: ServerAddr): 1334 interface = Interface(network=self, server=server, proxy=self.proxy) 1335 timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent) 1336 try: 1337 await asyncio.wait_for(interface.ready, timeout) 1338 except BaseException as e: 1339 await interface.close() 1340 return 1341 try: 1342 res = await interface.session.send_request(method, params, timeout=10) 1343 except Exception as e: 1344 res = e 1345 responses[interface.server] = res 1346 async with TaskGroup() as group: 1347 for server in servers: 1348 await group.spawn(get_response(server)) 1349 return responses 1350