1# Copyright (C) 2018 The Electrum developers
2# Distributed under the MIT software license, see the accompanying
3# file LICENCE or http://www.opensource.org/licenses/mit-license.php
4
5import asyncio
6import os
7from decimal import Decimal
8import random
9import time
10from typing import (Optional, Sequence, Tuple, List, Set, Dict, TYPE_CHECKING,
11                    NamedTuple, Union, Mapping, Any, Iterable, AsyncGenerator)
12import threading
13import socket
14import aiohttp
15import json
16from datetime import datetime, timezone
17from functools import partial
18from collections import defaultdict
19import concurrent
20from concurrent import futures
21import urllib.parse
22
23import dns.resolver
24import dns.exception
25from aiorpcx import run_in_thread, TaskGroup, NetAddress, ignore_after
26
27from . import constants, util
28from . import keystore
29from .util import profiler, chunks
30from .invoices import PR_TYPE_LN, PR_UNPAID, PR_EXPIRED, PR_PAID, PR_INFLIGHT, PR_FAILED, PR_ROUTING, LNInvoice, LN_EXPIRY_NEVER
31from .util import NetworkRetryManager, JsonRPCClient
32from .lnutil import LN_MAX_FUNDING_SAT
33from .keystore import BIP32_KeyStore
34from .bitcoin import COIN
35from .bitcoin import opcodes, make_op_return, address_to_scripthash
36from .transaction import Transaction
37from .transaction import get_script_type_from_output_script
38from .crypto import sha256
39from .bip32 import BIP32Node
40from .util import bh2u, bfh, InvoiceError, resolve_dns_srv, is_ip_address, log_exceptions
41from .crypto import chacha20_encrypt, chacha20_decrypt
42from .util import ignore_exceptions, make_aiohttp_session, SilentTaskGroup
43from .util import timestamp_to_datetime, random_shuffled_copy
44from .util import MyEncoder, is_private_netaddress
45from .logging import Logger
46from .lntransport import LNTransport, LNResponderTransport, LNTransportBase
47from .lnpeer import Peer, LN_P2P_NETWORK_TIMEOUT
48from .lnaddr import lnencode, LnAddr, lndecode
49from .ecc import der_sig_from_sig_string
50from .lnchannel import Channel, AbstractChannel
51from .lnchannel import ChannelState, PeerState, HTLCWithStatus
52from .lnrater import LNRater
53from . import lnutil
54from .lnutil import funding_output_script
55from .bitcoin import redeem_script_to_address
56from .lnutil import (Outpoint, LNPeerAddr,
57                     get_compressed_pubkey_from_bech32, extract_nodeid,
58                     PaymentFailure, split_host_port, ConnStringFormatError,
59                     generate_keypair, LnKeyFamily, LOCAL, REMOTE,
60                     MIN_FINAL_CLTV_EXPIRY_FOR_INVOICE,
61                     NUM_MAX_EDGES_IN_PAYMENT_PATH, SENT, RECEIVED, HTLCOwner,
62                     UpdateAddHtlc, Direction, LnFeatures, ShortChannelID,
63                     HtlcLog, derive_payment_secret_from_payment_preimage,
64                     NoPathFound, InvalidGossipMsg)
65from .lnutil import ln_dummy_address, ln_compare_features, IncompatibleLightningFeatures
66from .transaction import PartialTxOutput, PartialTransaction, PartialTxInput
67from .lnonion import OnionFailureCode, OnionRoutingFailure
68from .lnmsg import decode_msg
69from .i18n import _
70from .lnrouter import (RouteEdge, LNPaymentRoute, LNPaymentPath, is_route_sane_to_use,
71                       NoChannelPolicy, LNPathInconsistent)
72from .address_synchronizer import TX_HEIGHT_LOCAL
73from . import lnsweep
74from .lnwatcher import LNWalletWatcher
75from .crypto import pw_encode_with_version_and_mac, pw_decode_with_version_and_mac
76from .lnutil import ImportedChannelBackupStorage, OnchainChannelBackupStorage
77from .lnchannel import ChannelBackup
78from .channel_db import UpdateStatus
79from .channel_db import get_mychannel_info, get_mychannel_policy
80from .submarine_swaps import SwapManager
81from .channel_db import ChannelInfo, Policy
82from .mpp_split import suggest_splits
83from .trampoline import create_trampoline_route_and_onion, TRAMPOLINE_FEES
84
85if TYPE_CHECKING:
86    from .network import Network
87    from .wallet import Abstract_Wallet
88    from .channel_db import ChannelDB
89    from .simple_config import SimpleConfig
90
91
# invoice statuses that are persisted to the wallet db; other statuses
# (e.g. PR_INFLIGHT) are transient and recomputed at runtime
SAVED_PR_STATUS = [PR_PAID, PR_UNPAID] # status that are persisted


# how many connected peers LNWorker._maintain_connectivity aims for
NUM_PEERS_TARGET = 4

# onchain channel backup data
CB_VERSION = 0
# magic prefix for onchain backup blobs; the last byte encodes the version
CB_MAGIC_BYTES = bytes([0, 0, 0, CB_VERSION])
100
101
# hardcoded list of known testnet LN nodes; used as a connection fallback
# when recent peers and the channel db yield no candidates (see _get_next_peers_to_try)
FALLBACK_NODE_LIST_TESTNET = (
    LNPeerAddr(host='203.132.95.10', port=9735, pubkey=bfh('038863cf8ab91046230f561cd5b386cbff8309fa02e3f0c3ed161a3aeb64a643b9')),
    LNPeerAddr(host='2401:d002:4402:0:bf1d:986a:7598:6d49', port=9735, pubkey=bfh('038863cf8ab91046230f561cd5b386cbff8309fa02e3f0c3ed161a3aeb64a643b9')),
    LNPeerAddr(host='50.116.3.223', port=9734, pubkey=bfh('03236a685d30096b26692dce0cf0fa7c8528bdf61dbf5363a3ef6d5c92733a3016')),
    LNPeerAddr(host='3.16.119.191', port=9735, pubkey=bfh('03d5e17a3c213fe490e1b0c389f8cfcfcea08a29717d50a9f453735e0ab2a7c003')),
    LNPeerAddr(host='34.250.234.192', port=9735, pubkey=bfh('03933884aaf1d6b108397e5efe5c86bcf2d8ca8d2f700eda99db9214fc2712b134')),
    LNPeerAddr(host='88.99.209.230', port=9735, pubkey=bfh('0260d9119979caedc570ada883ff614c6efb93f7f7382e25d73ecbeba0b62df2d7')),
    LNPeerAddr(host='160.16.233.215', port=9735, pubkey=bfh('023ea0a53af875580899da0ab0a21455d9c19160c4ea1b7774c9d4be6810b02d2c')),
    LNPeerAddr(host='197.155.6.173', port=9735, pubkey=bfh('0269a94e8b32c005e4336bfb743c08a6e9beb13d940d57c479d95c8e687ccbdb9f')),
    LNPeerAddr(host='2c0f:fb18:406::4', port=9735, pubkey=bfh('0269a94e8b32c005e4336bfb743c08a6e9beb13d940d57c479d95c8e687ccbdb9f')),
    LNPeerAddr(host='163.172.94.64', port=9735, pubkey=bfh('030f0bf260acdbd3edcad84d7588ec7c5df4711e87e6a23016f989b8d3a4147230')),
    LNPeerAddr(host='23.237.77.12', port=9735, pubkey=bfh('02312627fdf07fbdd7e5ddb136611bdde9b00d26821d14d94891395452f67af248')),
    LNPeerAddr(host='197.155.6.172', port=9735, pubkey=bfh('02ae2f22b02375e3e9b4b4a2db4f12e1b50752b4062dbefd6e01332acdaf680379')),
    LNPeerAddr(host='2c0f:fb18:406::3', port=9735, pubkey=bfh('02ae2f22b02375e3e9b4b4a2db4f12e1b50752b4062dbefd6e01332acdaf680379')),
    LNPeerAddr(host='23.239.23.44', port=9740, pubkey=bfh('034fe52e98a0e9d3c21b767e1b371881265d8c7578c21f5afd6d6438da10348b36')),
    LNPeerAddr(host='2600:3c01::f03c:91ff:fe05:349c', port=9740, pubkey=bfh('034fe52e98a0e9d3c21b767e1b371881265d8c7578c21f5afd6d6438da10348b36')),
)
119
# hardcoded list of known mainnet LN nodes; used as a connection fallback
# when recent peers and the channel db yield no candidates (see _get_next_peers_to_try).
# changed from a list to a tuple: consistent with FALLBACK_NODE_LIST_TESTNET,
# and a module-level constant should be immutable (it is only read/filtered).
FALLBACK_NODE_LIST_MAINNET = (
    LNPeerAddr(host='172.81.181.3', port=9735, pubkey=bfh('0214382bdce7750dfcb8126df8e2b12de38536902dc36abcebdaeefdeca1df8284')),
    LNPeerAddr(host='35.230.100.60', port=9735, pubkey=bfh('023f5e3582716bed96f6f26cfcd8037e07474d7b4743afdc8b07e692df63464d7e')),
    LNPeerAddr(host='40.69.71.114', port=9735, pubkey=bfh('028303182c9885da93b3b25c9621d22cf34475e63c123942e402ab530c0556e675')),
    LNPeerAddr(host='94.177.171.73', port=9735, pubkey=bfh('0276e09a267592e7451a939c932cf685f0754de382a3ca85d2fb3a864d4c365ad5')),
    LNPeerAddr(host='34.236.113.58', port=9735, pubkey=bfh('02fa50c72ee1e2eb5f1b6d9c3032080c4c864373c4201dfa2966aa34eee1051f97')),
    LNPeerAddr(host='52.50.244.44', port=9735, pubkey=bfh('030c3f19d742ca294a55c00376b3b355c3c90d61c6b6b39554dbc7ac19b141c14f')),
    LNPeerAddr(host='157.245.68.47', port=9735, pubkey=bfh('03c2abfa93eacec04721c019644584424aab2ba4dff3ac9bdab4e9c97007491dda')),
    LNPeerAddr(host='18.221.23.28', port=9735, pubkey=bfh('03abf6f44c355dec0d5aa155bdbdd6e0c8fefe318eff402de65c6eb2e1be55dc3e')),
    LNPeerAddr(host='52.224.178.244', port=9735, pubkey=bfh('026b105ac13212c48714c6be9b11577a9ce10f10e1c88a45ce217e6331209faf8b')),
    LNPeerAddr(host='34.239.230.56', port=9735, pubkey=bfh('03864ef025fde8fb587d989186ce6a4a186895ee44a926bfc370e2c366597a3f8f')),
    LNPeerAddr(host='46.229.165.136', port=9735, pubkey=bfh('0390b5d4492dc2f5318e5233ab2cebf6d48914881a33ef6a9c6bcdbb433ad986d0')),
    LNPeerAddr(host='157.230.28.160', port=9735, pubkey=bfh('0279c22ed7a068d10dc1a38ae66d2d6461e269226c60258c021b1ddcdfe4b00bc4')),
    LNPeerAddr(host='74.108.13.152', port=9735, pubkey=bfh('0331f80652fb840239df8dc99205792bba2e559a05469915804c08420230e23c7c')),
    LNPeerAddr(host='167.172.44.148', port=9735, pubkey=bfh('0395033b252c6f40e3756984162d68174e2bd8060a129c0d3462a9370471c6d28f')),
    LNPeerAddr(host='138.68.14.104', port=9735, pubkey=bfh('03bb88ccc444534da7b5b64b4f7b15e1eccb18e102db0e400d4b9cfe93763aa26d')),
    LNPeerAddr(host='3.124.63.44', port=9735, pubkey=bfh('0242a4ae0c5bef18048fbecf995094b74bfb0f7391418d71ed394784373f41e4f3')),
    LNPeerAddr(host='2001:470:8:2e1::43', port=9735, pubkey=bfh('03baa70886d9200af0ffbd3f9e18d96008331c858456b16e3a9b41e735c6208fef')),
    LNPeerAddr(host='2601:186:c100:6bcd:219:d1ff:fe75:dc2f', port=9735, pubkey=bfh('0298f6074a454a1f5345cb2a7c6f9fce206cd0bf675d177cdbf0ca7508dd28852f')),
    LNPeerAddr(host='2001:41d0:e:734::1', port=9735, pubkey=bfh('03a503d8e30f2ff407096d235b5db63b4fcf3f89a653acb6f43d3fc492a7674019')),
    LNPeerAddr(host='2a01:4f9:2b:2254::2', port=9735, pubkey=bfh('02f3069a342ae2883a6f29e275f06f28a56a6ea2e2d96f5888a3266444dcf542b6')),
    LNPeerAddr(host='2a02:8070:24c1:100:528c:2997:6dbc:a054', port=9735, pubkey=bfh('02a45def9ae014fdd2603dd7033d157faa3a55a72b06a63ae22ef46d9fafdc6e8d')),
    LNPeerAddr(host='2600:3c01::f03c:91ff:fe05:349c', port=9736, pubkey=bfh('02731b798b39a09f9f14e90ee601afb6ebb796d6e5797de14582a978770b33700f')),
    LNPeerAddr(host='2a00:8a60:e012:a00::21', port=9735, pubkey=bfh('027ce055380348d7812d2ae7745701c9f93e70c1adeb2657f053f91df4f2843c71')),
    LNPeerAddr(host='2604:a880:400:d1::8bd:1001', port=9735, pubkey=bfh('03649c72a4816f0cd546f84aafbd657e92a30ab474de7ab795e8b5650a427611f7')),
    LNPeerAddr(host='2a01:4f8:c0c:7b31::1', port=9735, pubkey=bfh('02c16cca44562b590dd279c942200bdccfd4f990c3a69fad620c10ef2f8228eaff')),
    LNPeerAddr(host='2001:41d0:1:b40d::1', port=9735, pubkey=bfh('026726a4b043d413b45b334876d17b8a98848129604429ec65532ba286a42efeac')),
)
148
149
150from .trampoline import trampolines_by_id, hardcoded_trampoline_nodes, is_hardcoded_trampoline
151
152
class PaymentInfo(NamedTuple):
    """Status record for a lightning payment, keyed by its payment hash."""
    payment_hash: bytes
    amount_msat: Optional[int]  # None when the amount is not fixed
    direction: int  # SENT or RECEIVED
    status: int  # PR_* constant (e.g. PR_UNPAID, PR_PAID)
158
159
# raised by LNWorker._add_peer when a peer connection cannot be established
class ErrorAddingPeer(Exception): pass
161
162
# set some feature flags as baseline for both LNWallet and LNGossip
# note that e.g. DATA_LOSS_PROTECT is needed for LNGossip as many peers require it
BASE_FEATURES = LnFeatures(0)\
    | LnFeatures.OPTION_DATA_LOSS_PROTECT_OPT\
    | LnFeatures.OPTION_STATIC_REMOTEKEY_OPT\
    | LnFeatures.VAR_ONION_OPT\
    | LnFeatures.PAYMENT_SECRET_OPT\
    | LnFeatures.OPTION_UPFRONT_SHUTDOWN_SCRIPT_OPT

# we do not want to receive unrequested gossip (see lnpeer.maybe_save_remote_update)
LNWALLET_FEATURES = BASE_FEATURES\
    | LnFeatures.OPTION_DATA_LOSS_PROTECT_REQ\
    | LnFeatures.OPTION_STATIC_REMOTEKEY_REQ\
    | LnFeatures.GOSSIP_QUERIES_REQ\
    | LnFeatures.BASIC_MPP_OPT\
    | LnFeatures.OPTION_TRAMPOLINE_ROUTING_OPT

# the gossip worker advertises the gossip-queries bits in both directions
LNGOSSIP_FEATURES = BASE_FEATURES\
    | LnFeatures.GOSSIP_QUERIES_OPT\
    | LnFeatures.GOSSIP_QUERIES_REQ
183
184
class LNWorker(Logger, NetworkRetryManager[LNPeerAddr]):
    """Base class for lightning workers: maintains a pool of peer connections
    with retry backoff. Subclassed by LNGossip and LNWallet."""

    INITIAL_TRAMPOLINE_FEE_LEVEL = 1 # only used for trampoline payments. set to 0 in tests.
188
    def __init__(self, xprv, features: LnFeatures):
        """
        Args:
            xprv: BIP32 extended private key; the node key and the backup
                cipher key are derived from it.
            features: LN feature bits to advertise to peers.
        """
        Logger.__init__(self)
        NetworkRetryManager.__init__(
            self,
            max_retry_delay_normal=3600,
            init_retry_delay_normal=600,
            max_retry_delay_urgent=300,
            init_retry_delay_urgent=4,
        )
        self.lock = threading.RLock()
        self.node_keypair = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.NODE_KEY)
        self.backup_key = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.BACKUP_CIPHER).privkey
        self._peers = {}  # type: Dict[bytes, Peer]  # pubkey -> Peer  # needs self.lock
        self.taskgroup = SilentTaskGroup()
        self.listen_server = None  # type: Optional[asyncio.AbstractServer]
        self.features = features
        self.network = None  # type: Optional[Network]
        self.config = None  # type: Optional[SimpleConfig]
        self.stopping_soon = False  # whether we are being shut down

        # reconnect through the new proxy whenever it changes
        util.register_callback(self.on_proxy_changed, ['proxy_set'])
210
211    @property
212    def channel_db(self):
213        return self.network.channel_db if self.network else None
214
215    @property
216    def peers(self) -> Mapping[bytes, Peer]:
217        """Returns a read-only copy of peers."""
218        with self.lock:
219            return self._peers.copy()
220
    def channels_for_peer(self, node_id: bytes) -> Dict[bytes, Channel]:
        # base implementation: a bare LNWorker has no channels;
        # overridden by subclasses that do (e.g. LNWallet)
        return {}
223
224    def get_node_alias(self, node_id: bytes) -> Optional[str]:
225        """Returns the alias of the node, or None if unknown."""
226        node_alias = None
227        if self.channel_db:
228            node_info = self.channel_db.get_node_info_for_node_id(node_id)
229            if node_info:
230                node_alias = node_info.alias
231        else:
232            for k, v in hardcoded_trampoline_nodes().items():
233                if v.pubkey == node_id:
234                    node_alias = k
235                    break
236        return node_alias
237
    async def maybe_listen(self):
        """If the 'lightning_listen' config key is set, start accepting
        incoming LN p2p connections on that address."""
        # FIXME: only one LNWorker can listen at a time (single port)
        listen_addr = self.config.get('lightning_listen')
        if listen_addr:
            self.logger.info(f'lightning_listen enabled. will try to bind: {listen_addr!r}')
            try:
                netaddr = NetAddress.from_string(listen_addr)
            except Exception as e:
                self.logger.error(f"failed to parse config key 'lightning_listen'. got: {e!r}")
                return
            addr = str(netaddr.host)
            async def cb(reader, writer):
                # per-connection handler: noise handshake, then register the peer
                transport = LNResponderTransport(self.node_keypair.privkey, reader, writer)
                try:
                    node_id = await transport.handshake()
                except Exception as e:
                    self.logger.info(f'handshake failure from incoming connection: {e!r}')
                    return
                await self._add_peer_from_transport(node_id=node_id, transport=transport)
            try:
                self.listen_server = await asyncio.start_server(cb, addr, netaddr.port)
            except OSError as e:
                # e.g. address already in use
                self.logger.error(f"cannot listen for lightning p2p. error: {e!r}")
261
262    @ignore_exceptions  # don't kill outer taskgroup
263    async def main_loop(self):
264        self.logger.info("starting taskgroup.")
265        try:
266            async with self.taskgroup as group:
267                await group.spawn(self._maintain_connectivity())
268        except asyncio.CancelledError:
269            raise
270        except Exception as e:
271            self.logger.exception("taskgroup died.")
272        finally:
273            self.logger.info("taskgroup stopped.")
274
275    async def _maintain_connectivity(self):
276        while True:
277            await asyncio.sleep(1)
278            if self.stopping_soon:
279                return
280            now = time.time()
281            if len(self._peers) >= NUM_PEERS_TARGET:
282                continue
283            peers = await self._get_next_peers_to_try()
284            for peer in peers:
285                if self._can_retry_addr(peer, now=now):
286                    try:
287                        await self._add_peer(peer.host, peer.port, peer.pubkey)
288                    except ErrorAddingPeer as e:
289                        self.logger.info(f"failed to add peer: {peer}. exc: {e!r}")
290
    async def _add_peer(self, host: str, port: int, node_id: bytes) -> Peer:
        """Connect to (host, port, node_id) and register the resulting Peer.

        Returns the already-connected Peer if we know node_id.
        Raises ErrorAddingPeer when node_id is our own node key.
        """
        if node_id in self._peers:
            return self._peers[node_id]
        port = int(port)
        peer_addr = LNPeerAddr(host, port, node_id)
        # record the attempt so retry backoff applies even if connecting fails
        self._trying_addr_now(peer_addr)
        self.logger.info(f"adding peer {peer_addr}")
        if node_id == self.node_keypair.pubkey:
            raise ErrorAddingPeer("cannot connect to self")
        transport = LNTransport(self.node_keypair.privkey, peer_addr,
                                proxy=self.network.proxy)
        peer = await self._add_peer_from_transport(node_id=node_id, transport=transport)
        return peer
304
    async def _add_peer_from_transport(self, *, node_id: bytes, transport: LNTransportBase) -> Peer:
        """Wrap *transport* in a Peer, register it and start its main loop."""
        peer = Peer(self, node_id, transport)
        with self.lock:
            existing_peer = self._peers.get(node_id)
            if existing_peer:
                # only one connection per pubkey: drop the old one
                existing_peer.close_and_cleanup()
            # NOTE(review): assumes close_and_cleanup() synchronously removed
            # the old entry via peer_closed() — confirm this holds on all paths
            assert node_id not in self._peers
            self._peers[node_id] = peer
        await self.taskgroup.spawn(peer.main_loop())
        return peer
315
316    def peer_closed(self, peer: Peer) -> None:
317        with self.lock:
318            peer2 = self._peers.get(peer.pubkey)
319            if peer2 is peer:
320                self._peers.pop(peer.pubkey)
321
322    def num_peers(self) -> int:
323        return sum([p.is_initialized() for p in self.peers.values()])
324
    def start_network(self, network: 'Network'):
        """Attach to *network*: connect configured peers and start the
        main loop on the network's event loop."""
        assert network
        self.network = network
        self.config = network.config
        self._add_peers_from_config()
        asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop)
331
    async def stop(self):
        """Shut down: stop listening, deregister the proxy callback, and
        cancel all tasks in the taskgroup (including peer loops)."""
        if self.listen_server:
            self.listen_server.close()
        util.unregister_callback(self.on_proxy_changed)
        await self.taskgroup.cancel_remaining()
337
338    def _add_peers_from_config(self):
339        peer_list = self.config.get('lightning_peers', [])
340        for host, port, pubkey in peer_list:
341            asyncio.run_coroutine_threadsafe(
342                self._add_peer(host, int(port), bfh(pubkey)),
343                self.network.asyncio_loop)
344
    def is_good_peer(self, peer: LNPeerAddr) -> bool:
        # the purpose of this method is to filter peers that advertise the desired feature bits
        # it is disabled for now, because feature bits published in node announcements seem to be unreliable
        return True
        # NOTE: the code below is intentionally unreachable (see comment above);
        # kept around for when feature-bit filtering gets re-enabled.
        node_id = peer.pubkey
        node = self.channel_db._nodes.get(node_id)
        if not node:
            return False
        try:
            ln_compare_features(self.features, node.features)
        except IncompatibleLightningFeatures:
            return False
        #self.logger.info(f'is_good {peer.host}')
        return True
359
360    def on_peer_successfully_established(self, peer: Peer) -> None:
361        if isinstance(peer.transport, LNTransport):
362            peer_addr = peer.transport.peer_addr
363            # reset connection attempt count
364            self._on_connection_successfully_established(peer_addr)
365            # add into channel db
366            if self.channel_db:
367                self.channel_db.add_recent_peer(peer_addr)
368            # save network address into channels we might have with peer
369            for chan in peer.channels.values():
370                chan.add_or_update_peer_addr(peer_addr)
371
    async def _get_next_peers_to_try(self) -> Sequence[LNPeerAddr]:
        """Suggest peer(s) to connect to, trying sources in decreasing order
        of preference: recent peers, random channel-db nodes, the hardcoded
        fallback lists, and finally dns seeds (BOLT-10)."""
        now = time.time()
        await self.channel_db.data_loaded.wait()
        # first try from recent peers
        recent_peers = self.channel_db.get_recent_peers()
        for peer in recent_peers:
            if not peer:
                continue
            if peer.pubkey in self._peers:
                continue
            if not self._can_retry_addr(peer, now=now):
                continue
            if not self.is_good_peer(peer):
                continue
            return [peer]
        # try random peer from graph
        unconnected_nodes = self.channel_db.get_200_randomly_sorted_nodes_not_in(self.peers.keys())
        if unconnected_nodes:
            for node_id in unconnected_nodes:
                addrs = self.channel_db.get_node_addresses(node_id)
                if not addrs:
                    continue
                host, port, timestamp = self.choose_preferred_address(list(addrs))
                try:
                    peer = LNPeerAddr(host, port, node_id)
                except ValueError:
                    continue
                if not self._can_retry_addr(peer, now=now):
                    continue
                if not self.is_good_peer(peer):
                    continue
                #self.logger.info('taking random ln peer from our channel db')
                return [peer]

        # getting desperate... let's try hardcoded fallback list of peers
        if constants.net in (constants.BitcoinTestnet,):
            fallback_list = FALLBACK_NODE_LIST_TESTNET
        elif constants.net in (constants.BitcoinMainnet,):
            fallback_list = FALLBACK_NODE_LIST_MAINNET
        else:
            return []  # regtest??

        fallback_list = [peer for peer in fallback_list if self._can_retry_addr(peer, now=now)]
        if fallback_list:
            return [random.choice(fallback_list)]

        # last resort: try dns seeds (BOLT-10)
        return await run_in_thread(self._get_peers_from_dns_seeds)
420
    def _get_peers_from_dns_seeds(self) -> Sequence[LNPeerAddr]:
        """Query a random BOLT-10 dns seed and return up to 2*NUM_PEERS_TARGET
        peer addresses; returns [] on failure or if no seeds are configured."""
        # NOTE: potentially long blocking call, do not run directly on asyncio event loop.
        # Return several peers to reduce the number of dns queries.
        if not constants.net.LN_DNS_SEEDS:
            return []
        dns_seed = random.choice(constants.net.LN_DNS_SEEDS)
        self.logger.info('asking dns seed "{}" for ln peers'.format(dns_seed))
        try:
            # note: this might block for several seconds
            # this will include bech32-encoded-pubkeys and ports
            srv_answers = resolve_dns_srv('r{}.{}'.format(
                constants.net.LN_REALM_BYTE, dns_seed))
        except dns.exception.DNSException as e:
            self.logger.info(f'failed querying (1) dns seed "{dns_seed}" for ln peers: {repr(e)}')
            return []
        random.shuffle(srv_answers)
        num_peers = 2 * NUM_PEERS_TARGET
        srv_answers = srv_answers[:num_peers]
        # we now have pubkeys and ports but host is still needed
        peers = []
        for srv_ans in srv_answers:
            try:
                # note: this might block for several seconds
                answers = dns.resolver.resolve(srv_ans['host'])
            except dns.exception.DNSException as e:
                self.logger.info(f'failed querying (2) dns seed "{dns_seed}" for ln peers: {repr(e)}')
                continue
            try:
                ln_host = str(answers[0])
                port = int(srv_ans['port'])
                # first label of the SRV target is the bech32-encoded node pubkey
                bech32_pubkey = srv_ans['host'].split('.')[0]
                pubkey = get_compressed_pubkey_from_bech32(bech32_pubkey)
                peers.append(LNPeerAddr(ln_host, port, pubkey))
            except Exception as e:
                self.logger.info(f'error with parsing peer from dns seed: {repr(e)}')
                continue
        self.logger.info(f'got {len(peers)} ln peers from dns seed')
        return peers
459
460    @staticmethod
461    def choose_preferred_address(addr_list: Sequence[Tuple[str, int, int]]) -> Tuple[str, int, int]:
462        assert len(addr_list) >= 1
463        # choose first one that is an IP
464        for host, port, timestamp in addr_list:
465            if is_ip_address(host):
466                return host, port, timestamp
467        # otherwise choose one at random
468        # TODO maybe filter out onion if not on tor?
469        choice = random.choice(addr_list)
470        return choice
471
472    def on_proxy_changed(self, event, *args):
473        for peer in self.peers.values():
474            peer.close_and_cleanup()
475        self._clear_addr_retry_times()
476
    @log_exceptions
    async def add_peer(self, connect_str: str) -> Peer:
        """Connect to a peer given 'pubkey@host:port', or just a pubkey —
        in which case the address is looked up in the channel db or, without
        gossip, in the hardcoded trampoline list.

        Raises ConnStringFormatError if no address can be determined or the
        hostname does not resolve.
        """
        node_id, rest = extract_nodeid(connect_str)
        peer = self._peers.get(node_id)
        if not peer:
            if rest is not None:
                host, port = split_host_port(rest)
            else:
                if not self.channel_db:
                    # no gossip: addresses are only known for trampoline nodes
                    addr = trampolines_by_id().get(node_id)
                    if not addr:
                        raise ConnStringFormatError(_('Address unknown for node:') + ' ' + bh2u(node_id))
                    host, port = addr.host, addr.port
                else:
                    addrs = self.channel_db.get_node_addresses(node_id)
                    if not addrs:
                        raise ConnStringFormatError(_('Don\'t know any addresses for node:') + ' ' + bh2u(node_id))
                    host, port, timestamp = self.choose_preferred_address(list(addrs))
            port = int(port)
            # Try DNS-resolving the host (if needed). This is simply so that
            # the caller gets a nice exception if it cannot be resolved.
            try:
                await asyncio.get_event_loop().getaddrinfo(host, port)
            except socket.gaierror:
                raise ConnStringFormatError(_('Hostname does not resolve (getaddrinfo failed)'))
            # add peer
            peer = await self._add_peer(host, port, node_id)
        return peer
505
506
class LNGossip(LNWorker):
    """LNWorker that connects to peers only to synchronize gossip into the
    channel database."""
    max_age = 14*24*3600  # prune gossip older than two weeks (seconds)
    LOGGING_SHORTCUT = 'g'

    def __init__(self):
        # gossip sync needs no persistent identity: use a throwaway node key
        seed = os.urandom(32)
        node = BIP32Node.from_rootseed(seed, xtype='standard')
        xprv = node.to_xprv()
        super().__init__(xprv, LNGOSSIP_FEATURES)
        self.unknown_ids = set()  # channel ids referenced in gossip but not yet in the db
517
518    def start_network(self, network: 'Network'):
519        assert network
520        super().start_network(network)
521        asyncio.run_coroutine_threadsafe(self.taskgroup.spawn(self.maintain_db()), self.network.asyncio_loop)
522
    async def maintain_db(self):
        """Periodically prune old policies and orphaned channels from the
        channel db — but only while there is no backlog of unknown ids."""
        await self.channel_db.data_loaded.wait()
        while True:
            if len(self.unknown_ids) == 0:
                self.channel_db.prune_old_policies(self.max_age)
                self.channel_db.prune_orphaned_channels()
            await asyncio.sleep(120)
530
531    async def add_new_ids(self, ids: Iterable[bytes]):
532        known = self.channel_db.get_channel_ids()
533        new = set(ids) - set(known)
534        self.unknown_ids.update(new)
535        util.trigger_callback('unknown_channels', len(self.unknown_ids))
536        util.trigger_callback('gossip_peers', self.num_peers())
537        util.trigger_callback('ln_gossip_sync_progress')
538
539    def get_ids_to_query(self) -> Sequence[bytes]:
540        N = 500
541        l = list(self.unknown_ids)
542        self.unknown_ids = set(l[N:])
543        util.trigger_callback('unknown_channels', len(self.unknown_ids))
544        util.trigger_callback('ln_gossip_sync_progress')
545        return l[0:N]
546
    def get_sync_progress_estimate(self) -> Tuple[Optional[int], Optional[int], Optional[int]]:
        """Estimates the gossip synchronization process and returns the number
        of synchronized channels, the total channels in the network and a
        rescaled percentage of the synchronization process.

        Returns (None, None, None) if no peers are initialized yet.
        """
        if self.num_peers() == 0:
            return None, None, None
        nchans_with_0p, nchans_with_1p, nchans_with_2p = self.channel_db.get_num_channels_partitioned_by_policy_count()
        num_db_channels = nchans_with_0p + nchans_with_1p + nchans_with_2p
        # some channels will never have two policies (only one is in gossip?...)
        # so if we have at least 1 policy for a channel, we consider that channel "complete" here
        current_est = num_db_channels - nchans_with_0p
        total_est = len(self.unknown_ids) + num_db_channels

        progress = current_est / total_est if total_est and current_est else 0
        # rescale so ~95% raw progress already reads as 100%, then clamp
        progress_percent = (1.0 / 0.95 * progress) * 100
        progress_percent = min(progress_percent, 100)
        progress_percent = round(progress_percent)
        # take a minimal number of synchronized channels to get a more accurate
        # percentage estimate
        if current_est < 200:
            progress_percent = 0
        return current_est, total_est, progress_percent
569
    async def process_gossip(self, chan_anns, node_anns, chan_upds):
        """Verify and store a batch of gossip messages (channel announcements,
        node announcements, channel updates) in the channel db.

        Verification and db insertion run via run_in_thread to keep the
        event loop responsive.
        """
        # note: we run in the originating peer's TaskGroup, so we can safely raise here
        #       and disconnect only from that peer
        await self.channel_db.data_loaded.wait()
        self.logger.debug(f'process_gossip {len(chan_anns)} {len(node_anns)} {len(chan_upds)}')
        # channel announcements
        def process_chan_anns():
            for payload in chan_anns:
                self.channel_db.verify_channel_announcement(payload)
            self.channel_db.add_channel_announcements(chan_anns)
        await run_in_thread(process_chan_anns)
        # node announcements
        def process_node_anns():
            for payload in node_anns:
                self.channel_db.verify_node_announcement(payload)
            self.channel_db.add_node_announcements(node_anns)
        await run_in_thread(process_node_anns)
        # channel updates
        categorized_chan_upds = await run_in_thread(partial(
            self.channel_db.add_channel_updates,
            chan_upds,
            max_age=self.max_age))
        orphaned = categorized_chan_upds.orphaned
        if orphaned:
            # updates for channels we don't know yet: queue their ids for querying
            self.logger.info(f'adding {len(orphaned)} unknown channel ids')
            orphaned_ids = [c['short_channel_id'] for c in orphaned]
            await self.add_new_ids(orphaned_ids)
        if categorized_chan_upds.good:
            self.logger.debug(f'on_channel_update: {len(categorized_chan_upds.good)}/{len(chan_upds)}')
599
600
class LNWallet(LNWorker):
    """LNWorker attached to a wallet: manages channels, payments, invoices
    and channel backups."""

    lnwatcher: Optional['LNWalletWatcher']
    MPP_EXPIRY = 120  # NOTE(review): presumably seconds before an incomplete MPP expires — confirm at usage site
    TIMEOUT_SHUTDOWN_FAIL_PENDING_HTLCS = 3  # seconds
606
    def __init__(self, wallet: 'Abstract_Wallet', xprv):
        """Load channels, backups and payment state from the wallet db.

        Args:
            wallet: the owning wallet (provides db, config, addresses).
            xprv: BIP32 extended private key for the lightning node key.
        """
        self.wallet = wallet
        self.db = wallet.db
        Logger.__init__(self)
        LNWorker.__init__(self, xprv, LNWALLET_FEATURES)
        self.config = wallet.config
        self.lnwatcher = None
        self.lnrater: LNRater = None
        self.payments = self.db.get_dict('lightning_payments')     # RHASH -> amount, direction, is_paid
        self.preimages = self.db.get_dict('lightning_preimages')   # RHASH -> preimage
        # note: this sweep_address is only used as fallback; as it might result in address-reuse
        self.sweep_address = wallet.get_new_sweep_address_for_channel()
        self.logs = defaultdict(list)  # type: Dict[str, List[HtlcLog]]  # key is RHASH  # (not persisted)
        # used in tests
        self.enable_htlc_settle = True
        self.enable_htlc_forwarding = True

        # note: accessing channels (besides simple lookup) needs self.lock!
        self._channels = {}  # type: Dict[bytes, Channel]
        channels = self.db.get_dict("channels")
        for channel_id, c in random_shuffled_copy(channels.items()):
            self._channels[bfh(channel_id)] = Channel(c, sweep_address=self.sweep_address, lnworker=self)

        self._channel_backups = {}  # type: Dict[bytes, ChannelBackup]
        # order is important: imported should overwrite onchain
        for name in ["onchain_channel_backups", "imported_channel_backups"]:
            channel_backups = self.db.get_dict(name)
            for channel_id, storage in channel_backups.items():
                self._channel_backups[bfh(channel_id)] = ChannelBackup(storage, sweep_address=self.sweep_address, lnworker=self)

        self.sent_htlcs = defaultdict(asyncio.Queue)  # type: Dict[bytes, asyncio.Queue[HtlcLog]]
        self.sent_htlcs_routes = dict()               # (RHASH, scid, htlc_id) -> route, payment_secret, amount_msat, bucket_msat
        self.sent_buckets = dict()                    # payment_secret -> (amount_sent, amount_failed)
        self.received_mpp_htlcs = dict()                  # RHASH -> mpp_status, htlc_set

        self.swap_manager = SwapManager(wallet=self.wallet, lnworker=self)
        # detect inflight payments
        self.inflight_payments = set()        # (not persisted) keys of invoices that are in PR_INFLIGHT state
        for payment_hash in self.get_payments(status='inflight').keys():
            self.set_invoice_status(payment_hash.hex(), PR_INFLIGHT)

        self.trampoline_forwarding_failures = {} # todo: should be persisted
649
650    def has_deterministic_node_id(self):
651        return bool(self.db.get('lightning_xprv'))
652
653    def has_recoverable_channels(self):
654        # TODO: expose use_recoverable_channels in preferences
655        return self.has_deterministic_node_id() \
656            and self.config.get('use_recoverable_channels', True) \
657            and not (self.config.get('lightning_listen'))
658
659    @property
660    def channels(self) -> Mapping[bytes, Channel]:
661        """Returns a read-only copy of channels."""
662        with self.lock:
663            return self._channels.copy()
664
665    @property
666    def channel_backups(self) -> Mapping[bytes, ChannelBackup]:
667        """Returns a read-only copy of channels."""
668        with self.lock:
669            return self._channel_backups.copy()
670
671    def get_channel_by_id(self, channel_id: bytes) -> Optional[Channel]:
672        return self._channels.get(channel_id, None)
673
674    def diagnostic_name(self):
675        return self.wallet.diagnostic_name()
676
677    @ignore_exceptions
678    @log_exceptions
679    async def sync_with_local_watchtower(self):
680        watchtower = self.network.local_watchtower
681        if watchtower:
682            while True:
683                for chan in self.channels.values():
684                    await self.sync_channel_with_watchtower(chan, watchtower.sweepstore)
685                await asyncio.sleep(5)
686
687    @ignore_exceptions
688    @log_exceptions
689    async def sync_with_remote_watchtower(self):
690        while True:
691            # periodically poll if the user updated 'watchtower_url'
692            await asyncio.sleep(5)
693            watchtower_url = self.config.get('watchtower_url')
694            if not watchtower_url:
695                continue
696            parsed_url = urllib.parse.urlparse(watchtower_url)
697            if not (parsed_url.scheme == 'https' or is_private_netaddress(parsed_url.hostname)):
698                self.logger.warning(f"got watchtower URL for remote tower but we won't use it! "
699                                    f"can only use HTTPS (except if private IP): not using {watchtower_url!r}")
700                continue
701            # try to sync with the remote watchtower
702            try:
703                async with make_aiohttp_session(proxy=self.network.proxy) as session:
704                    watchtower = JsonRPCClient(session, watchtower_url)
705                    watchtower.add_method('get_ctn')
706                    watchtower.add_method('add_sweep_tx')
707                    for chan in self.channels.values():
708                        await self.sync_channel_with_watchtower(chan, watchtower)
709            except aiohttp.client_exceptions.ClientConnectorError:
710                self.logger.info(f'could not contact remote watchtower {watchtower_url}')
711
712    async def sync_channel_with_watchtower(self, chan: Channel, watchtower):
713        outpoint = chan.funding_outpoint.to_str()
714        addr = chan.get_funding_address()
715        current_ctn = chan.get_oldest_unrevoked_ctn(REMOTE)
716        watchtower_ctn = await watchtower.get_ctn(outpoint, addr)
717        for ctn in range(watchtower_ctn + 1, current_ctn):
718            sweeptxs = chan.create_sweeptxs(ctn)
719            for tx in sweeptxs:
720                await watchtower.add_sweep_tx(outpoint, ctn, tx.inputs()[0].prevout.to_str(), tx.serialize())
721
    def start_network(self, network: 'Network'):
        """Attach to the network: create the watcher/swap-manager/rater, register
        funding outpoints, and spawn the long-running tasks.

        Called once when the wallet goes online; the order matters (the watcher
        must exist before channels are registered with it).
        """
        assert network
        self.network = network
        self.config = network.config
        self.lnwatcher = LNWalletWatcher(self, network)
        self.lnwatcher.start_network(network)
        self.swap_manager.start_network(network=network, lnwatcher=self.lnwatcher)
        self.lnrater = LNRater(self, network)

        # make the watcher monitor every known funding outpoint (incl. backups)
        for chan in self.channels.values():
            self.lnwatcher.add_channel(chan.funding_outpoint.to_str(), chan.get_funding_address())
        for cb in self.channel_backups.values():
            self.lnwatcher.add_channel(cb.funding_outpoint.to_str(), cb.get_funding_address())

        for coro in [
                self.maybe_listen(),
                self.lnwatcher.on_network_update('network_updated'), # shortcut (don't block) if funding tx locked and verified
                self.reestablish_peers_and_channels(),
                self.sync_with_local_watchtower(),
                self.sync_with_remote_watchtower(),
        ]:
            # spawn into our taskgroup on the network's event loop; we may be
            # called from a different thread, hence run_coroutine_threadsafe
            tg_coro = self.taskgroup.spawn(coro)
            asyncio.run_coroutine_threadsafe(tg_coro, self.network.asyncio_loop)
745
    async def stop(self):
        """Shut down: stop accepting peers, briefly try to get received pending
        HTLCs removed, then stop the worker and the watcher."""
        self.stopping_soon = True
        if self.listen_server:  # stop accepting new peers
            self.listen_server.close()
        # best-effort: give up after TIMEOUT_SHUTDOWN_FAIL_PENDING_HTLCS seconds
        async with ignore_after(self.TIMEOUT_SHUTDOWN_FAIL_PENDING_HTLCS):
            await self.wait_for_received_pending_htlcs_to_get_removed()
        await LNWorker.stop(self)
        if self.lnwatcher:
            await self.lnwatcher.stop()
            self.lnwatcher = None
756
    async def wait_for_received_pending_htlcs_to_get_removed(self):
        """Try to fail/fulfill received pending HTLCs and wait until they are
        irrevocably removed. May wait indefinitely; callers should apply a
        timeout (see stop()). Only valid during shutdown.
        """
        assert self.stopping_soon is True
        # We try to fail pending MPP HTLCs, and wait a bit for them to get removed.
        # Note: even without MPP, if we just failed/fulfilled an HTLC, it is good
        #       to wait a bit for it to become irrevocably removed.
        # Note: we don't wait for *all htlcs* to get removed, only for those
        #       that we can already fail/fulfill. e.g. forwarded htlcs cannot be removed
        async with TaskGroup() as group:
            for peer in self.peers.values():
                await group.spawn(peer.wait_one_htlc_switch_iteration())
        while True:
            if all(not peer.received_htlcs_pending_removal for peer in self.peers.values()):
                break
            # wait until any one peer reports an htlc removal, then re-check all
            async with TaskGroup(wait=any) as group:
                for peer in self.peers.values():
                    await group.spawn(peer.received_htlc_removed_event.wait())
773
774    def peer_closed(self, peer):
775        for chan in self.channels_for_peer(peer.pubkey).values():
776            chan.peer_state = PeerState.DISCONNECTED
777            util.trigger_callback('channel', self.wallet, chan)
778        super().peer_closed(peer)
779
780    def get_payments(self, *, status=None) -> Mapping[bytes, List[HTLCWithStatus]]:
781        out = defaultdict(list)
782        for chan in self.channels.values():
783            d = chan.get_payments(status=status)
784            for payment_hash, plist in d.items():
785                out[payment_hash] += plist
786        return out
787
788    def get_payment_value(
789            self, info: Optional['PaymentInfo'], plist: List[HTLCWithStatus],
790    ) -> Tuple[int, int, int]:
791        assert plist
792        amount_msat = 0
793        fee_msat = None
794        for htlc_with_status in plist:
795            htlc = htlc_with_status.htlc
796            _direction = htlc_with_status.direction
797            amount_msat += int(_direction) * htlc.amount_msat
798            if _direction == SENT and info and info.amount_msat:
799                fee_msat = (fee_msat or 0) - info.amount_msat - amount_msat
800        timestamp = min([htlc_with_status.htlc.timestamp for htlc_with_status in plist])
801        return amount_msat, fee_msat, timestamp
802
    def get_lightning_history(self):
        """Settled LN payments as GUI history items, keyed by payment hash.

        Each item carries label, direction, amounts and preimage; payments that
        belong to a submarine swap additionally get group_id/group_label so the
        GUI can group them with the on-chain side of the swap.
        """
        out = {}
        for payment_hash, plist in self.get_payments(status='settled').items():
            if len(plist) == 0:
                continue
            key = payment_hash.hex()
            info = self.get_payment_info(payment_hash)
            amount_msat, fee_msat, timestamp = self.get_payment_value(info, plist)
            if info is not None:
                label = self.wallet.get_label(key)
                # more than one htlc under the same hash with stored info means
                # we both sent and received it: a payment to ourselves
                direction = ('sent' if info.direction == SENT else 'received') if len(plist)==1 else 'self-payment'
            else:
                # no stored payment info: we only forwarded this payment
                direction = 'forwarding'
                label = _('Forwarding')
            preimage = self.get_preimage(payment_hash).hex()
            item = {
                'type': 'payment',
                'label': label,
                'timestamp': timestamp or 0,
                'date': timestamp_to_datetime(timestamp),
                'direction': direction,
                'amount_msat': amount_msat,
                'fee_msat': fee_msat,
                'payment_hash': key,
                'preimage': preimage,
            }
            # add group_id to swap transactions
            swap = self.swap_manager.get_swap(payment_hash)
            if swap:
                if swap.is_reverse:
                    item['group_id'] = swap.spending_txid
                    item['group_label'] = 'Reverse swap' + ' ' + self.config.format_amount_and_units(swap.lightning_amount)
                else:
                    item['group_id'] = swap.funding_txid
                    item['group_label'] = 'Forward swap' + ' ' + self.config.format_amount_and_units(swap.onchain_amount)
            # done
            out[payment_hash] = item
        return out
841
    def get_onchain_history(self):
        """On-chain channel events (openings, closures, swap txs) as history
        items keyed by txid, for merging with the wallet's on-chain history.
        """
        current_height = self.wallet.get_local_height()
        out = {}
        # add funding events
        for chan in self.channels.values():
            item = chan.get_funding_height()
            if item is None:
                continue
            if not self.lnwatcher:
                continue  # lnwatcher not available with --offline (its data is not persisted)
            funding_txid, funding_height, funding_timestamp = item
            tx_height = self.lnwatcher.get_tx_height(funding_txid)
            item = {
                'channel_id': bh2u(chan.channel_id),
                'type': 'channel_opening',
                'label': self.wallet.get_label_for_txid(funding_txid) or (_('Open channel') + ' ' + chan.get_id_for_log()),
                'txid': funding_txid,
                # our balance at ctn 0, i.e. what we put into the channel
                'amount_msat': chan.balance(LOCAL, ctn=0),
                'direction': 'received',
                'timestamp': tx_height.timestamp,
                'date': timestamp_to_datetime(tx_height.timestamp),
                'fee_sat': None,
                'fee_msat': None,
                'height': tx_height.height,
                'confirmations': tx_height.conf,
            }
            out[funding_txid] = item
            item = chan.get_closing_height()
            if item is None:
                continue  # channel still open: no closing event to add
            closing_txid, closing_height, closing_timestamp = item
            tx_height = self.lnwatcher.get_tx_height(closing_txid)
            item = {
                'channel_id': bh2u(chan.channel_id),
                'txid': closing_txid,
                'label': self.wallet.get_label_for_txid(closing_txid) or (_('Close channel') + ' ' + chan.get_id_for_log()),
                'type': 'channel_closure',
                'amount_msat': -chan.balance_minus_outgoing_htlcs(LOCAL),
                'direction': 'sent',
                'timestamp': tx_height.timestamp,
                'date': timestamp_to_datetime(tx_height.timestamp),
                'fee_sat': None,
                'fee_msat': None,
                'height': tx_height.height,
                'confirmations': tx_height.conf,
            }
            out[closing_txid] = item
        # add info about submarine swaps
        settled_payments = self.get_payments(status='settled')
        for payment_hash_hex, swap in self.swap_manager.swaps.items():
            txid = swap.spending_txid if swap.is_reverse else swap.funding_txid
            if txid is None:
                continue
            payment_hash = bytes.fromhex(payment_hash_hex)
            if payment_hash in settled_payments:
                plist = settled_payments[payment_hash]
                info = self.get_payment_info(payment_hash)
                amount_msat, fee_msat, timestamp = self.get_payment_value(info, plist)
            else:
                amount_msat = 0
            label = 'Reverse swap' if swap.is_reverse else 'Forward swap'
            delta = current_height - swap.locktime
            if not swap.is_redeemed and swap.spending_txid is None and delta < 0:
                label += f' (refundable in {-delta} blocks)' # fixme: only if unspent
            out[txid] = {
                'txid': txid,
                'group_id': txid,
                'amount_msat': 0,
                #'amount_msat': amount_msat, # must not be added
                'type': 'swap',
                'label': self.wallet.get_label_for_txid(txid) or label,
            }
        return out
915
916    def get_history(self):
917        out = list(self.get_lightning_history().values()) + list(self.get_onchain_history().values())
918        # sort by timestamp
919        out.sort(key=lambda x: (x.get('timestamp') or float("inf")))
920        balance_msat = 0
921        for item in out:
922            balance_msat += item['amount_msat']
923            item['balance_msat'] = balance_msat
924        return out
925
926    def channel_peers(self) -> List[bytes]:
927        node_ids = [chan.node_id for chan in self.channels.values() if not chan.is_closed()]
928        return node_ids
929
930    def channels_for_peer(self, node_id):
931        assert type(node_id) is bytes
932        return {chan_id: chan for (chan_id, chan) in self.channels.items()
933                if chan.node_id == node_id}
934
935    def channel_state_changed(self, chan: Channel):
936        if type(chan) is Channel:
937            self.save_channel(chan)
938        util.trigger_callback('channel', self.wallet, chan)
939
940    def save_channel(self, chan: Channel):
941        assert type(chan) is Channel
942        if chan.config[REMOTE].next_per_commitment_point == chan.config[REMOTE].current_per_commitment_point:
943            raise Exception("Tried to save channel with next_point == current_point, this should not happen")
944        self.wallet.save_db()
945        util.trigger_callback('channel', self.wallet, chan)
946
947    def channel_by_txo(self, txo: str) -> Optional[AbstractChannel]:
948        for chan in self.channels.values():
949            if chan.funding_outpoint.to_str() == txo:
950                return chan
951        for chan in self.channel_backups.values():
952            if chan.funding_outpoint.to_str() == txo:
953                return chan
954
    async def on_channel_update(self, chan: Channel):
        """React to a channel state change.

        Dispatch on the channel state:
        - ChannelBackup: just refresh the GUI
        - OPEN with htlcs about to expire: force-close to protect funds
        - FUNDED: tell the peer the funding tx is locked in
        - OPEN: maybe update fees and notify the peer of confirmations
        - FORCE_CLOSING: rebroadcast our commitment tx while it is still local
        """
        if type(chan) is ChannelBackup:
            util.trigger_callback('channel', self.wallet, chan)
            return

        if chan.get_state() == ChannelState.OPEN and chan.should_be_closed_due_to_expiring_htlcs(self.network.get_local_height()):
            self.logger.info(f"force-closing due to expiring htlcs")
            await self.try_force_closing(chan.channel_id)

        elif chan.get_state() == ChannelState.FUNDED:
            peer = self._peers.get(chan.node_id)
            if peer and peer.is_initialized():
                peer.send_funding_locked(chan)

        elif chan.get_state() == ChannelState.OPEN:
            peer = self._peers.get(chan.node_id)
            if peer:
                await peer.maybe_update_fee(chan)
                conf = self.lnwatcher.get_tx_height(chan.funding_outpoint.txid).conf
                peer.on_network_update(chan, conf)

        elif chan.get_state() == ChannelState.FORCE_CLOSING:
            force_close_tx = chan.force_close_tx()
            txid = force_close_tx.txid()
            height = self.lnwatcher.get_tx_height(txid).height
            # TX_HEIGHT_LOCAL means the tx is not seen on the network yet
            if height == TX_HEIGHT_LOCAL:
                self.logger.info('REBROADCASTING CLOSING TX')
                await self.network.try_broadcasting(force_close_tx, 'force-close')
983
    @log_exceptions
    async def _open_channel_coroutine(
            self, *,
            connect_str: str,
            funding_tx: PartialTransaction,
            funding_sat: int,
            push_sat: int,
            password: Optional[str]) -> Tuple[Channel, PartialTransaction]:
        """Open a channel: connect to the peer, run the establishment flow,
        then sign and (if complete) broadcast the funding transaction.

        :param connect_str: peer connection string
        :param funding_tx: prepared funding transaction (may be unsigned)
        :param funding_sat: channel capacity in satoshis
        :param push_sat: amount pushed to the remote side, in satoshis
        :param password: wallet password for signing, if any
        :return: (channel, funding tx after signing)
        """
        peer = await self.add_peer(connect_str)
        coro = peer.channel_establishment_flow(
            funding_tx=funding_tx,
            funding_sat=funding_sat,
            push_msat=push_sat * 1000,
            temp_channel_id=os.urandom(32))
        chan, funding_tx = await asyncio.wait_for(coro, LN_P2P_NETWORK_TIMEOUT)
        util.trigger_callback('channels_updated', self.wallet)
        self.wallet.add_transaction(funding_tx)  # save tx as local into the wallet
        self.wallet.sign_transaction(funding_tx, password)
        self.wallet.set_label(funding_tx.txid(), _('Open channel'))
        if funding_tx.is_complete():
            # only broadcast if fully signed (multisig-funded txs may not be)
            await self.network.try_broadcasting(funding_tx, 'open_channel')
        return chan, funding_tx
1007
1008    def add_channel(self, chan: Channel):
1009        with self.lock:
1010            self._channels[chan.channel_id] = chan
1011        self.lnwatcher.add_channel(chan.funding_outpoint.to_str(), chan.get_funding_address())
1012
    def add_new_channel(self, chan: Channel):
        """Add a freshly negotiated channel: register it, persist its storage,
        reserve related wallet addresses, and save an (optional) file backup.

        On any failure the channel is marked REDEEMED and removed again, and
        the exception is re-raised.
        """
        self.add_channel(chan)
        channels_db = self.db.get_dict('channels')
        channels_db[chan.channel_id.hex()] = chan.storage
        # keep the wallet from spending addresses the channel may need later
        for addr in chan.get_wallet_addresses_channel_might_want_reserved():
            self.wallet.set_reserved_state_of_address(addr, reserved=True)
        try:
            self.save_channel(chan)
            backup_dir = self.config.get_backup_dir()
            if backup_dir is not None:
                self.wallet.save_backup(backup_dir)
        except:  # bare on purpose: roll back on *any* error, then re-raise
            chan.set_state(ChannelState.REDEEMED)
            self.remove_channel(chan.channel_id)
            raise
1028
1029    def cb_data(self, node_id):
1030        return CB_MAGIC_BYTES + node_id[0:16]
1031
1032    def decrypt_cb_data(self, encrypted_data, funding_address):
1033        funding_scripthash = bytes.fromhex(address_to_scripthash(funding_address))
1034        nonce = funding_scripthash[0:12]
1035        return chacha20_decrypt(key=self.backup_key, data=encrypted_data, nonce=nonce)
1036
1037    def encrypt_cb_data(self, data, funding_address):
1038        funding_scripthash = bytes.fromhex(address_to_scripthash(funding_address))
1039        nonce = funding_scripthash[0:12]
1040        return chacha20_encrypt(key=self.backup_key, data=data, nonce=nonce)
1041
1042    def mktx_for_open_channel(
1043            self, *,
1044            coins: Sequence[PartialTxInput],
1045            funding_sat: int,
1046            node_id: bytes,
1047            fee_est=None) -> PartialTransaction:
1048        outputs = [PartialTxOutput.from_address_and_value(ln_dummy_address(), funding_sat)]
1049        if self.has_recoverable_channels():
1050            dummy_scriptpubkey = make_op_return(self.cb_data(node_id))
1051            outputs.append(PartialTxOutput(scriptpubkey=dummy_scriptpubkey, value=0))
1052        tx = self.wallet.make_unsigned_transaction(
1053            coins=coins,
1054            outputs=outputs,
1055            fee=fee_est)
1056        tx.set_rbf(False)
1057        return tx
1058
    def open_channel(self, *, connect_str: str, funding_tx: PartialTransaction,
                     funding_sat: int, push_amt_sat: int, password: str = None) -> Tuple[Channel, PartialTransaction]:
        """Blocking wrapper around _open_channel_coroutine, for non-async callers.

        :raises Exception: if the requested capacity exceeds the protocol max,
            or on timeout
        """
        if funding_sat > LN_MAX_FUNDING_SAT:
            raise Exception(_("Requested channel capacity is over protocol allowed maximum."))
        coro = self._open_channel_coroutine(
            connect_str=connect_str, funding_tx=funding_tx, funding_sat=funding_sat,
            push_sat=push_amt_sat, password=password)
        fut = asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop)
        try:
            # NOTE(review): fut.result() is called without a timeout, so the
            # TimeoutError branch below looks unreachable — confirm intent
            chan, funding_tx = fut.result()
        except concurrent.futures.TimeoutError:
            raise Exception(_("open_channel timed out"))
        return chan, funding_tx
1072
1073    def get_channel_by_short_id(self, short_channel_id: bytes) -> Optional[Channel]:
1074        for chan in self.channels.values():
1075            if chan.short_channel_id == short_channel_id:
1076                return chan
1077
    @log_exceptions
    async def pay_invoice(
            self, invoice: str, *,
            amount_msat: int = None,
            attempts: int = 1,
            full_path: LNPaymentPath = None) -> Tuple[bool, List[HtlcLog]]:
        """Pay a BOLT11 invoice.

        :param invoice: the bech32-encoded invoice string
        :param amount_msat: amount override (for amountless invoices)
        :param attempts: maximum number of htlc attempts before giving up
        :param full_path: optional fixed route to use instead of path-finding
        :return: (success, list of HtlcLog entries for all attempts)
        :raises PaymentFailure: if the invoice is already paid or in flight
        """
        lnaddr = self._check_invoice(invoice, amount_msat=amount_msat)
        min_cltv_expiry = lnaddr.get_min_final_cltv_expiry()
        payment_hash = lnaddr.paymenthash
        key = payment_hash.hex()
        payment_secret = lnaddr.payment_secret
        invoice_pubkey = lnaddr.pubkey.serialize()
        invoice_features = lnaddr.get_features()
        r_tags = lnaddr.get_routing_info('r')
        amount_to_pay = lnaddr.get_amount_msat()
        # refuse duplicate/concurrent payment of the same invoice
        status = self.get_payment_status(payment_hash)
        if status == PR_PAID:
            raise PaymentFailure(_("This invoice has been paid already"))
        if status == PR_INFLIGHT:
            raise PaymentFailure(_("A payment was already initiated for this invoice"))
        if payment_hash in self.get_payments(status='inflight'):
            raise PaymentFailure(_("A previous attempt to pay this invoice did not clear"))
        info = PaymentInfo(payment_hash, amount_to_pay, SENT, PR_UNPAID)
        self.save_payment_info(info)
        self.wallet.set_label(key, lnaddr.get_description())

        self.set_invoice_status(key, PR_INFLIGHT)
        try:
            await self.pay_to_node(
                node_pubkey=invoice_pubkey,
                payment_hash=payment_hash,
                payment_secret=payment_secret,
                amount_to_pay=amount_to_pay,
                min_cltv_expiry=min_cltv_expiry,
                r_tags=r_tags,
                invoice_features=invoice_features,
                attempts=attempts,
                full_path=full_path)
            success = True
        except PaymentFailure as e:
            self.logger.info(f'payment failure: {e!r}')
            success = False
            reason = str(e)
        # update invoice status and notify the GUI either way
        if success:
            self.set_invoice_status(key, PR_PAID)
            util.trigger_callback('payment_succeeded', self.wallet, key)
        else:
            self.set_invoice_status(key, PR_UNPAID)
            util.trigger_callback('payment_failed', self.wallet, key, reason)
        log = self.logs[key]
        return success, log
1130
    async def pay_to_node(
            self, *,
            node_pubkey: bytes,
            payment_hash: bytes,
            payment_secret: Optional[bytes],
            amount_to_pay: int,  # in msat
            min_cltv_expiry: int,
            r_tags,
            invoice_features: int,
            attempts: int = 1,
            full_path: LNPaymentPath = None,
            fwd_trampoline_onion=None,
            fwd_trampoline_cltv_delta=None) -> None:
        """Drive a payment (possibly multi-part) to completion or failure.

        Loops: create routes for the outstanding amount, send htlcs along them,
        then consume htlc results from a queue. On failure, retries with
        adjusted parameters (trampoline fee level, route exclusions) until the
        full amount succeeds or the attempt budget is exhausted.

        Runs until the payment succeeds (returns None) or raises
        PaymentFailure / OnionRoutingFailure.

        The fwd_trampoline_* parameters are set when we ourselves forward a
        trampoline payment rather than originate one.
        """
        if fwd_trampoline_onion:
            # todo: compare to the fee of the actual route we found
            if fwd_trampoline_fee < 1000:
                raise OnionRoutingFailure(code=OnionFailureCode.TRAMPOLINE_FEE_INSUFFICIENT, data=b'')
            if fwd_trampoline_cltv_delta < 576:
                raise OnionRoutingFailure(code=OnionFailureCode.TRAMPOLINE_EXPIRY_TOO_SOON, data=b'')

        self.logs[payment_hash.hex()] = log = []
        trampoline_fee_level = self.INITIAL_TRAMPOLINE_FEE_LEVEL
        use_two_trampolines = True # only used for pay to legacy

        amount_inflight = 0  # what we sent in htlcs (that receiver gets, without fees)
        while True:
            amount_to_send = amount_to_pay - amount_inflight
            if amount_to_send > 0:
                # 1. create a set of routes for remaining amount.
                # note: path-finding runs in a separate thread so that we don't block the asyncio loop
                # graph updates might occur during the computation
                routes = self.create_routes_for_payment(
                    amount_msat=amount_to_send,
                    final_total_msat=amount_to_pay,
                    invoice_pubkey=node_pubkey,
                    min_cltv_expiry=min_cltv_expiry,
                    r_tags=r_tags,
                    invoice_features=invoice_features,
                    full_path=full_path,
                    payment_hash=payment_hash,
                    payment_secret=payment_secret,
                    trampoline_fee_level=trampoline_fee_level,
                    use_two_trampolines=use_two_trampolines,
                    fwd_trampoline_onion=fwd_trampoline_onion
                )
                # 2. send htlcs
                async for route, amount_msat, total_msat, amount_receiver_msat, cltv_delta, bucket_payment_secret, trampoline_onion in routes:
                    amount_inflight += amount_receiver_msat
                    if amount_inflight > amount_to_pay:  # safety belts
                        raise Exception(f"amount_inflight={amount_inflight} > amount_to_pay={amount_to_pay}")
                    await self.pay_to_route(
                        route=route,
                        amount_msat=amount_msat,
                        total_msat=total_msat,
                        amount_receiver_msat=amount_receiver_msat,
                        payment_hash=payment_hash,
                        payment_secret=bucket_payment_secret,
                        min_cltv_expiry=cltv_delta,
                        trampoline_onion=trampoline_onion)
                util.trigger_callback('invoice_status', self.wallet, payment_hash.hex())
            # 3. await a queue
            self.logger.info(f"amount inflight {amount_inflight}")
            htlc_log = await self.sent_htlcs[payment_hash].get()
            amount_inflight -= htlc_log.amount_msat
            if amount_inflight < 0:
                raise Exception(f"amount_inflight={amount_inflight} < 0")
            log.append(htlc_log)
            if htlc_log.success:
                if self.network.path_finder:
                    # TODO: report every route to liquidity hints for mpp
                    # in the case of success, we report channels of the
                    # route as being able to send the same amount in the future,
                    # as we assume to not know the capacity
                    self.network.path_finder.update_liquidity_hints(htlc_log.route, htlc_log.amount_msat)
                    # remove inflight htlcs from liquidity hints
                    self.network.path_finder.update_inflight_htlcs(htlc_log.route, add_htlcs=False)
                return
            # htlc failed
            if len(log) >= attempts:
                raise PaymentFailure('Giving up after %d attempts'%len(log))
            # if we get a tmp channel failure, it might work to split the amount and try more routes
            # if we get a channel update, we might retry the same route and amount
            route = htlc_log.route
            sender_idx = htlc_log.sender_idx
            failure_msg = htlc_log.failure_msg
            code, data = failure_msg.code, failure_msg.data
            self.logger.info(f"UPDATE_FAIL_HTLC. code={repr(code)}. "
                             f"decoded_data={failure_msg.decode_data()}. data={data.hex()!r}")
            self.logger.info(f"error reported by {bh2u(route[sender_idx].node_id)}")
            if code == OnionFailureCode.MPP_TIMEOUT:
                raise PaymentFailure(failure_msg.code_name())
            # trampoline
            if not self.channel_db:
                # FIXME The trampoline nodes in the path are chosen randomly.
                #       Some of the errors might depend on how we have chosen them.
                #       Having more attempts is currently useful in part because of the randomness,
                #       instead we should give feedback to create_routes_for_payment.
                if code in (OnionFailureCode.TRAMPOLINE_FEE_INSUFFICIENT,
                            OnionFailureCode.TRAMPOLINE_EXPIRY_TOO_SOON):
                    # todo: parse the node parameters here (not returned by eclair yet)
                    trampoline_fee_level += 1
                    continue
                elif use_two_trampolines:
                    use_two_trampolines = False
                elif code in (OnionFailureCode.UNKNOWN_NEXT_PEER,
                              OnionFailureCode.TEMPORARY_NODE_FAILURE):
                    continue
                else:
                    raise PaymentFailure(failure_msg.code_name())
            else:
                self.handle_error_code_from_failed_htlc(
                    route=route, sender_idx=sender_idx, failure_msg=failure_msg, amount=htlc_log.amount_msat)
1245
    async def pay_to_route(
            self, *,
            route: LNPaymentRoute,
            amount_msat: int,
            total_msat: int,
            amount_receiver_msat: int,
            payment_hash: bytes,
            payment_secret: Optional[bytes],
            min_cltv_expiry: int,
            trampoline_onion: bytes = None) -> None:
        """Send a single htlc along the given route.

        :param route: hops to traverse; route[0] is our direct peer
        :param amount_msat: htlc amount (this part, incl. fees)
        :param total_msat: total invoice amount across all parts (MPP)
        :param amount_receiver_msat: what the receiver gets from this part
        :param payment_hash: RHASH of the payment
        :param payment_secret: payment secret for this htlc/bucket
        :param min_cltv_expiry: final cltv expiry delta
        :param trampoline_onion: inner onion for trampoline payments, if any
        :raises PaymentFailure: if the first-hop peer is gone
        """
        # send a single htlc
        short_channel_id = route[0].short_channel_id
        chan = self.get_channel_by_short_id(short_channel_id)
        peer = self._peers.get(route[0].node_id)
        if not peer:
            raise PaymentFailure('Dropped peer')
        await peer.initialized
        htlc = peer.pay(
            route=route,
            chan=chan,
            amount_msat=amount_msat,
            total_msat=total_msat,
            payment_hash=payment_hash,
            min_final_cltv_expiry=min_cltv_expiry,
            payment_secret=payment_secret,
            trampoline_onion=trampoline_onion)

        # remember the route so htlc results can be attributed later
        key = (payment_hash, short_channel_id, htlc.htlc_id)
        self.sent_htlcs_routes[key] = route, payment_secret, amount_msat, total_msat, amount_receiver_msat
        # if we sent MPP to a trampoline, add item to sent_buckets
        if not self.channel_db and amount_msat != total_msat:
            if payment_secret not in self.sent_buckets:
                self.sent_buckets[payment_secret] = (0, 0)
            amount_sent, amount_failed = self.sent_buckets[payment_secret]
            amount_sent += amount_receiver_msat
            self.sent_buckets[payment_secret] = amount_sent, amount_failed
        if self.network.path_finder:
            # add inflight htlcs to liquidity hints
            self.network.path_finder.update_inflight_htlcs(route, add_htlcs=True)
        util.trigger_callback('htlc_added', chan, htlc, SENT)
1287
    def handle_error_code_from_failed_htlc(
            self,
            *,
            route: LNPaymentRoute,
            sender_idx: int,
            failure_msg: OnionRoutingFailure,
            amount: int) -> None:
        """React to an onion failure reported for an HTLC we sent (gossip mode only).

        Depending on the failure code, this applies the channel_update embedded
        in the error to our channel database, updates liquidity hints, and/or
        blacklists a channel so that subsequent routing attempts avoid it.

        Args:
            route: the route the failed HTLC was sent along.
            sender_idx: index into ``route`` of the node that reported the error.
            failure_msg: the decrypted onion failure message.
            amount: HTLC amount in msat, used for the liquidity-hint update.

        Raises:
            PaymentFailure: if no corrective action can be derived from the error.
        """
        assert self.channel_db  # cannot be in trampoline mode
        assert self.network.path_finder

        # remove inflight htlcs from liquidity hints
        self.network.path_finder.update_inflight_htlcs(route, add_htlcs=False)

        code, data = failure_msg.code, failure_msg.data
        # TODO can we use lnmsg.OnionWireSerializer here?
        # TODO update onion_wire.csv
        # handle some specific error codes
        # maps failure codes that embed a channel_update to the byte offset
        # of that update within the failure data
        failure_codes = {
            OnionFailureCode.TEMPORARY_CHANNEL_FAILURE: 0,
            OnionFailureCode.AMOUNT_BELOW_MINIMUM: 8,
            OnionFailureCode.FEE_INSUFFICIENT: 8,
            OnionFailureCode.INCORRECT_CLTV_EXPIRY: 4,
            OnionFailureCode.EXPIRY_TOO_SOON: 0,
            OnionFailureCode.CHANNEL_DISABLED: 2,
        }

        # determine a fallback channel to blacklist if we don't get the erring
        # channel via the payload
        if sender_idx is None:
            raise PaymentFailure(failure_msg.code_name())
        try:
            fallback_channel = route[sender_idx + 1].short_channel_id
        except IndexError:
            # the final node reported the error; there is no "next" channel to blame
            raise PaymentFailure(f'payment destination reported error: {failure_msg.code_name()}') from None

        # TODO: handle unknown next peer?
        # handle failure codes that include a channel update
        if code in failure_codes:
            offset = failure_codes[code]
            channel_update_len = int.from_bytes(data[offset:offset+2], byteorder="big")
            channel_update_as_received = data[offset+2: offset+2+channel_update_len]
            payload = self._decode_channel_update_msg(channel_update_as_received)

            if payload is None:
                self.logger.info(f'could not decode channel_update for failed htlc: '
                                 f'{channel_update_as_received.hex()}')
                self.network.path_finder.liquidity_hints.add_to_blacklist(fallback_channel)
            else:
                # apply the channel update or get blacklisted
                blacklist, update = self._handle_chanupd_from_failed_htlc(
                    payload, route=route, sender_idx=sender_idx)

                # we interpret a temporary channel failure as a liquidity issue
                # in the channel and update our liquidity hints accordingly
                if code == OnionFailureCode.TEMPORARY_CHANNEL_FAILURE:
                    self.network.path_finder.update_liquidity_hints(
                        route,
                        amount,
                        failing_channel=ShortChannelID(payload['short_channel_id']))
                elif blacklist:
                    self.network.path_finder.liquidity_hints.add_to_blacklist(
                        payload['short_channel_id'])

                # if we can't decide on some action, we are stuck
                if not (blacklist or update):
                    raise PaymentFailure(failure_msg.code_name())
        # for errors that do not include a channel update
        else:
            self.network.path_finder.liquidity_hints.add_to_blacklist(fallback_channel)
1358
1359    def _handle_chanupd_from_failed_htlc(self, payload, *, route, sender_idx) -> Tuple[bool, bool]:
1360        blacklist = False
1361        update = False
1362        try:
1363            r = self.channel_db.add_channel_update(payload, verify=True)
1364        except InvalidGossipMsg:
1365            return True, False  # blacklist
1366        short_channel_id = ShortChannelID(payload['short_channel_id'])
1367        if r == UpdateStatus.GOOD:
1368            self.logger.info(f"applied channel update to {short_channel_id}")
1369            # TODO: add test for this
1370            # FIXME: this does not work for our own unannounced channels.
1371            for chan in self.channels.values():
1372                if chan.short_channel_id == short_channel_id:
1373                    chan.set_remote_update(payload)
1374            update = True
1375        elif r == UpdateStatus.ORPHANED:
1376            # maybe it is a private channel (and data in invoice was outdated)
1377            self.logger.info(f"Could not find {short_channel_id}. maybe update is for private channel?")
1378            start_node_id = route[sender_idx].node_id
1379            update = self.channel_db.add_channel_update_for_private_channel(payload, start_node_id)
1380            blacklist = not update
1381        elif r == UpdateStatus.EXPIRED:
1382            blacklist = True
1383        elif r == UpdateStatus.DEPRECATED:
1384            self.logger.info(f'channel update is not more recent.')
1385            blacklist = True
1386        elif r == UpdateStatus.UNCHANGED:
1387            blacklist = True
1388        return blacklist, update
1389
1390    @classmethod
1391    def _decode_channel_update_msg(cls, chan_upd_msg: bytes) -> Optional[Dict[str, Any]]:
1392        channel_update_as_received = chan_upd_msg
1393        channel_update_typed = (258).to_bytes(length=2, byteorder="big") + channel_update_as_received
1394        # note: some nodes put channel updates in error msgs with the leading msg_type already there.
1395        #       we try decoding both ways here.
1396        try:
1397            message_type, payload = decode_msg(channel_update_typed)
1398            if payload['chain_hash'] != constants.net.rev_genesis_bytes(): raise Exception()
1399            payload['raw'] = channel_update_typed
1400            return payload
1401        except:  # FIXME: too broad
1402            try:
1403                message_type, payload = decode_msg(channel_update_as_received)
1404                if payload['chain_hash'] != constants.net.rev_genesis_bytes(): raise Exception()
1405                payload['raw'] = channel_update_as_received
1406                return payload
1407            except:
1408                return None
1409
1410    @staticmethod
1411    def _check_invoice(invoice: str, *, amount_msat: int = None) -> LnAddr:
1412        addr = lndecode(invoice)
1413        if addr.is_expired():
1414            raise InvoiceError(_("This invoice has expired"))
1415        if amount_msat:  # replace amt in invoice. main usecase is paying zero amt invoices
1416            existing_amt_msat = addr.get_amount_msat()
1417            if existing_amt_msat and amount_msat < existing_amt_msat:
1418                raise Exception("cannot pay lower amt than what is originally in LN invoice")
1419            addr.amount = Decimal(amount_msat) / COIN / 1000
1420        if addr.amount is None:
1421            raise InvoiceError(_("Missing amount"))
1422        if addr.get_min_final_cltv_expiry() > lnutil.NBLOCK_CLTV_EXPIRY_TOO_FAR_INTO_FUTURE:
1423            raise InvoiceError("{}\n{}".format(
1424                _("Invoice wants us to risk locking funds for unreasonably long."),
1425                f"min_final_cltv_expiry: {addr.get_min_final_cltv_expiry()}"))
1426        return addr
1427
1428    def is_trampoline_peer(self, node_id: bytes) -> bool:
1429        # until trampoline is advertised in lnfeatures, check against hardcoded list
1430        if is_hardcoded_trampoline(node_id):
1431            return True
1432        peer = self._peers.get(node_id)
1433        if peer and peer.their_features.supports(LnFeatures.OPTION_TRAMPOLINE_ROUTING_OPT):
1434            return True
1435        return False
1436
1437    def suggest_peer(self) -> Optional[bytes]:
1438        if self.channel_db:
1439            return self.lnrater.suggest_peer()
1440        else:
1441            return random.choice(list(hardcoded_trampoline_nodes().values())).pubkey
1442
    async def create_routes_for_payment(
            self, *,
            amount_msat: int,        # part of payment amount we want routes for now
            final_total_msat: int,   # total payment amount final receiver will get
            invoice_pubkey,
            min_cltv_expiry,
            r_tags,
            invoice_features: int,
            payment_hash,
            payment_secret,
            trampoline_fee_level: int,
            use_two_trampolines: bool,
            fwd_trampoline_onion: Optional[bytes] = None,
            full_path: Optional[LNPaymentPath] = None) -> AsyncGenerator[Tuple[LNPaymentRoute, int, int, int, int, Optional[bytes], Optional[bytes]], None]:

        """Creates multiple routes for splitting a payment over the available
        private channels.

        We first try to conduct the payment over a single channel. If that fails
        and mpp is supported by the receiver, we will split the payment.

        Yields 7-tuples of
        (route, amount_msat, total_msat, amount_receiver_msat, cltv_delta,
        payment_secret, trampoline_onion).

        Raises NoPathFound if no (split of the) payment can be routed.
        """
        # It could happen that the pathfinding uses a channel
        # in the graph multiple times, meaning we could exhaust
        # its capacity. This could be dealt with by temporarily
        # iteratively blacklisting channels for this mpp attempt.
        invoice_features = LnFeatures(invoice_features)
        trampoline_features = LnFeatures.VAR_ONION_OPT
        local_height = self.network.get_local_height()
        # only consider channels that are usable and not frozen for sending
        active_channels = [chan for chan in self.channels.values() if chan.is_active() and not chan.is_frozen_for_sending()]
        try:
            # try to send over a single channel
            if not self.channel_db:
                # trampoline mode: pay via one of our trampoline-capable peers
                for chan in active_channels:
                    if not self.is_trampoline_peer(chan.node_id):
                        continue
                    if chan.node_id == invoice_pubkey:
                        # the recipient is our direct peer: no trampoline onion needed
                        trampoline_onion = None
                        trampoline_payment_secret = payment_secret
                        trampoline_total_msat = final_total_msat
                        amount_with_fees = amount_msat
                        cltv_delta = min_cltv_expiry
                    else:
                        trampoline_onion, amount_with_fees, cltv_delta = create_trampoline_route_and_onion(
                            amount_msat=amount_msat,
                            total_msat=final_total_msat,
                            min_cltv_expiry=min_cltv_expiry,
                            my_pubkey=self.node_keypair.pubkey,
                            invoice_pubkey=invoice_pubkey,
                            invoice_features=invoice_features,
                            node_id=chan.node_id,
                            r_tags=r_tags,
                            payment_hash=payment_hash,
                            payment_secret=payment_secret,
                            local_height=local_height,
                            trampoline_fee_level=trampoline_fee_level,
                            use_two_trampolines=use_two_trampolines)
                        trampoline_payment_secret = os.urandom(32)
                        trampoline_total_msat = amount_with_fees
                    if chan.available_to_spend(LOCAL, strict=True) < amount_with_fees:
                        continue
                    route = [
                        RouteEdge(
                            start_node=self.node_keypair.pubkey,
                            end_node=chan.node_id,
                            short_channel_id=chan.short_channel_id,
                            fee_base_msat=0,
                            fee_proportional_millionths=0,
                            cltv_expiry_delta=0,
                            node_features=trampoline_features)
                    ]
                    yield route, amount_with_fees, trampoline_total_msat, amount_msat, cltv_delta, trampoline_payment_secret, trampoline_onion
                    break
                else:
                    raise NoPathFound()
            else:
                # gossip mode: full pathfinding over the public graph (in a thread)
                route = await run_in_thread(
                    partial(
                        self.create_route_for_payment,
                        amount_msat=amount_msat,
                        invoice_pubkey=invoice_pubkey,
                        min_cltv_expiry=min_cltv_expiry,
                        r_tags=r_tags,
                        invoice_features=invoice_features,
                        channels=active_channels,
                        full_path=full_path
                    )
                )
                yield route, amount_msat, final_total_msat, amount_msat, min_cltv_expiry, payment_secret, fwd_trampoline_onion
        except NoPathFound:
            # fall back to splitting the payment (MPP), if the receiver supports it
            if not invoice_features.supports(LnFeatures.BASIC_MPP_OPT):
                raise
            channels_with_funds = {
                (chan.channel_id, chan.node_id): int(chan.available_to_spend(HTLCOwner.LOCAL))
                for chan in active_channels}
            self.logger.info(f"channels_with_funds: {channels_with_funds}")
            # for trampoline mpp payments we have to restrict ourselves to pay
            # to a single node due to some incompatibility in Eclair, see:
            # https://github.com/ACINQ/eclair/issues/1723
            use_singe_node = not self.channel_db and constants.net is constants.BitcoinMainnet
            split_configurations = suggest_splits(amount_msat, channels_with_funds, single_node=use_singe_node)
            self.logger.info(f'suggest_split {amount_msat} returned {len(split_configurations)} configurations')

            # try split configurations in order until one yields routes for all parts
            for s in split_configurations:
                self.logger.info(f"trying split configuration: {s[0].values()} rating: {s[1]}")
                try:
                    if not self.channel_db:
                        # trampoline: group the parts by trampoline node ("buckets")
                        buckets = defaultdict(list)
                        for (chan_id, _), part_amount_msat in s[0].items():
                            chan = self.channels[chan_id]
                            if part_amount_msat:
                                buckets[chan.node_id].append((chan_id, part_amount_msat))
                        for node_id, bucket in buckets.items():
                            bucket_amount_msat = sum([x[1] for x in bucket])
                            trampoline_onion, bucket_amount_with_fees, bucket_cltv_delta = create_trampoline_route_and_onion(
                                amount_msat=bucket_amount_msat,
                                total_msat=final_total_msat,
                                min_cltv_expiry=min_cltv_expiry,
                                my_pubkey=self.node_keypair.pubkey,
                                invoice_pubkey=invoice_pubkey,
                                invoice_features=invoice_features,
                                node_id=node_id,
                                r_tags=r_tags,
                                payment_hash=payment_hash,
                                payment_secret=payment_secret,
                                local_height=local_height,
                                trampoline_fee_level=trampoline_fee_level,
                                use_two_trampolines=use_two_trampolines)
                            # node_features is only used to determine is_tlv
                            bucket_payment_secret = os.urandom(32)
                            bucket_fees = bucket_amount_with_fees - bucket_amount_msat
                            self.logger.info(f'bucket_fees {bucket_fees}')
                            # distribute the trampoline fee over the parts of this
                            # bucket, bounded by each channel's spendable margin
                            for chan_id, part_amount_msat in bucket:
                                chan = self.channels[chan_id]
                                margin = chan.available_to_spend(LOCAL, strict=True) - part_amount_msat
                                delta_fee = min(bucket_fees, margin)
                                part_amount_msat_with_fees = part_amount_msat + delta_fee
                                bucket_fees -= delta_fee
                                route = [
                                    RouteEdge(
                                        start_node=self.node_keypair.pubkey,
                                        end_node=node_id,
                                        short_channel_id=chan.short_channel_id,
                                        fee_base_msat=0,
                                        fee_proportional_millionths=0,
                                        cltv_expiry_delta=0,
                                        node_features=trampoline_features)
                                ]
                                self.logger.info(f'adding route {part_amount_msat} {delta_fee} {margin}')
                                yield route, part_amount_msat_with_fees, bucket_amount_with_fees, part_amount_msat, bucket_cltv_delta, bucket_payment_secret, trampoline_onion
                            if bucket_fees != 0:
                                self.logger.info('not enough margin to pay trampoline fee')
                                raise NoPathFound()
                    else:
                        # gossip mode: find a route per non-zero part of the split
                        for (chan_id, _), part_amount_msat in s[0].items():
                            if part_amount_msat:
                                channel = self.channels[chan_id]
                                route = await run_in_thread(
                                    partial(
                                        self.create_route_for_payment,
                                        amount_msat=part_amount_msat,
                                        invoice_pubkey=invoice_pubkey,
                                        min_cltv_expiry=min_cltv_expiry,
                                        r_tags=r_tags,
                                        invoice_features=invoice_features,
                                        channels=[channel],
                                        full_path=None
                                    )
                                )
                                yield route, part_amount_msat, final_total_msat, part_amount_msat, min_cltv_expiry, payment_secret, fwd_trampoline_onion
                    self.logger.info(f"found acceptable split configuration: {list(s[0].values())} rating: {s[1]}")
                    break
                except NoPathFound:
                    continue
            else:
                raise NoPathFound()
1617
    @profiler
    def create_route_for_payment(
            self, *,
            amount_msat: int,
            invoice_pubkey: bytes,
            min_cltv_expiry: int,
            r_tags,
            invoice_features: int,
            channels: List[Channel],
            full_path: Optional[LNPaymentPath]) -> LNPaymentRoute:
        """Find a route from us to `invoice_pubkey`, using the public graph plus
        any private edges given in the invoice's route hints (`r_tags`).

        Args:
            amount_msat: amount to route.
            invoice_pubkey: node id of the payee.
            min_cltv_expiry: final CLTV expiry delta required by the payee.
            r_tags: route hints from the invoice (lists of partial paths).
            invoice_features: feature bits advertised in the invoice.
            channels: our channels that may be used as the first hop.
            full_path: optional pre-determined path to force.

        Raises:
            NoPathFound: if no usable and sane route exists.
            LNPathInconsistent: if the found route does not end at the payee.
        """
        scid_to_my_channels = {
            chan.short_channel_id: chan for chan in channels
            if chan.short_channel_id is not None
        }
        # Collect all private edges from route hints.
        # Note: if some route hints are multiple edges long, and these paths cross each other,
        #       we allow our path finding to cross the paths; i.e. the route hints are not isolated.
        private_route_edges = {}  # type: Dict[ShortChannelID, RouteEdge]
        for private_path in r_tags:
            # we need to shift the node pubkey by one towards the destination:
            private_path_nodes = [edge[0] for edge in private_path][1:] + [invoice_pubkey]
            private_path_rest = [edge[1:] for edge in private_path]
            start_node = private_path[0][0]
            for end_node, edge_rest in zip(private_path_nodes, private_path_rest):
                short_channel_id, fee_base_msat, fee_proportional_millionths, cltv_expiry_delta = edge_rest
                short_channel_id = ShortChannelID(short_channel_id)
                # if we have a routing policy for this edge in the db, that takes precedence,
                # as it is likely from a previous failure
                channel_policy = self.channel_db.get_policy_for_node(
                    short_channel_id=short_channel_id,
                    node_id=start_node,
                    my_channels=scid_to_my_channels)
                if channel_policy:
                    fee_base_msat = channel_policy.fee_base_msat
                    fee_proportional_millionths = channel_policy.fee_proportional_millionths
                    cltv_expiry_delta = channel_policy.cltv_expiry_delta
                node_info = self.channel_db.get_node_info_for_node_id(node_id=end_node)
                route_edge = RouteEdge(
                        start_node=start_node,
                        end_node=end_node,
                        short_channel_id=short_channel_id,
                        fee_base_msat=fee_base_msat,
                        fee_proportional_millionths=fee_proportional_millionths,
                        cltv_expiry_delta=cltv_expiry_delta,
                        node_features=node_info.features if node_info else 0)
                private_route_edges[route_edge.short_channel_id] = route_edge
                start_node = end_node
        # now find a route, end to end: between us and the recipient
        try:
            route = self.network.path_finder.find_route(
                nodeA=self.node_keypair.pubkey,
                nodeB=invoice_pubkey,
                invoice_amount_msat=amount_msat,
                path=full_path,
                my_channels=scid_to_my_channels,
                private_route_edges=private_route_edges)
        except NoChannelPolicy as e:
            raise NoPathFound() from e
        if not route:
            raise NoPathFound()
        # test sanity
        if not is_route_sane_to_use(route, amount_msat, min_cltv_expiry):
            self.logger.info(f"rejecting insane route {route}")
            raise NoPathFound()
        assert len(route) > 0
        if route[-1].end_node != invoice_pubkey:
            raise LNPathInconsistent("last node_id != invoice pubkey")
        # add features from invoice
        route[-1].node_features |= invoice_features
        return route
1689
1690    def add_request(self, amount_sat, message, expiry) -> str:
1691        coro = self._add_request_coro(amount_sat, message, expiry)
1692        fut = asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop)
1693        try:
1694            return fut.result(timeout=5)
1695        except concurrent.futures.TimeoutError:
1696            raise Exception(_("add invoice timed out"))
1697
    @log_exceptions
    async def create_invoice(
            self, *,
            amount_msat: Optional[int],
            message: str,
            expiry: int,
            write_to_disk: bool = True,
    ) -> Tuple[LnAddr, str]:
        """Create a new BOLT-11 invoice for receiving `amount_msat`.

        Args:
            amount_msat: requested amount, or None for an amountless invoice.
            message: description embedded in the invoice ('d' tag).
            expiry: invoice lifetime in seconds; 0 means never expires.
            write_to_disk: whether to persist the wallet db immediately.

        Returns:
            (decoded LnAddr, bech32-encoded invoice string).
        """
        timestamp = int(time.time())
        routing_hints = await self._calc_routing_hints_for_invoice(amount_msat)
        if not routing_hints:
            self.logger.info(
                "Warning. No routing hints added to invoice. "
                "Other clients will likely not be able to send to us.")
        # if not all hints are trampoline, do not create trampoline invoice
        invoice_features = self.features.for_invoice()
        trampoline_hints = []
        for r in routing_hints:
            node_id, short_channel_id, fee_base_msat, fee_proportional_millionths, cltv_expiry_delta = r[1][0]
            # only single-hop hints through trampoline-capable peers become 't' tags
            if len(r[1])== 1 and self.is_trampoline_peer(node_id):
                trampoline_hints.append(('t', (node_id, fee_base_msat, fee_proportional_millionths, cltv_expiry_delta)))
        payment_preimage = os.urandom(32)
        payment_hash = sha256(payment_preimage)
        info = PaymentInfo(payment_hash, amount_msat, RECEIVED, PR_UNPAID)
        amount_btc = amount_msat/Decimal(COIN*1000) if amount_msat else None
        if expiry == 0:
            expiry = LN_EXPIRY_NEVER
        lnaddr = LnAddr(
            paymenthash=payment_hash,
            amount=amount_btc,
            tags=[
                ('d', message),
                ('c', MIN_FINAL_CLTV_EXPIRY_FOR_INVOICE),
                ('x', expiry),
                ('9', invoice_features)]
            + routing_hints
            + trampoline_hints,
            date=timestamp,
            payment_secret=derive_payment_secret_from_payment_preimage(payment_preimage))
        invoice = lnencode(lnaddr, self.node_keypair.privkey)
        self.save_preimage(payment_hash, payment_preimage, write_to_disk=False)
        self.save_payment_info(info, write_to_disk=False)
        if write_to_disk:
            self.wallet.save_db()
        return lnaddr, invoice
1744
1745    async def _add_request_coro(self, amount_sat: Optional[int], message, expiry: int) -> str:
1746        amount_msat = amount_sat * 1000 if amount_sat is not None else None
1747        lnaddr, invoice = await self.create_invoice(
1748            amount_msat=amount_msat,
1749            message=message,
1750            expiry=expiry,
1751            write_to_disk=False,
1752        )
1753        key = bh2u(lnaddr.paymenthash)
1754        req = LNInvoice.from_bech32(invoice)
1755        self.wallet.add_payment_request(req, write_to_disk=False)
1756        self.wallet.set_label(key, message)
1757        self.wallet.save_db()
1758        return key
1759
1760    def save_preimage(self, payment_hash: bytes, preimage: bytes, *, write_to_disk: bool = True):
1761        assert sha256(preimage) == payment_hash
1762        self.preimages[bh2u(payment_hash)] = bh2u(preimage)
1763        if write_to_disk:
1764            self.wallet.save_db()
1765
1766    def get_preimage(self, payment_hash: bytes) -> Optional[bytes]:
1767        r = self.preimages.get(bh2u(payment_hash))
1768        return bfh(r) if r else None
1769
1770    def get_payment_info(self, payment_hash: bytes) -> Optional[PaymentInfo]:
1771        """returns None if payment_hash is a payment we are forwarding"""
1772        key = payment_hash.hex()
1773        with self.lock:
1774            if key in self.payments:
1775                amount_msat, direction, status = self.payments[key]
1776                return PaymentInfo(payment_hash, amount_msat, direction, status)
1777
1778    def save_payment_info(self, info: PaymentInfo, *, write_to_disk: bool = True) -> None:
1779        key = info.payment_hash.hex()
1780        assert info.status in SAVED_PR_STATUS
1781        with self.lock:
1782            self.payments[key] = info.amount_msat, info.direction, info.status
1783        if write_to_disk:
1784            self.wallet.save_db()
1785
    def check_received_mpp_htlc(self, payment_secret, short_channel_id, htlc: UpdateAddHtlc, expected_msat: int) -> Optional[bool]:
        """ return MPP status: True (accepted), False (expired) or None """
        payment_hash = htlc.payment_hash
        # current MPP state for this payment_secret (default: nothing pending yet)
        is_expired, is_accepted, htlc_set = self.received_mpp_htlcs.get(payment_secret, (False, False, set()))
        if self.get_payment_status(payment_hash) == PR_PAID:
            # payment_status is persisted
            is_accepted = True
            is_expired = False
        key = (short_channel_id, htlc)
        if key not in htlc_set:
            htlc_set.add(key)
        if not is_accepted and not is_expired:
            # sum up all parts received so far and check if the set is complete
            total = sum([_htlc.amount_msat for scid, _htlc in htlc_set])
            first_timestamp = min([_htlc.timestamp for scid, _htlc in htlc_set])
            if self.stopping_soon:
                is_expired = True  # try to time out pending HTLCs before shutting down
            elif time.time() - first_timestamp > self.MPP_EXPIRY:
                is_expired = True
            elif total == expected_msat:
                is_accepted = True
        if is_accepted or is_expired:
            # this HTLC is resolved either way; drop it from the pending set
            htlc_set.remove(key)
        if len(htlc_set) > 0:
            self.received_mpp_htlcs[payment_secret] = is_expired, is_accepted, htlc_set
        elif payment_secret in self.received_mpp_htlcs:
            # no more pending parts: clean up the entry
            self.received_mpp_htlcs.pop(payment_secret)
        return True if is_accepted else (False if is_expired else None)
1813
1814    def get_payment_status(self, payment_hash: bytes) -> int:
1815        info = self.get_payment_info(payment_hash)
1816        return info.status if info else PR_UNPAID
1817
1818    def get_invoice_status(self, invoice: LNInvoice) -> int:
1819        key = invoice.rhash
1820        log = self.logs[key]
1821        if key in self.inflight_payments:
1822            return PR_INFLIGHT
1823        # status may be PR_FAILED
1824        status = self.get_payment_status(bfh(key))
1825        if status == PR_UNPAID and log:
1826            status = PR_FAILED
1827        return status
1828
1829    def set_invoice_status(self, key: str, status: int) -> None:
1830        if status == PR_INFLIGHT:
1831            self.inflight_payments.add(key)
1832        elif key in self.inflight_payments:
1833            self.inflight_payments.remove(key)
1834        if status in SAVED_PR_STATUS:
1835            self.set_payment_status(bfh(key), status)
1836        util.trigger_callback('invoice_status', self.wallet, key)
1837
1838    def set_request_status(self, payment_hash: bytes, status: int) -> None:
1839        if self.get_payment_status(payment_hash) != status:
1840            self.set_payment_status(payment_hash, status)
1841            util.trigger_callback('request_status', self.wallet, payment_hash.hex(), status)
1842
1843    def set_payment_status(self, payment_hash: bytes, status: int) -> None:
1844        info = self.get_payment_info(payment_hash)
1845        if info is None:
1846            # if we are forwarding
1847            return
1848        info = info._replace(status=status)
1849        self.save_payment_info(info)
1850
1851    def htlc_fulfilled(self, chan, payment_hash: bytes, htlc_id:int):
1852        util.trigger_callback('htlc_fulfilled', payment_hash, chan.channel_id)
1853        q = self.sent_htlcs.get(payment_hash)
1854        if q:
1855            route, payment_secret, amount_msat, bucket_msat, amount_receiver_msat = self.sent_htlcs_routes[(payment_hash, chan.short_channel_id, htlc_id)]
1856            htlc_log = HtlcLog(
1857                success=True,
1858                route=route,
1859                amount_msat=amount_receiver_msat)
1860            q.put_nowait(htlc_log)
1861        else:
1862            key = payment_hash.hex()
1863            self.set_invoice_status(key, PR_PAID)
1864            util.trigger_callback('payment_succeeded', self.wallet, key)
1865
    def htlc_failed(
            self,
            chan: Channel,
            payment_hash: bytes,
            htlc_id: int,
            error_bytes: Optional[bytes],
            failure_message: Optional['OnionRoutingFailure']):
        """Called when an HTLC we offered got failed.

        Reports the failure to the coroutine paying this payment_hash
        (possibly only after a whole trampoline 'bucket' has failed), or,
        if no payment is in flight, marks the invoice as unpaid.

        error_bytes: encrypted onion error; decoded here when present.
        failure_message: pre-decoded failure; the code below asserts it is
            set whenever error_bytes is missing (update_fail_malformed_htlc case).
        """
        util.trigger_callback('htlc_failed', payment_hash, chan.channel_id)
        q = self.sent_htlcs.get(payment_hash)
        if q:
            # detect if it is part of a bucket
            # if yes, wait until the bucket completely failed
            key = (payment_hash, chan.short_channel_id, htlc_id)
            route, payment_secret, amount_msat, bucket_msat, amount_receiver_msat = self.sent_htlcs_routes[key]
            if error_bytes:
                # TODO "decode_onion_error" might raise, catch and maybe blacklist/penalise someone?
                try:
                    failure_message, sender_idx = chan.decode_onion_error(error_bytes, route, htlc_id)
                except Exception as e:
                    # decoding failed: report a sentinel failure and blame nobody
                    sender_idx = None
                    failure_message = OnionRoutingFailure(-1, str(e))
            else:
                # probably got "update_fail_malformed_htlc". well... who to penalise now?
                assert failure_message is not None
                sender_idx = None
            self.logger.info(f"htlc_failed {failure_message}")

            # check sent_buckets if we use trampoline
            if not self.channel_db and payment_secret in self.sent_buckets:
                amount_sent, amount_failed = self.sent_buckets[payment_secret]
                amount_failed += amount_receiver_msat
                self.sent_buckets[payment_secret] = amount_sent, amount_failed
                if amount_sent != amount_failed:
                    # other HTLCs of this bucket are still pending: report nothing yet
                    self.logger.info('bucket still active...')
                    return
                # the whole bucket failed: report the full bucket amount at once
                self.logger.info('bucket failed')
                amount_receiver_msat = amount_sent

            htlc_log = HtlcLog(
                success=False,
                route=route,
                amount_msat=amount_receiver_msat,
                error_bytes=error_bytes,
                failure_msg=failure_message,
                sender_idx=sender_idx)
            q.put_nowait(htlc_log)
        else:
            # nobody is waiting on this hash
            self.logger.info(f"received unknown htlc_failed, probably from previous session")
            key = payment_hash.hex()
            self.set_invoice_status(key, PR_UNPAID)
            util.trigger_callback('payment_failed', self.wallet, key, '')
1918
    async def _calc_routing_hints_for_invoice(self, amount_msat: Optional[int]):
        """calculate routing hints (BOLT-11 'r' field)

        Returns a list of ('r', [(node_id, short_channel_id, fee_base_msat,
        fee_proportional_millionths, cltv_expiry_delta)]) tuples, one hint
        per selected channel.
        """
        routing_hints = []
        channels = list(self.channels.values())
        # do minimal filtering of channels.
        # we include channels that cannot *right now* receive (e.g. peer disconnected or balance insufficient)
        channels = [chan for chan in channels
                    if (chan.is_open() and not chan.is_frozen_for_receiving())]
        # Filter out channels that have very low receive capacity compared to invoice amt.
        # Even with MPP, below a certain threshold, including these channels probably
        # hurts more than help, as they lead to many failed attempts for the sender.
        channels = [chan for chan in channels
                    if chan.available_to_spend(REMOTE) > (amount_msat or 0) * 0.05]
        # cap max channels to include to keep QR code reasonably scannable
        # sort: active channels first, then by descending receive capacity
        channels = sorted(channels, key=lambda chan: (not chan.is_active(), -chan.available_to_spend(REMOTE)))
        channels = channels[:15]
        random.shuffle(channels)  # let's not leak channel order
        scid_to_my_channels = {chan.short_channel_id: chan for chan in channels
                               if chan.short_channel_id is not None}
        for chan in channels:
            chan_id = chan.short_channel_id
            assert isinstance(chan_id, bytes), chan_id
            channel_info = get_mychannel_info(chan_id, scid_to_my_channels)
            # note: as a fallback, if we don't have a channel update for the
            # incoming direction of our private channel, we fill the invoice with garbage.
            # the sender should still be able to pay us, but will incur an extra round trip
            # (they will get the channel update from the onion error)
            # at least, that's the theory. https://github.com/lightningnetwork/lnd/issues/2066
            fee_base_msat = fee_proportional_millionths = 0
            cltv_expiry_delta = 1  # lnd won't even try with zero
            missing_info = True
            if channel_info:
                policy = get_mychannel_policy(channel_info.short_channel_id, chan.node_id, scid_to_my_channels)
                if policy:
                    fee_base_msat = policy.fee_base_msat
                    fee_proportional_millionths = policy.fee_proportional_millionths
                    cltv_expiry_delta = policy.cltv_expiry_delta
                    missing_info = False
            if missing_info:
                self.logger.info(
                    f"Warning. Missing channel update for our channel {chan_id}; "
                    f"filling invoice with incorrect data.")
            routing_hints.append(('r', [(
                chan.node_id,
                chan_id,
                fee_base_msat,
                fee_proportional_millionths,
                cltv_expiry_delta)]))
        return routing_hints
1968
1969    def delete_payment(self, payment_hash_hex: str):
1970        try:
1971            with self.lock:
1972                del self.payments[payment_hash_hex]
1973        except KeyError:
1974            return
1975        self.wallet.save_db()
1976
1977    def get_balance(self):
1978        with self.lock:
1979            return Decimal(sum(
1980                chan.balance(LOCAL) if not chan.is_closed() else 0
1981                for chan in self.channels.values())) / 1000
1982
1983    def num_sats_can_send(self) -> Decimal:
1984        can_send = 0
1985        with self.lock:
1986            if self.channels:
1987                for c in self.channels.values():
1988                    if c.is_active() and not c.is_frozen_for_sending():
1989                        can_send += c.available_to_spend(LOCAL)
1990        # Here we have to guess a fee, because some callers (submarine swaps)
1991        # use this method to initiate a payment, which would otherwise fail.
1992        fee_base_msat = TRAMPOLINE_FEES[3]['fee_base_msat']
1993        fee_proportional_millionths = TRAMPOLINE_FEES[3]['fee_proportional_millionths']
1994        # inverse of fee_for_edge_msat
1995        can_send_minus_fees = (can_send - fee_base_msat) * 1_000_000 // ( 1_000_000 + fee_proportional_millionths)
1996        can_send_minus_fees = max(0, can_send_minus_fees)
1997        return Decimal(can_send_minus_fees) / 1000
1998
1999    def num_sats_can_receive(self) -> Decimal:
2000        with self.lock:
2001            channels = [
2002                c for c in self.channels.values()
2003                if c.is_active() and not c.is_frozen_for_receiving()
2004            ]
2005            can_receive = sum([c.available_to_spend(REMOTE) for c in channels]) if channels else 0
2006        return Decimal(can_receive) / 1000
2007
2008    def num_sats_can_receive_no_mpp(self) -> Decimal:
2009        with self.lock:
2010            channels = [
2011                c for c in self.channels.values()
2012                if c.is_active() and not c.is_frozen_for_receiving()
2013            ]
2014            can_receive = max([c.available_to_spend(REMOTE) for c in channels]) if channels else 0
2015        return Decimal(can_receive) / 1000
2016
2017    def can_pay_invoice(self, invoice: LNInvoice) -> bool:
2018        return invoice.get_amount_sat() <= self.num_sats_can_send()
2019
2020    def can_receive_invoice(self, invoice: LNInvoice) -> bool:
2021        return invoice.get_amount_sat() <= self.num_sats_can_receive()
2022
2023    async def close_channel(self, chan_id):
2024        chan = self._channels[chan_id]
2025        peer = self._peers[chan.node_id]
2026        return await peer.close_channel(chan_id)
2027
2028    async def force_close_channel(self, chan_id):
2029        # returns txid or raises
2030        chan = self._channels[chan_id]
2031        tx = chan.force_close_tx()
2032        await self.network.broadcast_transaction(tx)
2033        chan.set_state(ChannelState.FORCE_CLOSING)
2034        return tx.txid()
2035
2036    async def try_force_closing(self, chan_id):
2037        # fails silently but sets the state, so that we will retry later
2038        chan = self._channels[chan_id]
2039        tx = chan.force_close_tx()
2040        chan.set_state(ChannelState.FORCE_CLOSING)
2041        await self.network.try_broadcasting(tx, 'force-close')
2042
2043    def remove_channel(self, chan_id):
2044        chan = self.channels[chan_id]
2045        assert chan.can_be_deleted()
2046        with self.lock:
2047            self._channels.pop(chan_id)
2048            self.db.get('channels').pop(chan_id.hex())
2049        for addr in chan.get_wallet_addresses_channel_might_want_reserved():
2050            self.wallet.set_reserved_state_of_address(addr, reserved=False)
2051
2052        util.trigger_callback('channels_updated', self.wallet)
2053        util.trigger_callback('wallet_updated', self.wallet)
2054
    @ignore_exceptions
    @log_exceptions
    async def reestablish_peer_for_given_channel(self, chan: Channel) -> None:
        """Try to (re)connect to the peer of `chan`, using the first candidate
        address that has not failed recently.

        Candidate addresses are gathered, in order of preference, from:
        the trampoline list (if gossip is disabled) or the gossip db
        (last good address first), then from the channel's own storage.
        """
        now = time.time()
        peer_addresses = []
        if not self.channel_db:
            # no gossip db: we can only look the peer up in the trampoline list
            addr = trampolines_by_id().get(chan.node_id)
            if addr:
                peer_addresses.append(addr)
        else:
            # will try last good address first, from gossip
            last_good_addr = self.channel_db.get_last_good_address(chan.node_id)
            if last_good_addr:
                peer_addresses.append(last_good_addr)
            # will try addresses for node_id from gossip
            addrs_from_gossip = self.channel_db.get_node_addresses(chan.node_id) or []
            for host, port, ts in addrs_from_gossip:
                peer_addresses.append(LNPeerAddr(host, port, chan.node_id))
        # will try addresses stored in channel storage
        peer_addresses += list(chan.get_peer_addresses())
        # Done gathering addresses.
        # Now select first one that has not failed recently.
        for peer in peer_addresses:
            if self._can_retry_addr(peer, urgent=True, now=now):
                await self._add_peer(peer.host, peer.port, peer.pubkey)
                return
2081
2082    async def reestablish_peers_and_channels(self):
2083        while True:
2084            await asyncio.sleep(1)
2085            if self.stopping_soon:
2086                return
2087            for chan in self.channels.values():
2088                if chan.is_closed():
2089                    continue
2090                # reestablish
2091                if not chan.should_try_to_reestablish_peer():
2092                    continue
2093                peer = self._peers.get(chan.node_id, None)
2094                if peer:
2095                    await peer.taskgroup.spawn(peer.reestablish_channel(chan))
2096                else:
2097                    await self.taskgroup.spawn(self.reestablish_peer_for_given_channel(chan))
2098
2099    def current_feerate_per_kw(self):
2100        from .simple_config import FEE_LN_ETA_TARGET, FEERATE_FALLBACK_STATIC_FEE, FEERATE_REGTEST_HARDCODED
2101        if constants.net is constants.BitcoinRegtest:
2102            return FEERATE_REGTEST_HARDCODED // 4
2103        feerate_per_kvbyte = self.network.config.eta_target_to_fee(FEE_LN_ETA_TARGET)
2104        if feerate_per_kvbyte is None:
2105            feerate_per_kvbyte = FEERATE_FALLBACK_STATIC_FEE
2106        return max(253, feerate_per_kvbyte // 4)
2107
    def create_channel_backup(self, channel_id):
        """Build an ImportedChannelBackupStorage snapshot of the given channel.

        Collects our node privkey, the funding outpoint, one stored peer
        address, and both parties' static channel parameters.
        """
        chan = self._channels[channel_id]
        # do not backup old-style channels
        assert chan.is_static_remotekey_enabled()
        peer_addresses = list(chan.get_peer_addresses())
        # NOTE(review): assumes at least one peer address is stored for the
        # channel; raises IndexError otherwise -- confirm callers guarantee this.
        peer_addr = peer_addresses[0]
        return ImportedChannelBackupStorage(
            node_id = chan.node_id,
            privkey = self.node_keypair.privkey,
            funding_txid = chan.funding_outpoint.txid,
            funding_index = chan.funding_outpoint.output_index,
            funding_address = chan.get_funding_address(),
            host = peer_addr.host,
            port = peer_addr.port,
            is_initiator = chan.constraints.is_initiator,
            channel_seed = chan.config[LOCAL].channel_seed,
            local_delay = chan.config[LOCAL].to_self_delay,
            remote_delay = chan.config[REMOTE].to_self_delay,
            remote_revocation_pubkey = chan.config[REMOTE].revocation_basepoint.pubkey,
            remote_payment_pubkey = chan.config[REMOTE].payment_basepoint.pubkey)
2128
2129    def export_channel_backup(self, channel_id):
2130        xpub = self.wallet.get_fingerprint()
2131        backup_bytes = self.create_channel_backup(channel_id).to_bytes()
2132        assert backup_bytes == ImportedChannelBackupStorage.from_bytes(backup_bytes).to_bytes(), "roundtrip failed"
2133        encrypted = pw_encode_with_version_and_mac(backup_bytes, xpub)
2134        assert backup_bytes == pw_decode_with_version_and_mac(encrypted, xpub), "encrypt failed"
2135        return 'channel_backup:' + encrypted
2136
2137    async def request_force_close(self, channel_id: bytes, *, connect_str=None) -> None:
2138        if channel_id in self.channels:
2139            chan = self.channels[channel_id]
2140            peer = self._peers.get(chan.node_id)
2141            if not peer:
2142                raise Exception('Peer not found')
2143            chan.should_request_force_close = True
2144            peer.close_and_cleanup()
2145        elif connect_str:
2146            peer = await self.add_peer(connect_str)
2147            await peer.trigger_force_close(channel_id)
2148        elif channel_id in self.channel_backups:
2149            await self._request_force_close_from_backup(channel_id)
2150        else:
2151            raise Exception(f'Unknown channel {channel_id.hex()}')
2152
2153    def import_channel_backup(self, data):
2154        assert data.startswith('channel_backup:')
2155        encrypted = data[15:]
2156        xpub = self.wallet.get_fingerprint()
2157        decrypted = pw_decode_with_version_and_mac(encrypted, xpub)
2158        cb_storage = ImportedChannelBackupStorage.from_bytes(decrypted)
2159        channel_id = cb_storage.channel_id()
2160        if channel_id.hex() in self.db.get_dict("channels"):
2161            raise Exception('Channel already in wallet')
2162        self.logger.info(f'importing channel backup: {channel_id.hex()}')
2163        d = self.db.get_dict("imported_channel_backups")
2164        d[channel_id.hex()] = cb_storage
2165        with self.lock:
2166            cb = ChannelBackup(cb_storage, sweep_address=self.sweep_address, lnworker=self)
2167            self._channel_backups[channel_id] = cb
2168        self.wallet.save_db()
2169        util.trigger_callback('channels_updated', self.wallet)
2170        self.lnwatcher.add_channel(cb.funding_outpoint.to_str(), cb.get_funding_address())
2171
2172    def has_conflicting_backup_with(self, remote_node_id: bytes):
2173        """ Returns whether we have an active channel with this node on another device, using same local node id. """
2174        channel_backup_peers = [
2175            cb.node_id for cb in self.channel_backups.values()
2176            if (not cb.is_closed() and cb.get_local_pubkey() == self.node_keypair.pubkey)]
2177        return any(remote_node_id.startswith(cb_peer_nodeid) for cb_peer_nodeid in channel_backup_peers)
2178
2179    def remove_channel_backup(self, channel_id):
2180        chan = self.channel_backups[channel_id]
2181        assert chan.can_be_deleted()
2182        onchain_backups = self.db.get_dict("onchain_channel_backups")
2183        imported_backups = self.db.get_dict("imported_channel_backups")
2184        if channel_id.hex() in onchain_backups:
2185            onchain_backups.pop(channel_id.hex())
2186        elif channel_id.hex() in imported_backups:
2187            imported_backups.pop(channel_id.hex())
2188        else:
2189            raise Exception('Channel not found')
2190        with self.lock:
2191            self._channel_backups.pop(channel_id)
2192        self.wallet.save_db()
2193        util.trigger_callback('channels_updated', self.wallet)
2194
    @log_exceptions
    async def _request_force_close_from_backup(self, channel_id: bytes):
        """Connect to the remote node of a backed-up channel and ask it to
        force close, trying each known address until one attempt succeeds.

        Raises if the backup is unknown, no address can be determined, or
        every connection attempt fails.
        """
        cb = self.channel_backups.get(channel_id)
        if not cb:
            raise Exception(f'channel backup not found {self.channel_backups}')
        cb = cb.cb # storage
        self.logger.info(f'requesting channel force close: {channel_id.hex()}')
        if isinstance(cb, ImportedChannelBackupStorage):
            # imported backups carry their own privkey and last-known address
            node_id = cb.node_id
            privkey = cb.privkey
            addresses = [(cb.host, cb.port, 0)]
            # TODO also try network addresses from gossip db (as it might have changed)
        else:
            assert isinstance(cb, OnchainChannelBackupStorage)
            if not self.channel_db:
                raise Exception('Enable gossip first')
            # on-chain backups only store a node-id prefix: resolve it via gossip
            node_id = self.network.channel_db.get_node_by_prefix(cb.node_id_prefix)
            privkey = self.node_keypair.privkey
            addresses = self.network.channel_db.get_node_addresses(node_id)
            if not addresses:
                raise Exception('Peer not found in gossip database')
        for host, port, timestamp in addresses:
            peer_addr = LNPeerAddr(host, port, node_id)
            transport = LNTransport(privkey, peer_addr, proxy=self.network.proxy)
            peer = Peer(self, node_id, transport, is_channel_backup=True)
            try:
                # run the message loop just long enough for trigger_force_close
                # to complete (wait=any: first finished task ends the group)
                async with TaskGroup(wait=any) as group:
                    await group.spawn(peer._message_loop())
                    await group.spawn(peer.trigger_force_close(channel_id))
                return
            except Exception as e:
                self.logger.info(f'failed to connect {host} {e}')
                continue
            # TODO close/cleanup the transport
        else:
            # for/else: loop exhausted without a successful attempt
            raise Exception('failed to connect')
2231
    def maybe_add_backup_from_tx(self, tx):
        """If `tx` looks like one of our channel funding transactions (a p2wsh
        output plus an OP_RETURN output whose payload decrypts to our backup
        magic), record an on-chain channel backup for it.

        No-op if the tx carries no recognizable backup data, or if the
        channel is already known to the wallet.
        """
        funding_address = None
        node_id_prefix = None
        for i, o in enumerate(tx.outputs()):
            script_type = get_script_type_from_output_script(o.scriptpubkey)
            if script_type == 'p2wsh':
                # candidate funding output; look for a matching OP_RETURN payload
                funding_index = i
                funding_address = o.address
                for o2 in tx.outputs():
                    if o2.scriptpubkey.startswith(bytes([opcodes.OP_RETURN])):
                        # NOTE(review): [2:] skips OP_RETURN plus a single
                        # 1-byte push opcode -- confirm against the code that
                        # creates these outputs.
                        encrypted_data = o2.scriptpubkey[2:]
                        data = self.decrypt_cb_data(encrypted_data, funding_address)
                        if data.startswith(CB_MAGIC_BYTES):
                            # NOTE(review): [4:] assumes CB_MAGIC_BYTES is 4 bytes
                            node_id_prefix = data[4:]
        if node_id_prefix is None:
            return
        funding_txid = tx.txid()
        cb_storage = OnchainChannelBackupStorage(
            node_id_prefix = node_id_prefix,
            funding_txid = funding_txid,
            funding_index = funding_index,
            funding_address = funding_address,
            is_initiator = True)
        channel_id = cb_storage.channel_id().hex()
        if channel_id in self.db.get_dict("channels"):
            # already a full channel in this wallet; no backup needed
            return
        self.logger.info(f"adding backup from tx")
        d = self.db.get_dict("onchain_channel_backups")
        d[channel_id] = cb_storage
        cb = ChannelBackup(cb_storage, sweep_address=self.sweep_address, lnworker=self)
        self.wallet.save_db()
        with self.lock:
            self._channel_backups[bfh(channel_id)] = cb
        util.trigger_callback('channels_updated', self.wallet)
        self.lnwatcher.add_channel(cb.funding_outpoint.to_str(), cb.get_funding_address())
2267