1# Electrum - Lightweight Bitcoin Client
2# Copyright (c) 2011-2016 Thomas Voegtlin
3#
4# Permission is hereby granted, free of charge, to any person
5# obtaining a copy of this software and associated documentation files
6# (the "Software"), to deal in the Software without restriction,
7# including without limitation the rights to use, copy, modify, merge,
8# publish, distribute, sublicense, and/or sell copies of the Software,
9# and to permit persons to whom the Software is furnished to do so,
10# subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be
13# included in all copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
19# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
20# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
21# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22# SOFTWARE.
23import asyncio
24import time
25import queue
26import os
27import random
28import re
29from collections import defaultdict
30import threading
31import socket
32import json
33import sys
34import asyncio
35from typing import NamedTuple, Optional, Sequence, List, Dict, Tuple, TYPE_CHECKING, Iterable, Set, Any
36import traceback
37import concurrent
38from concurrent import futures
39import copy
40import functools
41
42import aiorpcx
43from aiorpcx import TaskGroup, ignore_after
44from aiohttp import ClientResponse
45
46from . import util
47from .util import (log_exceptions, ignore_exceptions,
48                   bfh, SilentTaskGroup, make_aiohttp_session, send_exception_to_crash_reporter,
49                   is_hash256_str, is_non_negative_integer, MyEncoder, NetworkRetryManager,
50                   nullcontext)
51from .bitcoin import COIN
52from . import constants
53from . import blockchain
54from . import bitcoin
55from . import dns_hacks
56from .transaction import Transaction
57from .blockchain import Blockchain, HEADER_SIZE
58from .interface import (Interface, PREFERRED_NETWORK_PROTOCOL,
59                        RequestTimedOut, NetworkTimeout, BUCKET_NAME_OF_ONION_SERVERS,
60                        NetworkException, RequestCorrupted, ServerAddr)
61from .version import PROTOCOL_VERSION
62from .simple_config import SimpleConfig
63from .i18n import _
64from .logging import get_logger, Logger
65
66if TYPE_CHECKING:
67    from .channel_db import ChannelDB
68    from .lnrouter import LNPathFinder
69    from .lnworker import LNGossip
70    from .lnwatcher import WatchTower
71    from .daemon import Daemon
72
73
_logger = get_logger(__name__)


# How many concurrent server connections we aim to keep open (see _maybe_set_oneserver).
NUM_TARGET_CONNECTED_SERVERS = 10
# Up to this many recently-used servers get connection priority ("sticky" servers,
# see _get_next_server_to_try).
NUM_STICKY_SERVERS = 4
# Max number of servers kept in the persisted recent-servers list (see _add_recent_server).
NUM_RECENT_SERVERS = 20
80
81
def parse_servers(result: Sequence[Tuple[str, str, List[str]]]) -> Dict[str, dict]:
    """Convert servers list (from protocol method "server.peers.subscribe") into dict format.
    Also validate values, such as IP addresses and ports.
    """
    servers = {}
    for item in result:
        host = item[1]
        portmap = {}
        version = None
        pruning_level = '-'
        features = item[2] if len(item) > 2 else []
        for feature in features:
            if re.match(r"[st]\d*", feature):
                protocol = feature[0]
                # empty port string means "use the default port for this protocol"
                port = feature[1:] or constants.net.DEFAULT_PORTS[protocol]
                ServerAddr(host, port, protocol=protocol)  # check if raises
                portmap[protocol] = port
            elif re.match("v(.?)+", feature):
                version = feature[1:]
            elif re.match(r"p\d*", feature):
                pruning_level = feature[1:]
            if pruning_level == '':
                pruning_level = '0'
        if portmap:
            portmap['pruning'] = pruning_level
            portmap['version'] = version
            servers[host] = portmap
    return servers
109
110
def filter_version(servers):
    """Keep only servers whose advertised protocol version is at least ours."""
    def is_recent(version):
        try:
            return util.versiontuple(version) >= util.versiontuple(PROTOCOL_VERSION)
        except Exception:
            # missing/unparseable version strings are treated as "too old"
            return False
    recent = {}
    for server, info in servers.items():
        if is_recent(info.get('version')):
            recent[server] = info
    return recent
118
119
def filter_noonion(servers):
    """Return a copy of *servers* without Tor (.onion) hosts."""
    filtered = {}
    for host, info in servers.items():
        if host.endswith('.onion'):
            continue
        filtered[host] = info
    return filtered
122
123
def filter_protocol(hostmap, *, allowed_protocols: Iterable[str] = None) -> Sequence[ServerAddr]:
    """Filters the hostmap for those implementing protocol."""
    if allowed_protocols is None:
        allowed_protocols = {PREFERRED_NETWORK_PROTOCOL}
    eligible = []
    for host, portmap in hostmap.items():
        for protocol in allowed_protocols:
            port = portmap.get(protocol)
            if not port:
                continue
            eligible.append(ServerAddr(host, port, protocol=protocol))
    return eligible
135
136
def pick_random_server(hostmap=None, *, allowed_protocols: Iterable[str],
                       exclude_set: Set[ServerAddr] = None) -> Optional[ServerAddr]:
    """Pick a random eligible server from *hostmap*, skipping *exclude_set*.

    Returns None when no server qualifies.
    """
    if hostmap is None:
        hostmap = constants.net.DEFAULT_SERVERS
    exclude = exclude_set if exclude_set is not None else set()
    candidates = set(filter_protocol(hostmap, allowed_protocols=allowed_protocols)) - exclude
    if not candidates:
        return None
    return random.choice(list(candidates))
146
147
class NetworkParameters(NamedTuple):
    # Immutable bundle of user-facing network settings (see get_parameters/set_parameters).
    server: ServerAddr          # main server to connect to
    proxy: Optional[dict]       # proxy dict as produced by deserialize_proxy(), or None
    auto_connect: bool          # whether to auto-select/switch servers
    oneserver: bool = False     # if True, connect to a single server only
153
154
155proxy_modes = ['socks4', 'socks5']
156
157
def serialize_proxy(p: Optional[dict]) -> Optional[str]:
    """Serialize a proxy dict into the "mode:host:port:user:password" config string.

    Inverse of deserialize_proxy(). Returns None if *p* is not a dict.
    """
    if not isinstance(p, dict):
        return None
    # str() the port defensively: some callers store it as an int, which would
    # make ':'.join() raise TypeError.
    return ':'.join([p.get('mode'), p.get('host'), str(p.get('port')),
                     p.get('user', ''), p.get('password', '')])
163
164
def deserialize_proxy(s: str) -> Optional[dict]:
    """Parse a "mode:host:port:user:password" config string into a proxy dict.

    Returns None for non-strings and for the literal string "none".
    Missing fields fall back to socks5 on localhost with a default port.
    """
    if not isinstance(s, str):
        return None
    if s.lower() == 'none':
        return None
    proxy = {"mode": "socks5", "host": "localhost"}
    # FIXME raw IPv6 address fails here
    args = s.split(':')
    idx = 0
    if args[idx] in proxy_modes:
        proxy["mode"] = args[idx]
        idx += 1
    if idx < len(args):
        proxy["host"] = args[idx]
        idx += 1
    if idx < len(args):
        proxy["port"] = args[idx]
        idx += 1
    else:
        proxy["port"] = "8080" if proxy["mode"] == "http" else "1080"
    if idx < len(args):
        proxy["user"] = args[idx]
        idx += 1
    if idx < len(args):
        proxy["password"] = args[idx]
    return proxy
191
192
193class BestEffortRequestFailed(NetworkException): pass
194
195
class TxBroadcastError(NetworkException):
    """Base class for transaction-broadcast failures.

    Subclasses must implement get_message_for_gui().
    """

    def get_message_for_gui(self) -> str:
        raise NotImplementedError()
199
200
class TxBroadcastHashMismatch(TxBroadcastError):
    """The server accepted the broadcast but returned an unexpected txid."""

    def get_message_for_gui(self) -> str:
        parts = (
            _("The server returned an unexpected transaction ID when broadcasting the transaction."),
            _("Consider trying to connect to a different server, or updating Electrum."),
            str(self),
        )
        return "{}\n{}\n\n{}".format(*parts)
207
208
class TxBroadcastServerReturnedError(TxBroadcastError):
    """The server explicitly returned an error for the broadcast request."""

    def get_message_for_gui(self) -> str:
        parts = (
            _("The server returned an error when broadcasting the transaction."),
            _("Consider trying to connect to a different server, or updating Electrum."),
            str(self),
        )
        return "{}\n{}\n\n{}".format(*parts)
215
216
class TxBroadcastUnknownError(TxBroadcastError):
    """Broadcasting failed for an unknown reason."""

    def get_message_for_gui(self) -> str:
        parts = (
            _("Unknown error when broadcasting the transaction."),
            _("Consider trying to connect to a different server, or updating Electrum."),
        )
        return "{}\n{}".format(*parts)
222
223
class UntrustedServerReturnedError(NetworkException):
    """Wraps an error that originates from the (untrusted) server.

    str()/the GUI message deliberately hide the original exception text;
    only __repr__ exposes it, clearly marked as untrusted.
    """

    def __init__(self, *, original_exception):
        self.original_exception = original_exception

    def get_message_for_gui(self) -> str:
        return str(self)

    def __str__(self):
        return _("The server returned an error.")

    def __repr__(self):
        return (f"<UntrustedServerReturnedError "
                f"[DO NOT TRUST THIS MESSAGE] original_exception: {self.original_exception!r}>")
237
238
239_INSTANCE = None
240
241
class Network(Logger, NetworkRetryManager[ServerAddr]):
    """The Network class manages a set of connections to remote electrum
    servers, each connected socket is handled by an Interface() object.
    """

    LOGGING_SHORTCUT = 'n'

    # connection state (initialised in __init__):
    taskgroup: Optional[TaskGroup]
    interface: Optional[Interface]           # main interface, or None while switching
    interfaces: Dict[ServerAddr, Interface]  # ifaces in "initialised and usable" state
    _connecting_ifaces: Set[ServerAddr]      # connections currently being established
    _closing_ifaces: Set[ServerAddr]         # connections currently being torn down
    default_server: ServerAddr               # the server the main interface should use
    _recent_servers: List[ServerAddr]        # ordered, most-recently-connected first

    # lightning-related components, created lazily (start_gossip / __init__):
    channel_db: Optional['ChannelDB'] = None
    lngossip: Optional['LNGossip'] = None
    local_watchtower: Optional['WatchTower'] = None
    path_finder: Optional['LNPathFinder'] = None
261
    def __init__(self, config: SimpleConfig, *, daemon: 'Daemon' = None):
        """Set up the Network singleton.

        Requires an already-running asyncio event loop. Reads server/proxy
        settings from *config*; does not open any connections itself.
        """
        global _INSTANCE
        assert _INSTANCE is None, "Network is a singleton!"
        _INSTANCE = self

        Logger.__init__(self)
        NetworkRetryManager.__init__(
            self,
            max_retry_delay_normal=600,
            init_retry_delay_normal=15,
            max_retry_delay_urgent=10,
            init_retry_delay_urgent=1,
        )

        self.asyncio_loop = asyncio.get_event_loop()
        assert self.asyncio_loop.is_running(), "event loop not running"

        assert isinstance(config, SimpleConfig), f"config should be a SimpleConfig instead of {type(config)}"
        self.config = config

        self.daemon = daemon

        # load headers/chain state from disk before anything touches blockchain.*
        blockchain.read_blockchains(self.config)
        blockchain.init_headers_file_for_best_chain()
        self.logger.info(f"blockchains {list(map(lambda b: b.forkpoint, blockchain.blockchains.values()))}")
        self._blockchain_preferred_block = self.config.get('blockchain_preferred_block', None)  # type: Dict[str, Any]
        if self._blockchain_preferred_block is None:
            self._set_preferred_chain(None)
        self._blockchain = blockchain.get_best_chain()

        self._allowed_protocols = {PREFERRED_NETWORK_PROTOCOL}

        # Server for addresses and transactions
        self.default_server = self.config.get('server', None)
        # Sanitize default server
        if self.default_server:
            try:
                self.default_server = ServerAddr.from_str(self.default_server)
            except:
                self.logger.warning('failed to parse server-string; falling back to localhost:1:s.')
                self.default_server = ServerAddr.from_str("localhost:1:s")
        else:
            self.default_server = pick_random_server(allowed_protocols=self._allowed_protocols)
        assert isinstance(self.default_server, ServerAddr), f"invalid type for default_server: {self.default_server!r}"

        self.taskgroup = None

        # locks
        self.restart_lock = asyncio.Lock()
        self.bhi_lock = asyncio.Lock()
        self.recent_servers_lock = threading.RLock()       # <- re-entrant
        self.interfaces_lock = threading.Lock()            # for mutating/iterating self.interfaces

        self.server_peers = {}  # returned by interface (servers that the main interface knows about)
        self._recent_servers = self._read_recent_servers()  # note: needs self.recent_servers_lock

        self.banner = ''
        self.donation_address = ''
        self.relay_fee = None  # type: Optional[int]

        # directory for per-server SSL certificates
        dir_path = os.path.join(self.config.path, 'certs')
        util.make_dir(dir_path)

        # the main server we are currently communicating with
        self.interface = None
        self.default_server_changed_event = asyncio.Event()
        # Set of servers we have an ongoing connection with.
        # For any ServerAddr, at most one corresponding Interface object
        # can exist at any given time. Depending on the state of that Interface,
        # the ServerAddr can be found in one of the following sets.
        # Note: during a transition, the ServerAddr can appear in two sets momentarily.
        self._connecting_ifaces = set()
        self.interfaces = {}  # these are the ifaces in "initialised and usable" state
        self._closing_ifaces = set()

        self.auto_connect = self.config.get('auto_connect', True)
        self.proxy = None
        self._maybe_set_oneserver()

        # Dump network messages (all interfaces).  Set at runtime from the console.
        self.debug = False

        self._set_status('disconnected')
        self._has_ever_managed_to_connect_to_server = False

        # lightning network
        if self.config.get('run_watchtower', False):
            from . import lnwatcher
            self.local_watchtower = lnwatcher.WatchTower(self)
            self.local_watchtower.start_network(self)
            asyncio.ensure_future(self.local_watchtower.start_watching())
353
354    def has_internet_connection(self) -> bool:
355        """Our guess whether the device has Internet-connectivity."""
356        return self._has_ever_managed_to_connect_to_server
357
358    def has_channel_db(self):
359        return self.channel_db is not None
360
361    def start_gossip(self):
362        from . import lnrouter
363        from . import channel_db
364        from . import lnworker
365        if not self.config.get('use_gossip'):
366            return
367        if self.lngossip is None:
368            self.channel_db = channel_db.ChannelDB(self)
369            self.path_finder = lnrouter.LNPathFinder(self.channel_db)
370            self.channel_db.load_data()
371            self.lngossip = lnworker.LNGossip()
372            self.lngossip.start_network(self)
373
374    async def stop_gossip(self, *, full_shutdown: bool = False):
375        if self.lngossip:
376            await self.lngossip.stop()
377            self.lngossip = None
378            self.channel_db.stop()
379            if full_shutdown:
380                await self.channel_db.stopped_event.wait()
381            self.channel_db = None
382            self.path_finder = None
383
384    def run_from_another_thread(self, coro, *, timeout=None):
385        assert util.get_running_loop() != self.asyncio_loop, 'must not be called from network thread'
386        fut = asyncio.run_coroutine_threadsafe(coro, self.asyncio_loop)
387        return fut.result(timeout)
388
389    @staticmethod
390    def get_instance() -> Optional["Network"]:
391        return _INSTANCE
392
393    def with_recent_servers_lock(func):
394        def func_wrapper(self, *args, **kwargs):
395            with self.recent_servers_lock:
396                return func(self, *args, **kwargs)
397        return func_wrapper
398
399    def _read_recent_servers(self) -> List[ServerAddr]:
400        if not self.config.path:
401            return []
402        path = os.path.join(self.config.path, "recent_servers")
403        try:
404            with open(path, "r", encoding='utf-8') as f:
405                data = f.read()
406                servers_list = json.loads(data)
407            return [ServerAddr.from_str(s) for s in servers_list]
408        except:
409            return []
410
411    @with_recent_servers_lock
412    def _save_recent_servers(self):
413        if not self.config.path:
414            return
415        path = os.path.join(self.config.path, "recent_servers")
416        s = json.dumps(self._recent_servers, indent=4, sort_keys=True, cls=MyEncoder)
417        try:
418            with open(path, "w", encoding='utf-8') as f:
419                f.write(s)
420        except:
421            pass
422
423    async def _server_is_lagging(self) -> bool:
424        sh = self.get_server_height()
425        if not sh:
426            self.logger.info('no height for main interface')
427            return True
428        lh = self.get_local_height()
429        result = (lh - sh) > 1
430        if result:
431            self.logger.info(f'{self.default_server} is lagging ({sh} vs {lh})')
432        return result
433
434    def _set_status(self, status):
435        self.connection_status = status
436        self.notify('status')
437
438    def is_connected(self):
439        interface = self.interface
440        return interface is not None and interface.ready.done()
441
442    def is_connecting(self):
443        return self.connection_status == 'connecting'
444
    async def _request_server_info(self, interface: 'Interface'):
        """Once *interface* is ready, concurrently fetch server metadata
        (banner, donation address, peer list, relay fee, fee estimates),
        updating our fields and firing notifications as results arrive.
        """
        await interface.ready
        session = interface.session

        async def get_banner():
            self.banner = await interface.get_server_banner()
            self.notify('banner')
        async def get_donation_address():
            self.donation_address = await interface.get_donation_address()
        async def get_server_peers():
            server_peers = await session.send_request('server.peers.subscribe')
            random.shuffle(server_peers)
            # cap how many peers we accept from the server (untrusted input)
            max_accepted_peers = len(constants.net.DEFAULT_SERVERS) + NUM_RECENT_SERVERS
            server_peers = server_peers[:max_accepted_peers]
            # note that 'parse_servers' also validates the data (which is untrusted input!)
            self.server_peers = parse_servers(server_peers)
            self.notify('servers')
        async def get_relay_fee():
            self.relay_fee = await interface.get_relay_fee()

        async with TaskGroup() as group:
            await group.spawn(get_banner)
            await group.spawn(get_donation_address)
            await group.spawn(get_server_peers)
            await group.spawn(get_relay_fee)
            await group.spawn(self._request_fee_estimates(interface))
471
472    async def _request_fee_estimates(self, interface):
473        self.config.requested_fee_estimates()
474        histogram = await interface.get_fee_histogram()
475        self.config.mempool_fees = histogram
476        self.logger.info(f'fee_histogram {histogram}')
477        self.notify('fee_histogram')
478
479    def get_status_value(self, key):
480        if key == 'status':
481            value = self.connection_status
482        elif key == 'banner':
483            value = self.banner
484        elif key == 'fee':
485            value = self.config.fee_estimates
486        elif key == 'fee_histogram':
487            value = self.config.mempool_fees
488        elif key == 'servers':
489            value = self.get_servers()
490        else:
491            raise Exception('unexpected trigger key {}'.format(key))
492        return value
493
494    def notify(self, key):
495        if key in ['status', 'updated']:
496            util.trigger_callback(key)
497        else:
498            util.trigger_callback(key, self.get_status_value(key))
499
500    def get_parameters(self) -> NetworkParameters:
501        return NetworkParameters(server=self.default_server,
502                                 proxy=self.proxy,
503                                 auto_connect=self.auto_connect,
504                                 oneserver=self.oneserver)
505
506    def get_donation_address(self):
507        if self.is_connected():
508            return self.donation_address
509
510    def get_interfaces(self) -> List[ServerAddr]:
511        """The list of servers for the connected interfaces."""
512        with self.interfaces_lock:
513            return list(self.interfaces)
514
515    def get_fee_estimates(self):
516        from statistics import median
517        from .simple_config import FEE_ETA_TARGETS
518        if self.auto_connect:
519            with self.interfaces_lock:
520                out = {}
521                for n in FEE_ETA_TARGETS:
522                    try:
523                        out[n] = int(median(filter(None, [i.fee_estimates_eta.get(n) for i in self.interfaces.values()])))
524                    except:
525                        continue
526                return out
527        else:
528            if not self.interface:
529                return {}
530            return self.interface.fee_estimates_eta
531
532    def update_fee_estimates(self, *, fee_est: Dict[int, int] = None):
533        if fee_est is None:
534            fee_est = self.get_fee_estimates()
535        for nblock_target, fee in fee_est.items():
536            self.config.update_fee_estimates(nblock_target, fee)
537        if not hasattr(self, "_prev_fee_est") or self._prev_fee_est != fee_est:
538            self._prev_fee_est = copy.copy(fee_est)
539            self.logger.info(f'fee_estimates {fee_est}')
540        self.notify('fee')
541
    @with_recent_servers_lock
    def get_servers(self):
        """Return the combined hostmap of all servers we know about."""
        # note: order of sources when adding servers here is crucial!
        # don't let "server_peers" overwrite anything,
        # otherwise main server can eclipse the client
        out = dict()
        # add servers received from main interface
        server_peers = self.server_peers
        if server_peers:
            out.update(filter_version(server_peers.copy()))
        # hardcoded servers
        out.update(constants.net.DEFAULT_SERVERS)
        # add recent servers
        for server in self._recent_servers:
            port = str(server.port)
            if server.host in out:
                out[server.host].update({server.protocol: port})
            else:
                out[server.host] = {server.protocol: port}
        # potentially filter out some
        if self.config.get('noonion'):
            out = filter_noonion(out)
        return out
565
    def _get_next_server_to_try(self) -> Optional[ServerAddr]:
        """Pick the next server to attempt connecting to, honouring retry delays.

        Prefers recent servers (up to NUM_STICKY_SERVERS), then falls back to a
        random known server. Returns None if nothing is currently eligible.
        """
        now = time.time()
        with self.interfaces_lock:
            connected_servers = set(self.interfaces) | self._connecting_ifaces | self._closing_ifaces
        # First try from recent servers. (which are persisted)
        # As these are servers we successfully connected to recently, they are
        # most likely to work. This also makes servers "sticky".
        # Note: with sticky servers, it is more difficult for an attacker to eclipse the client,
        #       however if they succeed, the eclipsing would persist. To try to balance this,
        #       we only give priority to recent_servers up to NUM_STICKY_SERVERS.
        with self.recent_servers_lock:
            recent_servers = list(self._recent_servers)
        recent_servers = [s for s in recent_servers if s.protocol in self._allowed_protocols]
        if len(connected_servers & set(recent_servers)) < NUM_STICKY_SERVERS:
            for server in recent_servers:
                if server in connected_servers:
                    continue
                if not self._can_retry_addr(server, now=now):
                    continue
                return server
        # try all servers we know about, pick one at random
        hostmap = self.get_servers()
        servers = list(set(filter_protocol(hostmap, allowed_protocols=self._allowed_protocols)) - connected_servers)
        random.shuffle(servers)
        for server in servers:
            if not self._can_retry_addr(server, now=now):
                continue
            return server
        return None
595
596    def _set_proxy(self, proxy: Optional[dict]):
597        self.proxy = proxy
598        dns_hacks.configure_dns_depending_on_proxy(bool(proxy))
599        self.logger.info(f'setting proxy {proxy}')
600        util.trigger_callback('proxy_set', self.proxy)
601
    @log_exceptions
    async def set_parameters(self, net_params: NetworkParameters):
        """Apply new user settings, restarting or switching connections as needed.

        Silently returns if the proxy settings are invalid or if the config
        refused to persist the changes.
        """
        proxy = net_params.proxy
        proxy_str = serialize_proxy(proxy)
        server = net_params.server
        # sanitize parameters
        try:
            if proxy:
                proxy_modes.index(proxy['mode']) + 1  # raises ValueError if mode unknown
                int(proxy['port'])  # raises ValueError if port is non-numeric
        except:
            return
        self.config.set_key('auto_connect', net_params.auto_connect, False)
        self.config.set_key('oneserver', net_params.oneserver, False)
        self.config.set_key('proxy', proxy_str, False)
        self.config.set_key('server', str(server), True)
        # abort if changes were not allowed by config
        if self.config.get('server') != str(server) \
                or self.config.get('proxy') != proxy_str \
                or self.config.get('oneserver') != net_params.oneserver:
            return

        async with self.restart_lock:
            self.auto_connect = net_params.auto_connect
            if self.proxy != proxy or self.oneserver != net_params.oneserver:
                # Restart the network defaulting to the given server
                await self.stop(full_shutdown=False)
                self.default_server = server
                await self._start()
            elif self.default_server != server:
                await self.switch_to_interface(server)
            else:
                await self.switch_lagging_interface()
        util.trigger_callback('network_updated')
636
637    def _maybe_set_oneserver(self) -> None:
638        oneserver = bool(self.config.get('oneserver', False))
639        self.oneserver = oneserver
640        self.num_server = NUM_TARGET_CONNECTED_SERVERS if not oneserver else 0
641
642    async def _switch_to_random_interface(self):
643        '''Switch to a random connected server other than the current one'''
644        servers = self.get_interfaces()    # Those in connected state
645        if self.default_server in servers:
646            servers.remove(self.default_server)
647        if servers:
648            await self.switch_to_interface(random.choice(servers))
649
650    async def switch_lagging_interface(self):
651        """If auto_connect and lagging, switch interface (only within fork)."""
652        if self.auto_connect and await self._server_is_lagging():
653            # switch to one that has the correct header (not height)
654            best_header = self.blockchain().header_at_tip()
655            with self.interfaces_lock: interfaces = list(self.interfaces.values())
656            filtered = list(filter(lambda iface: iface.tip_header == best_header, interfaces))
657            if filtered:
658                chosen_iface = random.choice(filtered)
659                await self.switch_to_interface(chosen_iface.server)
660
    async def switch_unwanted_fork_interface(self) -> None:
        """If auto_connect, maybe switch to another fork/chain.

        Tries chains containing the user-preferred block first, then the best chain.
        """
        if not self.auto_connect or not self.interface:
            return
        with self.interfaces_lock: interfaces = list(self.interfaces.values())
        pref_height = self._blockchain_preferred_block['height']
        pref_hash   = self._blockchain_preferred_block['hash']
        # shortcut for common case
        if pref_height == 0:
            return
        # maybe try switching chains; starting with most desirable first
        matching_chains = blockchain.get_chains_that_contain_header(pref_height, pref_hash)
        chains_to_try = list(matching_chains) + [blockchain.get_best_chain()]
        for rank, chain in enumerate(chains_to_try):
            # check if main interface is already on this fork
            if self.interface.blockchain == chain:
                return
            # switch to another random interface that is on this fork, if any
            filtered = [iface for iface in interfaces
                        if iface.blockchain == chain]
            if filtered:
                self.logger.info(f"switching to (more) preferred fork (rank {rank})")
                chosen_iface = random.choice(filtered)
                await self.switch_to_interface(chosen_iface.server)
                return
        self.logger.info("tried to switch to (more) preferred fork but no interfaces are on any")
687
    async def switch_to_interface(self, server: ServerAddr):
        """Switch to server as our main interface. If no connection exists,
        queue interface to be started. The actual switch will
        happen when the interface becomes ready.
        """
        # set default_server first: _run_new_interface and others read it
        self.default_server = server
        old_interface = self.interface
        old_server = old_interface.server if old_interface else None

        # Stop any current interface in order to terminate subscriptions,
        # and to cancel tasks in interface.taskgroup.
        if old_server and old_server != server:
            # don't wait for old_interface to close as that might be slow:
            await self.taskgroup.spawn(self._close_interface(old_interface))

        if server not in self.interfaces:
            self.interface = None
            await self.taskgroup.spawn(self._run_new_interface(server))
            return

        i = self.interfaces[server]
        if old_interface != i:
            self.logger.info(f"switching to {server}")
            assert i.ready.done(), "interface we are switching to is not ready yet"
            # compare chains before flipping self.interface, so we know whether
            # to fire 'blockchain_updated' afterwards
            blockchain_updated = i.blockchain != self.blockchain()
            self.interface = i
            await i.taskgroup.spawn(self._request_server_info(i))
            util.trigger_callback('default_server_changed')
            # wake any waiters, then reset the event for the next switch
            self.default_server_changed_event.set()
            self.default_server_changed_event.clear()
            self._set_status('connected')
            util.trigger_callback('network_updated')
            if blockchain_updated:
                util.trigger_callback('blockchain_updated')
722
    async def _close_interface(self, interface: Optional[Interface]):
        """Close *interface*, keeping the connection-state sets consistent.

        Safe to call concurrently: a server already in _closing_ifaces is skipped.
        """
        if not interface:
            return
        if interface.server in self._closing_ifaces:
            return
        self._closing_ifaces.add(interface.server)
        with self.interfaces_lock:
            # only pop if the mapping still points at this exact Interface object
            if self.interfaces.get(interface.server) == interface:
                self.interfaces.pop(interface.server)
        if interface == self.interface:
            self.interface = None
        try:
            # this can take some time if server/connection is slow:
            await interface.close()
            await interface.got_disconnected.wait()
        finally:
            # always leave the "closing" set, even if close() raised
            self._closing_ifaces.discard(interface.server)
740
741    @with_recent_servers_lock
742    def _add_recent_server(self, server: ServerAddr) -> None:
743        self._on_connection_successfully_established(server)
744        # list is ordered
745        if server in self._recent_servers:
746            self._recent_servers.remove(server)
747        self._recent_servers.insert(0, server)
748        self._recent_servers = self._recent_servers[:NUM_RECENT_SERVERS]
749        self._save_recent_servers()
750
    async def connection_down(self, interface: Interface):
        '''A connection to server either went down, or was never made.
        We distinguish by whether it is in self.interfaces.'''
        if not interface: return
        # losing the main server means we are now disconnected
        if interface.server == self.default_server:
            self._set_status('disconnected')
        await self._close_interface(interface)
        util.trigger_callback('network_updated')
759
760    def get_network_timeout_seconds(self, request_type=NetworkTimeout.Generic) -> int:
761        if self.oneserver and not self.auto_connect:
762            return request_type.MOST_RELAXED
763        if self.proxy:
764            return request_type.RELAXED
765        return request_type.NORMAL
766
    @ignore_exceptions  # do not kill outer taskgroup
    @log_exceptions
    async def _run_new_interface(self, server: ServerAddr):
        """Try to establish a new connection (Interface) to `server`.

        On success the interface is registered in self.interfaces, and if
        `server` is the default server, the main interface is switched to it.
        On failure the interface is closed and we simply return.
        """
        # skip servers we are already connected to, connecting to, or closing
        if (server in self.interfaces
                or server in self._connecting_ifaces
                or server in self._closing_ifaces):
            return
        self._connecting_ifaces.add(server)
        if server == self.default_server:
            self.logger.info(f"connecting to {server} as new interface")
            self._set_status('connecting')
        self._trying_addr_now(server)

        interface = Interface(network=self, server=server, proxy=self.proxy)
        # note: using longer timeouts here as DNS can sometimes be slow!
        timeout = self.get_network_timeout_seconds(NetworkTimeout.Generic)
        try:
            await asyncio.wait_for(interface.ready, timeout)
        except BaseException as e:
            self.logger.info(f"couldn't launch iface {server} -- {repr(e)}")
            await interface.close()
            return
        else:
            with self.interfaces_lock:
                assert server not in self.interfaces
                self.interfaces[server] = interface
        finally:
            # no longer "connecting", whatever the outcome
            self._connecting_ifaces.discard(server)

        if server == self.default_server:
            await self.switch_to_interface(server)

        self._has_ever_managed_to_connect_to_server = True
        self._add_recent_server(server)
        util.trigger_callback('network_updated')
802
803    def check_interface_against_healthy_spread_of_connected_servers(self, iface_to_check: Interface) -> bool:
804        # main interface is exempt. this makes switching servers easier
805        if iface_to_check.is_main_server():
806            return True
807        if not iface_to_check.bucket_based_on_ipaddress():
808            return True
809        # bucket connected interfaces
810        with self.interfaces_lock:
811            interfaces = list(self.interfaces.values())
812        if iface_to_check in interfaces:
813            interfaces.remove(iface_to_check)
814        buckets = defaultdict(list)
815        for iface in interfaces:
816            buckets[iface.bucket_based_on_ipaddress()].append(iface)
817        # check proposed server against buckets
818        onion_servers = buckets[BUCKET_NAME_OF_ONION_SERVERS]
819        if iface_to_check.is_tor():
820            # keep number of onion servers below half of all connected servers
821            if len(onion_servers) > NUM_TARGET_CONNECTED_SERVERS // 2:
822                return False
823        else:
824            bucket = iface_to_check.bucket_based_on_ipaddress()
825            if len(buckets[bucket]) > 0:
826                return False
827        return True
828
    def best_effort_reliable(func):
        """Decorator for async Network methods: retry `func` until it completes
        on a ready main interface, for up to 10 attempts.

        Each attempt races the request against the interface disconnecting.
        RequestTimedOut/RequestCorrupted close the interface and retry.
        Raises BestEffortRequestFailed after the attempts are exhausted.
        """
        @functools.wraps(func)
        async def make_reliable_wrapper(self: 'Network', *args, **kwargs):
            for i in range(10):
                iface = self.interface
                # retry until there is a main interface
                if not iface:
                    async with ignore_after(1):
                        await self.default_server_changed_event.wait()
                    continue  # try again
                assert iface.ready.done(), "interface not ready yet"
                # try actual request
                try:
                    # wait=any: the group finishes as soon as either the request
                    # completes or the interface disconnects (whichever is first)
                    async with TaskGroup(wait=any) as group:
                        task = await group.spawn(func(self, *args, **kwargs))
                        await group.spawn(iface.got_disconnected.wait())
                except RequestTimedOut:
                    await iface.close()
                    await iface.got_disconnected.wait()
                    continue  # try again
                except RequestCorrupted as e:
                    # TODO ban server?
                    iface.logger.exception(f"RequestCorrupted: {e}")
                    await iface.close()
                    await iface.got_disconnected.wait()
                    continue  # try again
                if task.done() and not task.cancelled():
                    return task.result()
                # otherwise (e.g. the interface disconnected first); try again
            raise BestEffortRequestFailed('no interface to do request on... gave up.')
        return make_reliable_wrapper
860
    def catch_server_exceptions(func):
        """Decorator: wrap JSON-RPC errors raised by the server into
        UntrustedServerReturnedError (server-provided text is untrusted)."""
        @functools.wraps(func)
        async def wrapper(self, *args, **kwargs):
            try:
                return await func(self, *args, **kwargs)
            except aiorpcx.jsonrpc.CodeMessageError as e:
                raise UntrustedServerReturnedError(original_exception=e) from e
        return wrapper
869
    @best_effort_reliable
    @catch_server_exceptions
    async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
        """Delegate to the main interface; retried via best_effort_reliable,
        server errors wrapped via catch_server_exceptions."""
        return await self.interface.get_merkle_for_transaction(tx_hash=tx_hash, tx_height=tx_height)
874
    @best_effort_reliable
    async def broadcast_transaction(self, tx: 'Transaction', *, timeout=None) -> None:
        """Broadcast `tx` via the main server.

        Raises TxBroadcastServerReturnedError (sanitized server message),
        TxBroadcastUnknownError, or TxBroadcastHashMismatch on failure.
        Timeouts/cancellations are passed through so best_effort_reliable
        can retry them.
        """
        if timeout is None:
            timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent)
        try:
            out = await self.interface.session.send_request('blockchain.transaction.broadcast', [tx.serialize()], timeout=timeout)
            # note: both 'out' and exception messages are untrusted input from the server
        except (RequestTimedOut, asyncio.CancelledError, asyncio.TimeoutError):
            raise  # pass-through
        except aiorpcx.jsonrpc.CodeMessageError as e:
            self.logger.info(f"broadcast_transaction error [DO NOT TRUST THIS MESSAGE]: {repr(e)}")
            raise TxBroadcastServerReturnedError(self.sanitize_tx_broadcast_response(e.message)) from e
        except BaseException as e:  # intentional BaseException for sanity!
            self.logger.info(f"broadcast_transaction error2 [DO NOT TRUST THIS MESSAGE]: {repr(e)}")
            send_exception_to_crash_reporter(e)
            raise TxBroadcastUnknownError() from e
        if out != tx.txid():
            # server claimed success but echoed back a different txid than ours
            self.logger.info(f"unexpected txid for broadcast_transaction [DO NOT TRUST THIS MESSAGE]: {out} != {tx.txid()}")
            raise TxBroadcastHashMismatch(_("Server returned unexpected transaction ID."))
894
895    async def try_broadcasting(self, tx, name):
896        try:
897            await self.broadcast_transaction(tx)
898        except Exception as e:
899            self.logger.info(f'error: could not broadcast {name} {tx.txid()}, {str(e)}')
900        else:
901            self.logger.info(f'success: broadcasting {name} {tx.txid()}')
902
    @staticmethod
    def sanitize_tx_broadcast_response(server_msg) -> str:
        """Map an untrusted broadcast-error message from the server to a string
        that is safe to display to the user.

        Matching is substring-based and first-match-wins, over several tables
        of known bitcoind messages (checked in the order they appear below).
        Unknown messages map to a generic "Unknown error".
        """
        # Unfortunately, bitcoind and hence the Electrum protocol doesn't return a useful error code.
        # So, we use substring matching to grok the error message.
        # server_msg is untrusted input so it should not be shown to the user. see #4968
        server_msg = str(server_msg)
        server_msg = server_msg.replace("\n", r"\n")
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/policy/policy.cpp
        # grep "reason ="
        # NOTE(review): short substrings like "version" can also match unrelated
        # server messages that merely contain the word; order of tables matters.
        policy_error_messages = {
            r"version": _("Transaction uses non-standard version."),
            r"tx-size": _("The transaction was rejected because it is too large (in bytes)."),
            r"scriptsig-size": None,
            r"scriptsig-not-pushonly": None,
            r"scriptpubkey":
                ("scriptpubkey\n" +
                 _("Some of the outputs pay to a non-standard script.")),
            r"bare-multisig": None,
            r"dust":
                (_("Transaction could not be broadcast due to dust outputs.\n"
                   "Some of the outputs are too small in value, probably lower than 1000 satoshis.\n"
                   "Check the units, make sure you haven't confused e.g. mBTC and BTC.")),
            r"multi-op-return": _("The transaction was rejected because it contains multiple OP_RETURN outputs."),
        }
        for substring in policy_error_messages:
            if substring in server_msg:
                msg = policy_error_messages[substring]
                # a None value means: show the (known-safe) substring itself
                return msg if msg else substring
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/script/script_error.cpp
        script_error_messages = {
            r"Script evaluated without error but finished with a false/empty top stack element",
            r"Script failed an OP_VERIFY operation",
            r"Script failed an OP_EQUALVERIFY operation",
            r"Script failed an OP_CHECKMULTISIGVERIFY operation",
            r"Script failed an OP_CHECKSIGVERIFY operation",
            r"Script failed an OP_NUMEQUALVERIFY operation",
            r"Script is too big",
            r"Push value size limit exceeded",
            r"Operation limit exceeded",
            r"Stack size limit exceeded",
            r"Signature count negative or greater than pubkey count",
            r"Pubkey count negative or limit exceeded",
            r"Opcode missing or not understood",
            r"Attempted to use a disabled opcode",
            r"Operation not valid with the current stack size",
            r"Operation not valid with the current altstack size",
            r"OP_RETURN was encountered",
            r"Invalid OP_IF construction",
            r"Negative locktime",
            r"Locktime requirement not satisfied",
            r"Signature hash type missing or not understood",
            r"Non-canonical DER signature",
            r"Data push larger than necessary",
            r"Only push operators allowed in signatures",
            r"Non-canonical signature: S value is unnecessarily high",
            r"Dummy CHECKMULTISIG argument must be zero",
            r"OP_IF/NOTIF argument must be minimal",
            r"Signature must be zero for failed CHECK(MULTI)SIG operation",
            r"NOPx reserved for soft-fork upgrades",
            r"Witness version reserved for soft-fork upgrades",
            r"Taproot version reserved for soft-fork upgrades",
            r"OP_SUCCESSx reserved for soft-fork upgrades",
            r"Public key version reserved for soft-fork upgrades",
            r"Public key is neither compressed or uncompressed",
            r"Stack size must be exactly one after execution",
            r"Extra items left on stack after execution",
            r"Witness program has incorrect length",
            r"Witness program was passed an empty witness",
            r"Witness program hash mismatch",
            r"Witness requires empty scriptSig",
            r"Witness requires only-redeemscript scriptSig",
            r"Witness provided for non-witness script",
            r"Using non-compressed keys in segwit",
            r"Invalid Schnorr signature size",
            r"Invalid Schnorr signature hash type",
            r"Invalid Schnorr signature",
            r"Invalid Taproot control block size",
            r"Too much signature validation relative to witness weight",
            r"OP_CHECKMULTISIG(VERIFY) is not available in tapscript",
            r"OP_IF/NOTIF argument must be minimal in tapscript",
            r"Using OP_CODESEPARATOR in non-witness script",
            r"Signature is found in scriptCode",
        }
        for substring in script_error_messages:
            if substring in server_msg:
                return substring
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/validation.cpp
        # grep "REJECT_"
        # grep "TxValidationResult"
        # should come after script_error.cpp (due to e.g. non-mandatory-script-verify-flag)
        validation_error_messages = {
            r"coinbase": None,
            r"tx-size-small": None,
            r"non-final": None,
            r"txn-already-in-mempool": None,
            r"txn-mempool-conflict": None,
            r"txn-already-known": None,
            r"non-BIP68-final": None,
            r"bad-txns-nonstandard-inputs": None,
            r"bad-witness-nonstandard": None,
            r"bad-txns-too-many-sigops": None,
            r"mempool min fee not met":
                ("mempool min fee not met\n" +
                 _("Your transaction is paying a fee that is so low that the bitcoin node cannot "
                   "fit it into its mempool. The mempool is already full of hundreds of megabytes "
                   "of transactions that all pay higher fees. Try to increase the fee.")),
            r"min relay fee not met": None,
            r"absurdly-high-fee": None,
            r"max-fee-exceeded": None,
            r"too-long-mempool-chain": None,
            r"bad-txns-spends-conflicting-tx": None,
            r"insufficient fee": ("insufficient fee\n" +
                 _("Your transaction is trying to replace another one in the mempool but it "
                   "does not meet the rules to do so. Try to increase the fee.")),
            r"too many potential replacements": None,
            r"replacement-adds-unconfirmed": None,
            r"mempool full": None,
            r"non-mandatory-script-verify-flag": None,
            r"mandatory-script-verify-flag-failed": None,
            r"Transaction check failed": None,
        }
        for substring in validation_error_messages:
            if substring in server_msg:
                msg = validation_error_messages[substring]
                return msg if msg else substring
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/rpc/rawtransaction.cpp
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/util/error.cpp
        # grep "RPC_TRANSACTION"
        # grep "RPC_DESERIALIZATION_ERROR"
        rawtransaction_error_messages = {
            r"Missing inputs": None,
            r"Inputs missing or spent": None,
            r"transaction already in block chain": None,
            r"Transaction already in block chain": None,
            r"TX decode failed": None,
            r"Peer-to-peer functionality missing or disabled": None,
            r"Transaction rejected by AcceptToMemoryPool": None,
            r"AcceptToMemoryPool failed": None,
            r"Fee exceeds maximum configured by user": None,
        }
        for substring in rawtransaction_error_messages:
            if substring in server_msg:
                msg = rawtransaction_error_messages[substring]
                return msg if msg else substring
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/consensus/tx_verify.cpp
        # https://github.com/bitcoin/bitcoin/blob/c7ad94428ab6f54661d7a5441e1fdd0ebf034903/src/consensus/tx_check.cpp
        # grep "REJECT_"
        # grep "TxValidationResult"
        tx_verify_error_messages = {
            r"bad-txns-vin-empty": None,
            r"bad-txns-vout-empty": None,
            r"bad-txns-oversize": None,
            r"bad-txns-vout-negative": None,
            r"bad-txns-vout-toolarge": None,
            r"bad-txns-txouttotal-toolarge": None,
            r"bad-txns-inputs-duplicate": None,
            r"bad-cb-length": None,
            r"bad-txns-prevout-null": None,
            r"bad-txns-inputs-missingorspent":
                ("bad-txns-inputs-missingorspent\n" +
                 _("You might have a local transaction in your wallet that this transaction "
                   "builds on top. You need to either broadcast or remove the local tx.")),
            r"bad-txns-premature-spend-of-coinbase": None,
            r"bad-txns-inputvalues-outofrange": None,
            r"bad-txns-in-belowout": None,
            r"bad-txns-fee-outofrange": None,
        }
        for substring in tx_verify_error_messages:
            if substring in server_msg:
                msg = tx_verify_error_messages[substring]
                return msg if msg else substring
        # otherwise:
        return _("Unknown error")
1076
    @best_effort_reliable
    @catch_server_exceptions
    async def request_chunk(self, height: int, tip=None, *, can_return_early=False):
        """Delegate to the main interface; retried via best_effort_reliable,
        server errors wrapped via catch_server_exceptions."""
        return await self.interface.request_chunk(height, tip=tip, can_return_early=can_return_early)
1081
    @best_effort_reliable
    @catch_server_exceptions
    async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
        """Delegate to the main interface; retried via best_effort_reliable,
        server errors wrapped via catch_server_exceptions."""
        return await self.interface.get_transaction(tx_hash=tx_hash, timeout=timeout)
1086
    @best_effort_reliable
    @catch_server_exceptions
    async def get_history_for_scripthash(self, sh: str) -> List[dict]:
        """Delegate to the main interface; retried via best_effort_reliable,
        server errors wrapped via catch_server_exceptions."""
        return await self.interface.get_history_for_scripthash(sh)
1091
    @best_effort_reliable
    @catch_server_exceptions
    async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
        """Delegate to the main interface; retried via best_effort_reliable,
        server errors wrapped via catch_server_exceptions."""
        return await self.interface.listunspent_for_scripthash(sh)
1096
    @best_effort_reliable
    @catch_server_exceptions
    async def get_balance_for_scripthash(self, sh: str) -> dict:
        """Delegate to the main interface; retried via best_effort_reliable,
        server errors wrapped via catch_server_exceptions."""
        return await self.interface.get_balance_for_scripthash(sh)
1101
    @best_effort_reliable
    @catch_server_exceptions
    async def get_txid_from_txpos(self, tx_height, tx_pos, merkle):
        """Delegate to the main interface; retried via best_effort_reliable,
        server errors wrapped via catch_server_exceptions."""
        return await self.interface.get_txid_from_txpos(tx_height, tx_pos, merkle)
1106
1107    def blockchain(self) -> Blockchain:
1108        interface = self.interface
1109        if interface and interface.blockchain is not None:
1110            self._blockchain = interface.blockchain
1111        return self._blockchain
1112
1113    def get_blockchains(self):
1114        out = {}  # blockchain_id -> list(interfaces)
1115        with blockchain.blockchains_lock: blockchain_items = list(blockchain.blockchains.items())
1116        with self.interfaces_lock: interfaces_values = list(self.interfaces.values())
1117        for chain_id, bc in blockchain_items:
1118            r = list(filter(lambda i: i.blockchain==bc, interfaces_values))
1119            if r:
1120                out[chain_id] = r
1121        return out
1122
1123    def _set_preferred_chain(self, chain: Optional[Blockchain]):
1124        if chain:
1125            height = chain.get_max_forkpoint()
1126            header_hash = chain.get_hash(height)
1127        else:
1128            height = 0
1129            header_hash = constants.net.GENESIS
1130        self._blockchain_preferred_block = {
1131            'height': height,
1132            'hash': header_hash,
1133        }
1134        self.config.set_key('blockchain_preferred_block', self._blockchain_preferred_block)
1135
1136    async def follow_chain_given_id(self, chain_id: str) -> None:
1137        bc = blockchain.blockchains.get(chain_id)
1138        if not bc:
1139            raise Exception('blockchain {} not found'.format(chain_id))
1140        self._set_preferred_chain(bc)
1141        # select server on this chain
1142        with self.interfaces_lock: interfaces = list(self.interfaces.values())
1143        interfaces_on_selected_chain = list(filter(lambda iface: iface.blockchain == bc, interfaces))
1144        if len(interfaces_on_selected_chain) == 0: return
1145        chosen_iface = random.choice(interfaces_on_selected_chain)  # type: Interface
1146        # switch to server (and save to config)
1147        net_params = self.get_parameters()
1148        net_params = net_params._replace(server=chosen_iface.server)
1149        await self.set_parameters(net_params)
1150
1151    async def follow_chain_given_server(self, server: ServerAddr) -> None:
1152        # note that server_str should correspond to a connected interface
1153        iface = self.interfaces.get(server)
1154        if iface is None:
1155            return
1156        self._set_preferred_chain(iface.blockchain)
1157        # switch to server (and save to config)
1158        net_params = self.get_parameters()
1159        net_params = net_params._replace(server=server)
1160        await self.set_parameters(net_params)
1161
1162    def get_server_height(self) -> int:
1163        """Length of header chain, as claimed by main interface."""
1164        interface = self.interface
1165        return interface.tip if interface else 0
1166
1167    def get_local_height(self):
1168        """Length of header chain, POW-verified.
1169        In case of a chain split, this is for the branch the main interface is on,
1170        but it is the tip of that branch (even if main interface is behind).
1171        """
1172        return self.blockchain().height()
1173
1174    def export_checkpoints(self, path):
1175        """Run manually to generate blockchain checkpoints.
1176        Kept for console use only.
1177        """
1178        cp = self.blockchain().get_checkpoints()
1179        with open(path, 'w', encoding='utf-8') as f:
1180            f.write(json.dumps(cp, indent=4))
1181
    async def _start(self):
        """Start the network: create the taskgroup, connect to the default
        server, and schedule the maintenance loop plus registered jobs."""
        assert not self.taskgroup
        self.taskgroup = taskgroup = SilentTaskGroup()
        assert not self.interface and not self.interfaces
        assert not self._connecting_ifaces
        assert not self._closing_ifaces
        self.logger.info('starting network')
        self._clear_addr_retry_times()
        self._set_proxy(deserialize_proxy(self.config.get('proxy')))
        self._maybe_set_oneserver()
        # connect to the default server first
        await self.taskgroup.spawn(self._run_new_interface(self.default_server))

        async def main():
            self.logger.info("starting taskgroup.")
            try:
                # note: if a task finishes with CancelledError, that
                # will NOT raise, and the group will keep the other tasks running
                async with taskgroup as group:
                    await group.spawn(self._maintain_sessions())
                    [await group.spawn(job) for job in self._jobs]
            except asyncio.CancelledError:
                raise
            except Exception as e:
                self.logger.exception("taskgroup died.")
            finally:
                self.logger.info("taskgroup stopped.")
        # run the taskgroup on the network's own event loop
        asyncio.run_coroutine_threadsafe(main(), self.asyncio_loop)

        util.trigger_callback('network_updated')
1211
    def start(self, jobs: Iterable = None):
        """Schedule starting the network, along with the given job co-routines.

        Note: the jobs will *restart* every time the network restarts, e.g. on proxy
        setting changes.
        """
        self._jobs = jobs or []
        # actual startup happens on the network's event loop, asynchronously
        asyncio.run_coroutine_threadsafe(self._start(), self.asyncio_loop)
1220
    @log_exceptions
    async def stop(self, *, full_shutdown: bool = True):
        """Stop the network: cancel all tasks and drop all interfaces.

        With full_shutdown=False this acts as a restart (e.g. on proxy
        change): cleanup is time-limited and gossip keeps running.
        """
        self.logger.info("stopping network")
        # timeout: if full_shutdown, it is up to the caller to time us out,
        #          otherwise if e.g. restarting due to proxy changes, we time out fast
        async with (nullcontext() if full_shutdown else ignore_after(1)):
            async with TaskGroup() as group:
                await group.spawn(self.taskgroup.cancel_remaining())
                if full_shutdown:
                    await group.spawn(self.stop_gossip(full_shutdown=full_shutdown))
        # reset state so that _start() can run again
        self.taskgroup = None
        self.interface = None
        self.interfaces = {}
        self._connecting_ifaces.clear()
        self._closing_ifaces.clear()
        if not full_shutdown:
            util.trigger_callback('network_updated')
1238
    async def _ensure_there_is_a_main_interface(self):
        """Make sure we have a main interface, or are in the process of
        getting one; no-op if already connected."""
        if self.is_connected():
            return
        # if auto_connect is set, try a different server
        if self.auto_connect and not self.is_connecting():
            await self._switch_to_random_interface()
        # if auto_connect is not set, or still no main interface, retry current
        if not self.is_connected() and not self.is_connecting():
            if self._can_retry_addr(self.default_server, urgent=True):
                await self.switch_to_interface(self.default_server)
1249
    async def _maintain_sessions(self):
        """Maintenance loop: keep enough server connections open, keep them
        spread over network locations, and keep the main interface alive."""
        async def maybe_start_new_interfaces():
            # top up connections until we have self.num_server interfaces
            num_existing_ifaces = len(self.interfaces) + len(self._connecting_ifaces) + len(self._closing_ifaces)
            for i in range(self.num_server - num_existing_ifaces):
                # FIXME this should try to honour "healthy spread of connected servers"
                server = self._get_next_server_to_try()
                if server:
                    await self.taskgroup.spawn(self._run_new_interface(server))
        async def maintain_healthy_spread_of_connected_servers():
            # shuffled so that no particular interface is favoured for closing
            with self.interfaces_lock: interfaces = list(self.interfaces.values())
            random.shuffle(interfaces)
            for iface in interfaces:
                if not self.check_interface_against_healthy_spread_of_connected_servers(iface):
                    self.logger.info(f"disconnecting from {iface.server}. too many connected "
                                     f"servers already in bucket {iface.bucket_based_on_ipaddress()}")
                    await self._close_interface(iface)
        async def maintain_main_interface():
            await self._ensure_there_is_a_main_interface()
            if self.is_connected():
                if self.config.is_fee_estimates_update_required():
                    await self.interface.taskgroup.spawn(self._request_fee_estimates, self.interface)

        while True:
            try:
                await maybe_start_new_interfaces()
                await maintain_healthy_spread_of_connected_servers()
                await maintain_main_interface()
            except asyncio.CancelledError:
                # suppress spurious cancellations
                group = self.taskgroup
                if not group or group.closed():
                    raise
            await asyncio.sleep(0.1)
1283
    @classmethod
    async def _send_http_on_proxy(cls, method: str, url: str, params: str = None,
                                  body: bytes = None, json: dict = None, headers=None,
                                  on_finish=None, timeout=None):
        """Make an HTTP GET/POST request through the Network's proxy (if any).

        `on_finish` is awaited with the aiohttp ClientResponse; by default the
        response body text is returned (after raise_for_status). For POST,
        exactly one of `body` (raw bytes) or `json` must be supplied.
        `method` must be 'get' or 'post'.
        """
        async def default_on_finish(resp: ClientResponse):
            resp.raise_for_status()
            return await resp.text()
        if headers is None:
            headers = {}
        if on_finish is None:
            on_finish = default_on_finish
        # use the proxy of the current Network instance, if there is one
        network = cls.get_instance()
        proxy = network.proxy if network else None
        async with make_aiohttp_session(proxy, timeout=timeout) as session:
            if method == 'get':
                async with session.get(url, params=params, headers=headers) as resp:
                    return await on_finish(resp)
            elif method == 'post':
                assert body is not None or json is not None, 'body or json must be supplied if method is post'
                if body is not None:
                    async with session.post(url, data=body, headers=headers) as resp:
                        return await on_finish(resp)
                elif json is not None:
                    async with session.post(url, json=json, headers=headers) as resp:
                        return await on_finish(resp)
            else:
                assert False
1311
1312    @classmethod
1313    def send_http_on_proxy(cls, method, url, **kwargs):
1314        network = cls.get_instance()
1315        if network:
1316            assert util.get_running_loop() != network.asyncio_loop
1317            loop = network.asyncio_loop
1318        else:
1319            loop = asyncio.get_event_loop()
1320        coro = asyncio.run_coroutine_threadsafe(cls._send_http_on_proxy(method, url, **kwargs), loop)
1321        # note: _send_http_on_proxy has its own timeout, so no timeout here:
1322        return coro.result()
1323
1324    # methods used in scripts
1325    async def get_peers(self):
1326        while not self.is_connected():
1327            await asyncio.sleep(1)
1328        session = self.interface.session
1329        return parse_servers(await session.send_request('server.peers.subscribe'))
1330
1331    async def send_multiple_requests(self, servers: Sequence[ServerAddr], method: str, params: Sequence):
1332        responses = dict()
1333        async def get_response(server: ServerAddr):
1334            interface = Interface(network=self, server=server, proxy=self.proxy)
1335            timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent)
1336            try:
1337                await asyncio.wait_for(interface.ready, timeout)
1338            except BaseException as e:
1339                await interface.close()
1340                return
1341            try:
1342                res = await interface.session.send_request(method, params, timeout=10)
1343            except Exception as e:
1344                res = e
1345            responses[interface.server] = res
1346        async with TaskGroup() as group:
1347            for server in servers:
1348                await group.spawn(get_response(server))
1349        return responses
1350