1# Copyright (C) 2018 The Electrum developers 2# Distributed under the MIT software license, see the accompanying 3# file LICENCE or http://www.opensource.org/licenses/mit-license.php 4 5import asyncio 6import os 7from decimal import Decimal 8import random 9import time 10from typing import (Optional, Sequence, Tuple, List, Set, Dict, TYPE_CHECKING, 11 NamedTuple, Union, Mapping, Any, Iterable, AsyncGenerator) 12import threading 13import socket 14import aiohttp 15import json 16from datetime import datetime, timezone 17from functools import partial 18from collections import defaultdict 19import concurrent 20from concurrent import futures 21import urllib.parse 22 23import dns.resolver 24import dns.exception 25from aiorpcx import run_in_thread, TaskGroup, NetAddress, ignore_after 26 27from . import constants, util 28from . import keystore 29from .util import profiler, chunks 30from .invoices import PR_TYPE_LN, PR_UNPAID, PR_EXPIRED, PR_PAID, PR_INFLIGHT, PR_FAILED, PR_ROUTING, LNInvoice, LN_EXPIRY_NEVER 31from .util import NetworkRetryManager, JsonRPCClient 32from .lnutil import LN_MAX_FUNDING_SAT 33from .keystore import BIP32_KeyStore 34from .bitcoin import COIN 35from .bitcoin import opcodes, make_op_return, address_to_scripthash 36from .transaction import Transaction 37from .transaction import get_script_type_from_output_script 38from .crypto import sha256 39from .bip32 import BIP32Node 40from .util import bh2u, bfh, InvoiceError, resolve_dns_srv, is_ip_address, log_exceptions 41from .crypto import chacha20_encrypt, chacha20_decrypt 42from .util import ignore_exceptions, make_aiohttp_session, SilentTaskGroup 43from .util import timestamp_to_datetime, random_shuffled_copy 44from .util import MyEncoder, is_private_netaddress 45from .logging import Logger 46from .lntransport import LNTransport, LNResponderTransport, LNTransportBase 47from .lnpeer import Peer, LN_P2P_NETWORK_TIMEOUT 48from .lnaddr import lnencode, LnAddr, lndecode 49from .ecc import der_sig_from_sig_string 50from .lnchannel import Channel, AbstractChannel 51from .lnchannel import ChannelState, PeerState, HTLCWithStatus 52from .lnrater import LNRater 53from . import lnutil 54from .lnutil import funding_output_script 55from .bitcoin import redeem_script_to_address 56from .lnutil import (Outpoint, LNPeerAddr, 57 get_compressed_pubkey_from_bech32, extract_nodeid, 58 PaymentFailure, split_host_port, ConnStringFormatError, 59 generate_keypair, LnKeyFamily, LOCAL, REMOTE, 60 MIN_FINAL_CLTV_EXPIRY_FOR_INVOICE, 61 NUM_MAX_EDGES_IN_PAYMENT_PATH, SENT, RECEIVED, HTLCOwner, 62 UpdateAddHtlc, Direction, LnFeatures, ShortChannelID, 63 HtlcLog, derive_payment_secret_from_payment_preimage, 64 NoPathFound, InvalidGossipMsg) 65from .lnutil import ln_dummy_address, ln_compare_features, IncompatibleLightningFeatures 66from .transaction import PartialTxOutput, PartialTransaction, PartialTxInput 67from .lnonion import OnionFailureCode, OnionRoutingFailure 68from .lnmsg import decode_msg 69from .i18n import _ 70from .lnrouter import (RouteEdge, LNPaymentRoute, LNPaymentPath, is_route_sane_to_use, 71 NoChannelPolicy, LNPathInconsistent) 72from .address_synchronizer import TX_HEIGHT_LOCAL 73from . import lnsweep 74from .lnwatcher import LNWalletWatcher 75from .crypto import pw_encode_with_version_and_mac, pw_decode_with_version_and_mac 76from .lnutil import ImportedChannelBackupStorage, OnchainChannelBackupStorage 77from .lnchannel import ChannelBackup 78from .channel_db import UpdateStatus 79from .channel_db import get_mychannel_info, get_mychannel_policy 80from .submarine_swaps import SwapManager 81from .channel_db import ChannelInfo, Policy 82from .mpp_split import suggest_splits 83from .trampoline import create_trampoline_route_and_onion, TRAMPOLINE_FEES 84 85if TYPE_CHECKING: 86 from .network import Network 87 from .wallet import Abstract_Wallet 88 from .channel_db import ChannelDB 89 from .simple_config import SimpleConfig 90 91 92SAVED_PR_STATUS = [PR_PAID, PR_UNPAID] # status that are persisted 93 94 95NUM_PEERS_TARGET = 4 96 97# onchain channel backup data 98CB_VERSION = 0 99CB_MAGIC_BYTES = bytes([0, 0, 0, CB_VERSION]) 100 101 102FALLBACK_NODE_LIST_TESTNET = ( 103 LNPeerAddr(host='203.132.95.10', port=9735, pubkey=bfh('038863cf8ab91046230f561cd5b386cbff8309fa02e3f0c3ed161a3aeb64a643b9')), 104 LNPeerAddr(host='2401:d002:4402:0:bf1d:986a:7598:6d49', port=9735, pubkey=bfh('038863cf8ab91046230f561cd5b386cbff8309fa02e3f0c3ed161a3aeb64a643b9')), 105 LNPeerAddr(host='50.116.3.223', port=9734, pubkey=bfh('03236a685d30096b26692dce0cf0fa7c8528bdf61dbf5363a3ef6d5c92733a3016')), 106 LNPeerAddr(host='3.16.119.191', port=9735, pubkey=bfh('03d5e17a3c213fe490e1b0c389f8cfcfcea08a29717d50a9f453735e0ab2a7c003')), 107 LNPeerAddr(host='34.250.234.192', port=9735, pubkey=bfh('03933884aaf1d6b108397e5efe5c86bcf2d8ca8d2f700eda99db9214fc2712b134')), 108 LNPeerAddr(host='88.99.209.230', port=9735, pubkey=bfh('0260d9119979caedc570ada883ff614c6efb93f7f7382e25d73ecbeba0b62df2d7')), 109 LNPeerAddr(host='160.16.233.215', port=9735, pubkey=bfh('023ea0a53af875580899da0ab0a21455d9c19160c4ea1b7774c9d4be6810b02d2c')), 110 LNPeerAddr(host='197.155.6.173', port=9735, pubkey=bfh('0269a94e8b32c005e4336bfb743c08a6e9beb13d940d57c479d95c8e687ccbdb9f')), 111 LNPeerAddr(host='2c0f:fb18:406::4', port=9735, pubkey=bfh('0269a94e8b32c005e4336bfb743c08a6e9beb13d940d57c479d95c8e687ccbdb9f')), 112 LNPeerAddr(host='163.172.94.64', port=9735, pubkey=bfh('030f0bf260acdbd3edcad84d7588ec7c5df4711e87e6a23016f989b8d3a4147230')), 113 LNPeerAddr(host='23.237.77.12', port=9735, pubkey=bfh('02312627fdf07fbdd7e5ddb136611bdde9b00d26821d14d94891395452f67af248')), 114 LNPeerAddr(host='197.155.6.172', port=9735, pubkey=bfh('02ae2f22b02375e3e9b4b4a2db4f12e1b50752b4062dbefd6e01332acdaf680379')), 115 LNPeerAddr(host='2c0f:fb18:406::3', port=9735, pubkey=bfh('02ae2f22b02375e3e9b4b4a2db4f12e1b50752b4062dbefd6e01332acdaf680379')), 116 LNPeerAddr(host='23.239.23.44', port=9740, pubkey=bfh('034fe52e98a0e9d3c21b767e1b371881265d8c7578c21f5afd6d6438da10348b36')), 117 LNPeerAddr(host='2600:3c01::f03c:91ff:fe05:349c', port=9740, pubkey=bfh('034fe52e98a0e9d3c21b767e1b371881265d8c7578c21f5afd6d6438da10348b36')), 118) 119 120FALLBACK_NODE_LIST_MAINNET = [ 121 LNPeerAddr(host='172.81.181.3', port=9735, pubkey=bfh('0214382bdce7750dfcb8126df8e2b12de38536902dc36abcebdaeefdeca1df8284')), 122 LNPeerAddr(host='35.230.100.60', port=9735, pubkey=bfh('023f5e3582716bed96f6f26cfcd8037e07474d7b4743afdc8b07e692df63464d7e')), 123 LNPeerAddr(host='40.69.71.114', port=9735, pubkey=bfh('028303182c9885da93b3b25c9621d22cf34475e63c123942e402ab530c0556e675')), 124 LNPeerAddr(host='94.177.171.73', port=9735, pubkey=bfh('0276e09a267592e7451a939c932cf685f0754de382a3ca85d2fb3a864d4c365ad5')), 125 LNPeerAddr(host='34.236.113.58', port=9735, pubkey=bfh('02fa50c72ee1e2eb5f1b6d9c3032080c4c864373c4201dfa2966aa34eee1051f97')), 126 LNPeerAddr(host='52.50.244.44', port=9735, pubkey=bfh('030c3f19d742ca294a55c00376b3b355c3c90d61c6b6b39554dbc7ac19b141c14f')), 127 LNPeerAddr(host='157.245.68.47', port=9735, pubkey=bfh('03c2abfa93eacec04721c019644584424aab2ba4dff3ac9bdab4e9c97007491dda')), 128 LNPeerAddr(host='18.221.23.28', port=9735, pubkey=bfh('03abf6f44c355dec0d5aa155bdbdd6e0c8fefe318eff402de65c6eb2e1be55dc3e')), 129 LNPeerAddr(host='52.224.178.244', port=9735, pubkey=bfh('026b105ac13212c48714c6be9b11577a9ce10f10e1c88a45ce217e6331209faf8b')), 130 LNPeerAddr(host='34.239.230.56', port=9735, pubkey=bfh('03864ef025fde8fb587d989186ce6a4a186895ee44a926bfc370e2c366597a3f8f')), 131 LNPeerAddr(host='46.229.165.136', port=9735, pubkey=bfh('0390b5d4492dc2f5318e5233ab2cebf6d48914881a33ef6a9c6bcdbb433ad986d0')), 132 LNPeerAddr(host='157.230.28.160', port=9735, pubkey=bfh('0279c22ed7a068d10dc1a38ae66d2d6461e269226c60258c021b1ddcdfe4b00bc4')), 133 LNPeerAddr(host='74.108.13.152', port=9735, pubkey=bfh('0331f80652fb840239df8dc99205792bba2e559a05469915804c08420230e23c7c')), 134 LNPeerAddr(host='167.172.44.148', port=9735, pubkey=bfh('0395033b252c6f40e3756984162d68174e2bd8060a129c0d3462a9370471c6d28f')), 135 LNPeerAddr(host='138.68.14.104', port=9735, pubkey=bfh('03bb88ccc444534da7b5b64b4f7b15e1eccb18e102db0e400d4b9cfe93763aa26d')), 136 LNPeerAddr(host='3.124.63.44', port=9735, pubkey=bfh('0242a4ae0c5bef18048fbecf995094b74bfb0f7391418d71ed394784373f41e4f3')), 137 LNPeerAddr(host='2001:470:8:2e1::43', port=9735, pubkey=bfh('03baa70886d9200af0ffbd3f9e18d96008331c858456b16e3a9b41e735c6208fef')), 138 LNPeerAddr(host='2601:186:c100:6bcd:219:d1ff:fe75:dc2f', port=9735, pubkey=bfh('0298f6074a454a1f5345cb2a7c6f9fce206cd0bf675d177cdbf0ca7508dd28852f')), 139 LNPeerAddr(host='2001:41d0:e:734::1', port=9735, pubkey=bfh('03a503d8e30f2ff407096d235b5db63b4fcf3f89a653acb6f43d3fc492a7674019')), 140 LNPeerAddr(host='2a01:4f9:2b:2254::2', port=9735, pubkey=bfh('02f3069a342ae2883a6f29e275f06f28a56a6ea2e2d96f5888a3266444dcf542b6')), 141 LNPeerAddr(host='2a02:8070:24c1:100:528c:2997:6dbc:a054', port=9735, pubkey=bfh('02a45def9ae014fdd2603dd7033d157faa3a55a72b06a63ae22ef46d9fafdc6e8d')), 142 LNPeerAddr(host='2600:3c01::f03c:91ff:fe05:349c', port=9736, pubkey=bfh('02731b798b39a09f9f14e90ee601afb6ebb796d6e5797de14582a978770b33700f')), 143 LNPeerAddr(host='2a00:8a60:e012:a00::21', port=9735, pubkey=bfh('027ce055380348d7812d2ae7745701c9f93e70c1adeb2657f053f91df4f2843c71')), 144 LNPeerAddr(host='2604:a880:400:d1::8bd:1001', port=9735, pubkey=bfh('03649c72a4816f0cd546f84aafbd657e92a30ab474de7ab795e8b5650a427611f7')), 145 LNPeerAddr(host='2a01:4f8:c0c:7b31::1', port=9735, pubkey=bfh('02c16cca44562b590dd279c942200bdccfd4f990c3a69fad620c10ef2f8228eaff')), 146 LNPeerAddr(host='2001:41d0:1:b40d::1', port=9735, pubkey=bfh('026726a4b043d413b45b334876d17b8a98848129604429ec65532ba286a42efeac')), 147] 148 149 150from .trampoline import trampolines_by_id, hardcoded_trampoline_nodes, is_hardcoded_trampoline 151 152 153class PaymentInfo(NamedTuple): 154 payment_hash: bytes 155 amount_msat: Optional[int] 156 direction: int 157 status: int 158 159 160class ErrorAddingPeer(Exception): pass 161 162 163# set some feature flags as baseline for both LNWallet and LNGossip 164# note that e.g. DATA_LOSS_PROTECT is needed for LNGossip as many peers require it 165BASE_FEATURES = LnFeatures(0)\ 166 | LnFeatures.OPTION_DATA_LOSS_PROTECT_OPT\ 167 | LnFeatures.OPTION_STATIC_REMOTEKEY_OPT\ 168 | LnFeatures.VAR_ONION_OPT\ 169 | LnFeatures.PAYMENT_SECRET_OPT\ 170 | LnFeatures.OPTION_UPFRONT_SHUTDOWN_SCRIPT_OPT 171 172# we do not want to receive unrequested gossip (see lnpeer.maybe_save_remote_update) 173LNWALLET_FEATURES = BASE_FEATURES\ 174 | LnFeatures.OPTION_DATA_LOSS_PROTECT_REQ\ 175 | LnFeatures.OPTION_STATIC_REMOTEKEY_REQ\ 176 | LnFeatures.GOSSIP_QUERIES_REQ\ 177 | LnFeatures.BASIC_MPP_OPT\ 178 | LnFeatures.OPTION_TRAMPOLINE_ROUTING_OPT 179 180LNGOSSIP_FEATURES = BASE_FEATURES\ 181 | LnFeatures.GOSSIP_QUERIES_OPT\ 182 | LnFeatures.GOSSIP_QUERIES_REQ 183 184 185class LNWorker(Logger, NetworkRetryManager[LNPeerAddr]): 186 187 INITIAL_TRAMPOLINE_FEE_LEVEL = 1 # only used for trampoline payments. set to 0 in tests. 188 189 def __init__(self, xprv, features: LnFeatures): 190 Logger.__init__(self) 191 NetworkRetryManager.__init__( 192 self, 193 max_retry_delay_normal=3600, 194 init_retry_delay_normal=600, 195 max_retry_delay_urgent=300, 196 init_retry_delay_urgent=4, 197 ) 198 self.lock = threading.RLock() 199 self.node_keypair = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.NODE_KEY) 200 self.backup_key = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.BACKUP_CIPHER).privkey 201 self._peers = {} # type: Dict[bytes, Peer] # pubkey -> Peer # needs self.lock 202 self.taskgroup = SilentTaskGroup() 203 self.listen_server = None # type: Optional[asyncio.AbstractServer] 204 self.features = features 205 self.network = None # type: Optional[Network] 206 self.config = None # type: Optional[SimpleConfig] 207 self.stopping_soon = False # whether we are being shut down 208 209 util.register_callback(self.on_proxy_changed, ['proxy_set']) 210 211 @property 212 def channel_db(self): 213 return self.network.channel_db if self.network else None 214 215 @property 216 def peers(self) -> Mapping[bytes, Peer]: 217 """Returns a read-only copy of peers.""" 218 with self.lock: 219 return self._peers.copy() 220 221 def channels_for_peer(self, node_id: bytes) -> Dict[bytes, Channel]: 222 return {} 223 224 def get_node_alias(self, node_id: bytes) -> Optional[str]: 225 """Returns the alias of the node, or None if unknown.""" 226 node_alias = None 227 if self.channel_db: 228 node_info = self.channel_db.get_node_info_for_node_id(node_id) 229 if node_info: 230 node_alias = node_info.alias 231 else: 232 for k, v in hardcoded_trampoline_nodes().items(): 233 if v.pubkey == node_id: 234 node_alias = k 235 break 236 return node_alias 237 238 async def maybe_listen(self): 239 # FIXME: only one LNWorker can listen at a time (single port) 240 listen_addr = self.config.get('lightning_listen') 241 if listen_addr: 242 self.logger.info(f'lightning_listen enabled. will try to bind: {listen_addr!r}') 243 try: 244 netaddr = NetAddress.from_string(listen_addr) 245 except Exception as e: 246 self.logger.error(f"failed to parse config key 'lightning_listen'. got: {e!r}") 247 return 248 addr = str(netaddr.host) 249 async def cb(reader, writer): 250 transport = LNResponderTransport(self.node_keypair.privkey, reader, writer) 251 try: 252 node_id = await transport.handshake() 253 except Exception as e: 254 self.logger.info(f'handshake failure from incoming connection: {e!r}') 255 return 256 await self._add_peer_from_transport(node_id=node_id, transport=transport) 257 try: 258 self.listen_server = await asyncio.start_server(cb, addr, netaddr.port) 259 except OSError as e: 260 self.logger.error(f"cannot listen for lightning p2p. error: {e!r}") 261 262 @ignore_exceptions # don't kill outer taskgroup 263 async def main_loop(self): 264 self.logger.info("starting taskgroup.") 265 try: 266 async with self.taskgroup as group: 267 await group.spawn(self._maintain_connectivity()) 268 except asyncio.CancelledError: 269 raise 270 except Exception as e: 271 self.logger.exception("taskgroup died.") 272 finally: 273 self.logger.info("taskgroup stopped.") 274 275 async def _maintain_connectivity(self): 276 while True: 277 await asyncio.sleep(1) 278 if self.stopping_soon: 279 return 280 now = time.time() 281 if len(self._peers) >= NUM_PEERS_TARGET: 282 continue 283 peers = await self._get_next_peers_to_try() 284 for peer in peers: 285 if self._can_retry_addr(peer, now=now): 286 try: 287 await self._add_peer(peer.host, peer.port, peer.pubkey) 288 except ErrorAddingPeer as e: 289 self.logger.info(f"failed to add peer: {peer}. exc: {e!r}") 290 291 async def _add_peer(self, host: str, port: int, node_id: bytes) -> Peer: 292 if node_id in self._peers: 293 return self._peers[node_id] 294 port = int(port) 295 peer_addr = LNPeerAddr(host, port, node_id) 296 self._trying_addr_now(peer_addr) 297 self.logger.info(f"adding peer {peer_addr}") 298 if node_id == self.node_keypair.pubkey: 299 raise ErrorAddingPeer("cannot connect to self") 300 transport = LNTransport(self.node_keypair.privkey, peer_addr, 301 proxy=self.network.proxy) 302 peer = await self._add_peer_from_transport(node_id=node_id, transport=transport) 303 return peer 304 305 async def _add_peer_from_transport(self, *, node_id: bytes, transport: LNTransportBase) -> Peer: 306 peer = Peer(self, node_id, transport) 307 with self.lock: 308 existing_peer = self._peers.get(node_id) 309 if existing_peer: 310 existing_peer.close_and_cleanup() 311 assert node_id not in self._peers 312 self._peers[node_id] = peer 313 await self.taskgroup.spawn(peer.main_loop()) 314 return peer 315 316 def peer_closed(self, peer: Peer) -> None: 317 with self.lock: 318 peer2 = self._peers.get(peer.pubkey) 319 if peer2 is peer: 320 self._peers.pop(peer.pubkey) 321 322 def num_peers(self) -> int: 323 return sum([p.is_initialized() for p in self.peers.values()]) 324 325 def start_network(self, network: 'Network'): 326 assert network 327 self.network = network 328 self.config = network.config 329 self._add_peers_from_config() 330 asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop) 331 332 async def stop(self): 333 if self.listen_server: 334 self.listen_server.close() 335 util.unregister_callback(self.on_proxy_changed) 336 await self.taskgroup.cancel_remaining() 337 338 def _add_peers_from_config(self): 339 peer_list = self.config.get('lightning_peers', []) 340 for host, port, pubkey in peer_list: 341 asyncio.run_coroutine_threadsafe( 342 self._add_peer(host, int(port), bfh(pubkey)), 343 self.network.asyncio_loop) 344 345 def is_good_peer(self, peer: LNPeerAddr) -> bool: 346 # the purpose of this method is to filter peers that advertise the desired feature bits 347 # it is disabled for now, because feature bits published in node announcements seem to be unreliable 348 return True 349 node_id = peer.pubkey 350 node = self.channel_db._nodes.get(node_id) 351 if not node: 352 return False 353 try: 354 ln_compare_features(self.features, node.features) 355 except IncompatibleLightningFeatures: 356 return False 357 #self.logger.info(f'is_good {peer.host}') 358 return True 359 360 def on_peer_successfully_established(self, peer: Peer) -> None: 361 if isinstance(peer.transport, LNTransport): 362 peer_addr = peer.transport.peer_addr 363 # reset connection attempt count 364 self._on_connection_successfully_established(peer_addr) 365 # add into channel db 366 if self.channel_db: 367 self.channel_db.add_recent_peer(peer_addr) 368 # save network address into channels we might have with peer 369 for chan in peer.channels.values(): 370 chan.add_or_update_peer_addr(peer_addr) 371 372 async def _get_next_peers_to_try(self) -> Sequence[LNPeerAddr]: 373 now = time.time() 374 await self.channel_db.data_loaded.wait() 375 # first try from recent peers 376 recent_peers = self.channel_db.get_recent_peers() 377 for peer in recent_peers: 378 if not peer: 379 continue 380 if peer.pubkey in self._peers: 381 continue 382 if not self._can_retry_addr(peer, now=now): 383 continue 384 if not self.is_good_peer(peer): 385 continue 386 return [peer] 387 # try random peer from graph 388 unconnected_nodes = self.channel_db.get_200_randomly_sorted_nodes_not_in(self.peers.keys()) 389 if unconnected_nodes: 390 for node_id in unconnected_nodes: 391 addrs = self.channel_db.get_node_addresses(node_id) 392 if not addrs: 393 continue 394 host, port, timestamp = self.choose_preferred_address(list(addrs)) 395 try: 396 peer = LNPeerAddr(host, port, node_id) 397 except ValueError: 398 continue 399 if not self._can_retry_addr(peer, now=now): 400 continue 401 if not self.is_good_peer(peer): 402 continue 403 #self.logger.info('taking random ln peer from our channel db') 404 return [peer] 405 406 # getting desperate... let's try hardcoded fallback list of peers 407 if constants.net in (constants.BitcoinTestnet,): 408 fallback_list = FALLBACK_NODE_LIST_TESTNET 409 elif constants.net in (constants.BitcoinMainnet,): 410 fallback_list = FALLBACK_NODE_LIST_MAINNET 411 else: 412 return [] # regtest?? 413 414 fallback_list = [peer for peer in fallback_list if self._can_retry_addr(peer, now=now)] 415 if fallback_list: 416 return [random.choice(fallback_list)] 417 418 # last resort: try dns seeds (BOLT-10) 419 return await run_in_thread(self._get_peers_from_dns_seeds) 420 421 def _get_peers_from_dns_seeds(self) -> Sequence[LNPeerAddr]: 422 # NOTE: potentially long blocking call, do not run directly on asyncio event loop. 423 # Return several peers to reduce the number of dns queries. 424 if not constants.net.LN_DNS_SEEDS: 425 return [] 426 dns_seed = random.choice(constants.net.LN_DNS_SEEDS) 427 self.logger.info('asking dns seed "{}" for ln peers'.format(dns_seed)) 428 try: 429 # note: this might block for several seconds 430 # this will include bech32-encoded-pubkeys and ports 431 srv_answers = resolve_dns_srv('r{}.{}'.format( 432 constants.net.LN_REALM_BYTE, dns_seed)) 433 except dns.exception.DNSException as e: 434 self.logger.info(f'failed querying (1) dns seed "{dns_seed}" for ln peers: {repr(e)}') 435 return [] 436 random.shuffle(srv_answers) 437 num_peers = 2 * NUM_PEERS_TARGET 438 srv_answers = srv_answers[:num_peers] 439 # we now have pubkeys and ports but host is still needed 440 peers = [] 441 for srv_ans in srv_answers: 442 try: 443 # note: this might block for several seconds 444 answers = dns.resolver.resolve(srv_ans['host']) 445 except dns.exception.DNSException as e: 446 self.logger.info(f'failed querying (2) dns seed "{dns_seed}" for ln peers: {repr(e)}') 447 continue 448 try: 449 ln_host = str(answers[0]) 450 port = int(srv_ans['port']) 451 bech32_pubkey = srv_ans['host'].split('.')[0] 452 pubkey = get_compressed_pubkey_from_bech32(bech32_pubkey) 453 peers.append(LNPeerAddr(ln_host, port, pubkey)) 454 except Exception as e: 455 self.logger.info(f'error with parsing peer from dns seed: {repr(e)}') 456 continue 457 self.logger.info(f'got {len(peers)} ln peers from dns seed') 458 return peers 459 460 @staticmethod 461 def choose_preferred_address(addr_list: Sequence[Tuple[str, int, int]]) -> Tuple[str, int, int]: 462 assert len(addr_list) >= 1 463 # choose first one that is an IP 464 for host, port, timestamp in addr_list: 465 if is_ip_address(host): 466 return host, port, timestamp 467 # otherwise choose one at random 468 # TODO maybe filter out onion if not on tor? 469 choice = random.choice(addr_list) 470 return choice 471 472 def on_proxy_changed(self, event, *args): 473 for peer in self.peers.values(): 474 peer.close_and_cleanup() 475 self._clear_addr_retry_times() 476 477 @log_exceptions 478 async def add_peer(self, connect_str: str) -> Peer: 479 node_id, rest = extract_nodeid(connect_str) 480 peer = self._peers.get(node_id) 481 if not peer: 482 if rest is not None: 483 host, port = split_host_port(rest) 484 else: 485 if not self.channel_db: 486 addr = trampolines_by_id().get(node_id) 487 if not addr: 488 raise ConnStringFormatError(_('Address unknown for node:') + ' ' + bh2u(node_id)) 489 host, port = addr.host, addr.port 490 else: 491 addrs = self.channel_db.get_node_addresses(node_id) 492 if not addrs: 493 raise ConnStringFormatError(_('Don\'t know any addresses for node:') + ' ' + bh2u(node_id)) 494 host, port, timestamp = self.choose_preferred_address(list(addrs)) 495 port = int(port) 496 # Try DNS-resolving the host (if needed). This is simply so that 497 # the caller gets a nice exception if it cannot be resolved. 498 try: 499 await asyncio.get_event_loop().getaddrinfo(host, port) 500 except socket.gaierror: 501 raise ConnStringFormatError(_('Hostname does not resolve (getaddrinfo failed)')) 502 # add peer 503 peer = await self._add_peer(host, port, node_id) 504 return peer 505 506 507class LNGossip(LNWorker): 508 max_age = 14*24*3600 509 LOGGING_SHORTCUT = 'g' 510 511 def __init__(self): 512 seed = os.urandom(32) 513 node = BIP32Node.from_rootseed(seed, xtype='standard') 514 xprv = node.to_xprv() 515 super().__init__(xprv, LNGOSSIP_FEATURES) 516 self.unknown_ids = set() 517 518 def start_network(self, network: 'Network'): 519 assert network 520 super().start_network(network) 521 asyncio.run_coroutine_threadsafe(self.taskgroup.spawn(self.maintain_db()), self.network.asyncio_loop) 522 523 async def maintain_db(self): 524 await self.channel_db.data_loaded.wait() 525 while True: 526 if len(self.unknown_ids) == 0: 527 self.channel_db.prune_old_policies(self.max_age) 528 self.channel_db.prune_orphaned_channels() 529 await asyncio.sleep(120) 530 531 async def add_new_ids(self, ids: Iterable[bytes]): 532 known = self.channel_db.get_channel_ids() 533 new = set(ids) - set(known) 534 self.unknown_ids.update(new) 535 util.trigger_callback('unknown_channels', len(self.unknown_ids)) 536 util.trigger_callback('gossip_peers', self.num_peers()) 537 util.trigger_callback('ln_gossip_sync_progress') 538 539 def get_ids_to_query(self) -> Sequence[bytes]: 540 N = 500 541 l = list(self.unknown_ids) 542 self.unknown_ids = set(l[N:]) 543 util.trigger_callback('unknown_channels', len(self.unknown_ids)) 544 util.trigger_callback('ln_gossip_sync_progress') 545 return l[0:N] 546 547 def get_sync_progress_estimate(self) -> Tuple[Optional[int], Optional[int], Optional[int]]: 548 """Estimates the gossip synchronization process and returns the number 549 of synchronized channels, the total channels in the network and a 550 rescaled percentage of the synchronization process.""" 551 if self.num_peers() == 0: 552 return None, None, None 553 nchans_with_0p, nchans_with_1p, nchans_with_2p = self.channel_db.get_num_channels_partitioned_by_policy_count() 554 num_db_channels = nchans_with_0p + nchans_with_1p + nchans_with_2p 555 # some channels will never have two policies (only one is in gossip?...) 556 # so if we have at least 1 policy for a channel, we consider that channel "complete" here 557 current_est = num_db_channels - nchans_with_0p 558 total_est = len(self.unknown_ids) + num_db_channels 559 560 progress = current_est / total_est if total_est and current_est else 0 561 progress_percent = (1.0 / 0.95 * progress) * 100 562 progress_percent = min(progress_percent, 100) 563 progress_percent = round(progress_percent) 564 # take a minimal number of synchronized channels to get a more accurate 565 # percentage estimate 566 if current_est < 200: 567 progress_percent = 0 568 return current_est, total_est, progress_percent 569 570 async def process_gossip(self, chan_anns, node_anns, chan_upds): 571 # note: we run in the originating peer's TaskGroup, so we can safely raise here 572 # and disconnect only from that peer 573 await self.channel_db.data_loaded.wait() 574 self.logger.debug(f'process_gossip {len(chan_anns)} {len(node_anns)} {len(chan_upds)}') 575 # channel announcements 576 def process_chan_anns(): 577 for payload in chan_anns: 578 self.channel_db.verify_channel_announcement(payload) 579 self.channel_db.add_channel_announcements(chan_anns) 580 await run_in_thread(process_chan_anns) 581 # node announcements 582 def process_node_anns(): 583 for payload in node_anns: 584 self.channel_db.verify_node_announcement(payload) 585 self.channel_db.add_node_announcements(node_anns) 586 await run_in_thread(process_node_anns) 587 # channel updates 588 categorized_chan_upds = await run_in_thread(partial( 589 self.channel_db.add_channel_updates, 590 chan_upds, 591 max_age=self.max_age)) 592 orphaned = categorized_chan_upds.orphaned 593 if orphaned: 594 self.logger.info(f'adding {len(orphaned)} unknown channel ids') 595 orphaned_ids = [c['short_channel_id'] for c in orphaned] 596 await self.add_new_ids(orphaned_ids) 597 if categorized_chan_upds.good: 598 self.logger.debug(f'on_channel_update: {len(categorized_chan_upds.good)}/{len(chan_upds)}') 599 600 601class LNWallet(LNWorker): 602 603 lnwatcher: Optional['LNWalletWatcher'] 604 MPP_EXPIRY = 120 605 TIMEOUT_SHUTDOWN_FAIL_PENDING_HTLCS = 3 # seconds 606 607 def __init__(self, wallet: 'Abstract_Wallet', xprv): 608 self.wallet = wallet 609 self.db = wallet.db 610 Logger.__init__(self) 611 LNWorker.__init__(self, xprv, LNWALLET_FEATURES) 612 self.config = wallet.config 613 self.lnwatcher = None 614 self.lnrater: LNRater = None 615 self.payments = self.db.get_dict('lightning_payments') # RHASH -> amount, direction, is_paid 616 self.preimages = self.db.get_dict('lightning_preimages') # RHASH -> preimage 617 # note: this sweep_address is only used as fallback; as it might result in address-reuse 618 self.sweep_address = wallet.get_new_sweep_address_for_channel() 619 self.logs = defaultdict(list) # type: Dict[str, List[HtlcLog]] # key is RHASH # (not persisted) 620 # used in tests 621 self.enable_htlc_settle = True 622 self.enable_htlc_forwarding = True 623 624 # note: accessing channels (besides simple lookup) needs self.lock! 625 self._channels = {} # type: Dict[bytes, Channel] 626 channels = self.db.get_dict("channels") 627 for channel_id, c in random_shuffled_copy(channels.items()): 628 self._channels[bfh(channel_id)] = Channel(c, sweep_address=self.sweep_address, lnworker=self) 629 630 self._channel_backups = {} # type: Dict[bytes, ChannelBackup] 631 # order is important: imported should overwrite onchain 632 for name in ["onchain_channel_backups", "imported_channel_backups"]: 633 channel_backups = self.db.get_dict(name) 634 for channel_id, storage in channel_backups.items(): 635 self._channel_backups[bfh(channel_id)] = ChannelBackup(storage, sweep_address=self.sweep_address, lnworker=self) 636 637 self.sent_htlcs = defaultdict(asyncio.Queue) # type: Dict[bytes, asyncio.Queue[HtlcLog]] 638 self.sent_htlcs_routes = dict() # (RHASH, scid, htlc_id) -> route, payment_secret, amount_msat, bucket_msat 639 self.sent_buckets = dict() # payment_secret -> (amount_sent, amount_failed) 640 self.received_mpp_htlcs = dict() # RHASH -> mpp_status, htlc_set 641 642 self.swap_manager = SwapManager(wallet=self.wallet, lnworker=self) 643 # detect inflight payments 644 self.inflight_payments = set() # (not persisted) keys of invoices that are in PR_INFLIGHT state 645 for payment_hash in self.get_payments(status='inflight').keys(): 646 self.set_invoice_status(payment_hash.hex(), PR_INFLIGHT) 647 648 self.trampoline_forwarding_failures = {} # todo: should be persisted 649 650 def has_deterministic_node_id(self): 651 return bool(self.db.get('lightning_xprv')) 652 653 def has_recoverable_channels(self): 654 # TODO: expose use_recoverable_channels in preferences 655 return self.has_deterministic_node_id() \ 656 and self.config.get('use_recoverable_channels', True) \ 657 and not (self.config.get('lightning_listen')) 658 659 @property 660 def channels(self) -> Mapping[bytes, Channel]: 661 """Returns a read-only copy of channels.""" 662 with self.lock: 663 return self._channels.copy() 664 665 @property 666 def channel_backups(self) -> Mapping[bytes, ChannelBackup]: 667 """Returns a read-only copy of channels.""" 668 with self.lock: 669 return self._channel_backups.copy() 670 671 def get_channel_by_id(self, channel_id: bytes) -> Optional[Channel]: 672 return self._channels.get(channel_id, None) 673 674 def diagnostic_name(self): 675 return self.wallet.diagnostic_name() 676 677 @ignore_exceptions 678 @log_exceptions 679 async def sync_with_local_watchtower(self): 680 watchtower = self.network.local_watchtower 681 if watchtower: 682 while True: 683 for chan in self.channels.values(): 684 await self.sync_channel_with_watchtower(chan, watchtower.sweepstore) 685 await asyncio.sleep(5) 686 687 @ignore_exceptions 688 @log_exceptions 689 async def sync_with_remote_watchtower(self): 690 while True: 691 # periodically poll if the user updated 'watchtower_url' 692 await asyncio.sleep(5) 693 watchtower_url = self.config.get('watchtower_url') 694 if not watchtower_url: 695 continue 696 parsed_url = urllib.parse.urlparse(watchtower_url) 697 if not (parsed_url.scheme == 'https' or is_private_netaddress(parsed_url.hostname)): 698 self.logger.warning(f"got watchtower URL for remote tower but we won't use it! " 699 f"can only use HTTPS (except if private IP): not using {watchtower_url!r}") 700 continue 701 # try to sync with the remote watchtower 702 try: 703 async with make_aiohttp_session(proxy=self.network.proxy) as session: 704 watchtower = JsonRPCClient(session, watchtower_url) 705 watchtower.add_method('get_ctn') 706 watchtower.add_method('add_sweep_tx') 707 for chan in self.channels.values(): 708 await self.sync_channel_with_watchtower(chan, watchtower) 709 except aiohttp.client_exceptions.ClientConnectorError: 710 self.logger.info(f'could not contact remote watchtower {watchtower_url}') 711 712 async def sync_channel_with_watchtower(self, chan: Channel, watchtower): 713 outpoint = chan.funding_outpoint.to_str() 714 addr = chan.get_funding_address() 715 current_ctn = chan.get_oldest_unrevoked_ctn(REMOTE) 716 watchtower_ctn = await watchtower.get_ctn(outpoint, addr) 717 for ctn in range(watchtower_ctn + 1, current_ctn): 718 sweeptxs = chan.create_sweeptxs(ctn) 719 for tx in sweeptxs: 720 await watchtower.add_sweep_tx(outpoint, ctn, tx.inputs()[0].prevout.to_str(), tx.serialize()) 721 722 def start_network(self, network: 'Network'): 723 assert network 724 self.network = network 725 self.config = network.config 726 self.lnwatcher = LNWalletWatcher(self, network) 727 self.lnwatcher.start_network(network) 728 self.swap_manager.start_network(network=network, lnwatcher=self.lnwatcher) 729 self.lnrater = LNRater(self, network) 730 731 for chan in self.channels.values(): 732 self.lnwatcher.add_channel(chan.funding_outpoint.to_str(), chan.get_funding_address()) 733 for cb in self.channel_backups.values(): 734 self.lnwatcher.add_channel(cb.funding_outpoint.to_str(), cb.get_funding_address()) 735 736 for coro in [ 737 self.maybe_listen(), 738 self.lnwatcher.on_network_update('network_updated'), # shortcut (don't block) if funding tx locked and verified 739 self.reestablish_peers_and_channels(), 740 self.sync_with_local_watchtower(), 741 self.sync_with_remote_watchtower(), 742 ]: 743 tg_coro = self.taskgroup.spawn(coro) 744 asyncio.run_coroutine_threadsafe(tg_coro, self.network.asyncio_loop) 745 746 async def stop(self): 747 self.stopping_soon = True 748 if self.listen_server: # stop accepting new peers 749 self.listen_server.close() 750 async with ignore_after(self.TIMEOUT_SHUTDOWN_FAIL_PENDING_HTLCS): 751 await self.wait_for_received_pending_htlcs_to_get_removed() 752 await LNWorker.stop(self) 753 if self.lnwatcher: 754 await self.lnwatcher.stop() 755 self.lnwatcher = None 756 757 async def wait_for_received_pending_htlcs_to_get_removed(self): 758 assert self.stopping_soon is True 759 # We try to fail pending MPP HTLCs, and wait a bit for them to get removed. 760 # Note: even without MPP, if we just failed/fulfilled an HTLC, it is good 761 # to wait a bit for it to become irrevocably removed. 762 # Note: we don't wait for *all htlcs* to get removed, only for those 763 # that we can already fail/fulfill. e.g. forwarded htlcs cannot be removed 764 async with TaskGroup() as group: 765 for peer in self.peers.values(): 766 await group.spawn(peer.wait_one_htlc_switch_iteration()) 767 while True: 768 if all(not peer.received_htlcs_pending_removal for peer in self.peers.values()): 769 break 770 async with TaskGroup(wait=any) as group: 771 for peer in self.peers.values(): 772 await group.spawn(peer.received_htlc_removed_event.wait()) 773 774 def peer_closed(self, peer): 775 for chan in self.channels_for_peer(peer.pubkey).values(): 776 chan.peer_state = PeerState.DISCONNECTED 777 util.trigger_callback('channel', self.wallet, chan) 778 super().peer_closed(peer) 779 780 def get_payments(self, *, status=None) -> Mapping[bytes, List[HTLCWithStatus]]: 781 out = defaultdict(list) 782 for chan in self.channels.values(): 783 d = chan.get_payments(status=status) 784 for payment_hash, plist in d.items(): 785 out[payment_hash] += plist 786 return out 787 788 def get_payment_value( 789 self, info: Optional['PaymentInfo'], plist: List[HTLCWithStatus], 790 ) -> Tuple[int, int, int]: 791 assert plist 792 amount_msat = 0 793 fee_msat = None 794 for htlc_with_status in plist: 795 htlc = htlc_with_status.htlc 796 _direction = htlc_with_status.direction 797 amount_msat += int(_direction) * htlc.amount_msat 798 if _direction == SENT and info and info.amount_msat: 799 fee_msat = (fee_msat or 0) - info.amount_msat - amount_msat 800 timestamp = min([htlc_with_status.htlc.timestamp for htlc_with_status in plist]) 801 return amount_msat, fee_msat, timestamp 802 803 def get_lightning_history(self): 804 out = {} 805 for payment_hash, plist in self.get_payments(status='settled').items(): 806 if len(plist) == 0: 807 continue 808 key = payment_hash.hex() 809 info = self.get_payment_info(payment_hash) 810 amount_msat, fee_msat, timestamp = self.get_payment_value(info, plist) 811 if info is not None: 812 label = self.wallet.get_label(key) 813 direction = ('sent' if info.direction == SENT else 'received') if len(plist)==1 else 'self-payment' 814 else: 815 direction = 'forwarding' 816 label = _('Forwarding') 817 preimage = self.get_preimage(payment_hash).hex() 818 item = { 819 'type': 'payment', 820 'label': label, 821 'timestamp': timestamp or 0, 822 'date': timestamp_to_datetime(timestamp), 823 'direction': direction, 824 'amount_msat': amount_msat, 825 'fee_msat': fee_msat, 826 'payment_hash': key, 827 'preimage': preimage, 828 } 829 # add group_id to swap transactions 830 swap = self.swap_manager.get_swap(payment_hash) 831 if swap: 832 if swap.is_reverse: 833 item['group_id'] = swap.spending_txid 834 item['group_label'] = 'Reverse swap' + ' ' + self.config.format_amount_and_units(swap.lightning_amount) 835 else: 836 item['group_id'] = swap.funding_txid 837 item['group_label'] = 'Forward swap' + ' ' + self.config.format_amount_and_units(swap.onchain_amount) 838 # done 839 out[payment_hash] = item 840 return out 841 842 def get_onchain_history(self): 843 current_height = self.wallet.get_local_height() 844 out = {} 845 # add funding events 846 for chan in self.channels.values(): 847 item = chan.get_funding_height() 848 if item is None: 849 continue 850 if not self.lnwatcher: 851 continue # lnwatcher not available with --offline (its data is not persisted) 852 funding_txid, funding_height, funding_timestamp = item 853 tx_height = self.lnwatcher.get_tx_height(funding_txid) 854 item = { 855 'channel_id': bh2u(chan.channel_id), 856 'type': 'channel_opening', 857 'label': self.wallet.get_label_for_txid(funding_txid) or (_('Open channel') + ' ' + chan.get_id_for_log()), 858 'txid': funding_txid, 859 'amount_msat': chan.balance(LOCAL, ctn=0), 860 'direction': 'received', 861 'timestamp': tx_height.timestamp, 862 'date': timestamp_to_datetime(tx_height.timestamp), 863 'fee_sat': None, 864 'fee_msat': None, 865 'height': tx_height.height, 866 'confirmations': tx_height.conf, 867 } 868 out[funding_txid] = item 869 item = chan.get_closing_height() 870 if item is None: 871 continue 872 closing_txid, closing_height, closing_timestamp = item 873 tx_height = self.lnwatcher.get_tx_height(closing_txid) 874 item = { 875 'channel_id': bh2u(chan.channel_id), 876 'txid': closing_txid, 877 'label': self.wallet.get_label_for_txid(closing_txid) or (_('Close channel') + ' ' + chan.get_id_for_log()), 878 'type': 'channel_closure', 879 'amount_msat': -chan.balance_minus_outgoing_htlcs(LOCAL), 880 'direction': 'sent', 881 'timestamp': tx_height.timestamp, 882 'date': timestamp_to_datetime(tx_height.timestamp), 883 'fee_sat': None, 884 'fee_msat': None, 885 'height': tx_height.height, 886 'confirmations': tx_height.conf, 887 } 888 out[closing_txid] = item 889 # add info about submarine swaps 890 settled_payments = self.get_payments(status='settled') 891 for payment_hash_hex, swap in self.swap_manager.swaps.items(): 892 txid = swap.spending_txid if swap.is_reverse else swap.funding_txid 893 if txid is None: 894 continue 895 payment_hash = bytes.fromhex(payment_hash_hex) 896 if payment_hash in settled_payments: 897 plist = settled_payments[payment_hash] 898 info = self.get_payment_info(payment_hash) 899 amount_msat, fee_msat, timestamp = self.get_payment_value(info, plist) 900 else: 901 amount_msat = 0 902 label = 'Reverse swap' if swap.is_reverse else 'Forward swap' 903 delta = current_height - swap.locktime 904 if not swap.is_redeemed and swap.spending_txid is None and delta < 0: 905 label += f' (refundable in {-delta} blocks)' # fixme: only if unspent 906 out[txid] = { 907 'txid': txid, 908 'group_id': txid, 909 'amount_msat': 0, 910 #'amount_msat': amount_msat, # must not be added 911 'type': 'swap', 912 'label': self.wallet.get_label_for_txid(txid) or label, 913 } 914 return out 915 916 def get_history(self): 917 out = list(self.get_lightning_history().values()) + list(self.get_onchain_history().values()) 918 # sort by timestamp 919 out.sort(key=lambda x: (x.get('timestamp') or float("inf"))) 920 balance_msat = 0 921 for item in out: 922 balance_msat += item['amount_msat'] 923 item['balance_msat'] = balance_msat 924 return out 925 926 def channel_peers(self) -> List[bytes]: 927 node_ids = [chan.node_id for chan in self.channels.values() if not chan.is_closed()] 928 return node_ids 929 930 def channels_for_peer(self, node_id): 931 assert type(node_id) is bytes 932 return {chan_id: chan for (chan_id, chan) in self.channels.items() 933 if chan.node_id == node_id} 934 935 def channel_state_changed(self, chan: Channel): 936 if type(chan) is Channel: 937 self.save_channel(chan) 938 util.trigger_callback('channel', self.wallet, chan) 939 940 def save_channel(self, chan: Channel): 941 assert type(chan) is Channel 942 if chan.config[REMOTE].next_per_commitment_point == chan.config[REMOTE].current_per_commitment_point: 943 raise Exception("Tried to save channel with next_point == current_point, this should not happen") 944 self.wallet.save_db() 945 util.trigger_callback('channel', self.wallet, chan) 946 947 def channel_by_txo(self, txo: str) -> Optional[AbstractChannel]: 948 for chan in self.channels.values(): 949 if chan.funding_outpoint.to_str() == txo: 950 return chan 951 for chan in self.channel_backups.values(): 952 if chan.funding_outpoint.to_str() == txo: 953 return chan 954 955 async def on_channel_update(self, chan: Channel): 956 if type(chan) is ChannelBackup: 957 util.trigger_callback('channel', self.wallet, chan) 958 return 959 960 if chan.get_state() == ChannelState.OPEN and chan.should_be_closed_due_to_expiring_htlcs(self.network.get_local_height()): 961 self.logger.info(f"force-closing due to expiring htlcs") 962 await self.try_force_closing(chan.channel_id) 963 964 elif chan.get_state() == ChannelState.FUNDED: 965 peer = self._peers.get(chan.node_id) 966 if peer and peer.is_initialized(): 967 peer.send_funding_locked(chan) 968 969 elif chan.get_state() == ChannelState.OPEN: 970 peer = self._peers.get(chan.node_id) 971 if peer: 972 await peer.maybe_update_fee(chan) 973 conf = self.lnwatcher.get_tx_height(chan.funding_outpoint.txid).conf 974 peer.on_network_update(chan, conf) 975 976 elif chan.get_state() == ChannelState.FORCE_CLOSING: 977 force_close_tx = chan.force_close_tx() 978 txid = force_close_tx.txid() 979 height = self.lnwatcher.get_tx_height(txid).height 980 if height == TX_HEIGHT_LOCAL: 981 self.logger.info('REBROADCASTING CLOSING TX') 982 await self.network.try_broadcasting(force_close_tx, 'force-close') 983 984 @log_exceptions 985 async def _open_channel_coroutine( 986 self, *, 987 connect_str: str, 988 funding_tx: PartialTransaction, 989 funding_sat: int, 990 push_sat: int, 991 password: Optional[str]) -> Tuple[Channel, PartialTransaction]: 992 993 peer = await self.add_peer(connect_str) 994 coro = peer.channel_establishment_flow( 995 funding_tx=funding_tx, 996 funding_sat=funding_sat, 997 push_msat=push_sat * 1000, 998 temp_channel_id=os.urandom(32)) 999 chan, funding_tx = await asyncio.wait_for(coro, LN_P2P_NETWORK_TIMEOUT) 1000 util.trigger_callback('channels_updated', self.wallet) 1001 self.wallet.add_transaction(funding_tx) # save tx as local into the wallet 1002 self.wallet.sign_transaction(funding_tx, password) 1003 self.wallet.set_label(funding_tx.txid(), _('Open channel')) 1004 if funding_tx.is_complete(): 1005 await self.network.try_broadcasting(funding_tx, 'open_channel') 1006 return chan, funding_tx 1007 1008 def add_channel(self, chan: Channel): 1009 with self.lock: 1010 self._channels[chan.channel_id] = chan 1011 self.lnwatcher.add_channel(chan.funding_outpoint.to_str(), chan.get_funding_address()) 1012 1013 def add_new_channel(self, chan: Channel): 1014 self.add_channel(chan) 1015 channels_db = self.db.get_dict('channels') 1016 channels_db[chan.channel_id.hex()] = chan.storage 1017 for addr in chan.get_wallet_addresses_channel_might_want_reserved(): 1018 self.wallet.set_reserved_state_of_address(addr, reserved=True) 1019 try: 1020 self.save_channel(chan) 1021 backup_dir = self.config.get_backup_dir() 1022 if backup_dir is not None: 1023 self.wallet.save_backup(backup_dir) 1024 except: 1025 chan.set_state(ChannelState.REDEEMED) 1026 self.remove_channel(chan.channel_id) 1027 raise 1028 1029 def cb_data(self, node_id): 1030 return CB_MAGIC_BYTES + node_id[0:16] 1031 1032 def decrypt_cb_data(self, encrypted_data, funding_address): 1033 funding_scripthash = bytes.fromhex(address_to_scripthash(funding_address)) 1034 nonce = funding_scripthash[0:12] 1035 return chacha20_decrypt(key=self.backup_key, data=encrypted_data, nonce=nonce) 1036 1037 def encrypt_cb_data(self, data, funding_address): 1038 funding_scripthash = bytes.fromhex(address_to_scripthash(funding_address)) 1039 nonce = funding_scripthash[0:12] 1040 return chacha20_encrypt(key=self.backup_key, data=data, nonce=nonce) 1041 1042 def mktx_for_open_channel( 1043 self, *, 1044 coins: Sequence[PartialTxInput], 1045 funding_sat: int, 1046 node_id: bytes, 1047 fee_est=None) -> PartialTransaction: 1048 outputs = [PartialTxOutput.from_address_and_value(ln_dummy_address(), funding_sat)] 1049 if self.has_recoverable_channels(): 1050 dummy_scriptpubkey = make_op_return(self.cb_data(node_id)) 1051 outputs.append(PartialTxOutput(scriptpubkey=dummy_scriptpubkey, value=0)) 1052 tx = self.wallet.make_unsigned_transaction( 1053 coins=coins, 1054 outputs=outputs, 1055 fee=fee_est) 1056 tx.set_rbf(False) 1057 return tx 1058 1059 def open_channel(self, *, connect_str: str, funding_tx: PartialTransaction, 1060 funding_sat: int, push_amt_sat: int, password: str = None) -> Tuple[Channel, PartialTransaction]: 1061 if funding_sat > LN_MAX_FUNDING_SAT: 1062 raise Exception(_("Requested channel capacity is over protocol allowed maximum.")) 1063 coro = self._open_channel_coroutine( 1064 connect_str=connect_str, funding_tx=funding_tx, funding_sat=funding_sat, 1065 push_sat=push_amt_sat, password=password) 1066 fut = asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop) 1067 try: 1068 chan, funding_tx = fut.result() 1069 except concurrent.futures.TimeoutError: 1070 raise Exception(_("open_channel timed out")) 1071 return chan, funding_tx 1072 1073 def get_channel_by_short_id(self, short_channel_id: bytes) -> Optional[Channel]: 1074 for chan in self.channels.values(): 1075 if chan.short_channel_id == short_channel_id: 1076 return chan 1077 1078 @log_exceptions 1079 async def pay_invoice( 1080 self, invoice: str, *, 1081 amount_msat: int = None, 1082 attempts: int = 1, 1083 full_path: LNPaymentPath = None) -> Tuple[bool, List[HtlcLog]]: 1084 1085 lnaddr = self._check_invoice(invoice, amount_msat=amount_msat) 1086 min_cltv_expiry = lnaddr.get_min_final_cltv_expiry() 1087 payment_hash = lnaddr.paymenthash 1088 key = payment_hash.hex() 1089 payment_secret = lnaddr.payment_secret 1090 invoice_pubkey = lnaddr.pubkey.serialize() 1091 invoice_features = lnaddr.get_features() 1092 r_tags = lnaddr.get_routing_info('r') 1093 amount_to_pay = lnaddr.get_amount_msat() 1094 status = self.get_payment_status(payment_hash) 1095 if status == PR_PAID: 1096 raise PaymentFailure(_("This invoice has been paid already")) 1097 if status == PR_INFLIGHT: 1098 raise PaymentFailure(_("A payment was already initiated for this invoice")) 1099 if payment_hash in self.get_payments(status='inflight'): 1100 raise PaymentFailure(_("A previous attempt to pay this invoice did not clear")) 1101 info = PaymentInfo(payment_hash, amount_to_pay, SENT, PR_UNPAID) 1102 self.save_payment_info(info) 1103 self.wallet.set_label(key, lnaddr.get_description()) 1104 1105 self.set_invoice_status(key, PR_INFLIGHT) 1106 try: 1107 await self.pay_to_node( 1108 node_pubkey=invoice_pubkey, 1109 payment_hash=payment_hash, 1110 payment_secret=payment_secret, 1111 amount_to_pay=amount_to_pay, 1112 min_cltv_expiry=min_cltv_expiry, 1113 r_tags=r_tags, 1114 invoice_features=invoice_features, 1115 attempts=attempts, 1116 full_path=full_path) 1117 success = True 1118 except PaymentFailure as e: 1119 self.logger.info(f'payment failure: {e!r}') 1120 success = False 1121 reason = str(e) 1122 if success: 1123 self.set_invoice_status(key, PR_PAID) 1124 util.trigger_callback('payment_succeeded', self.wallet, key) 1125 else: 1126 self.set_invoice_status(key, PR_UNPAID) 1127 util.trigger_callback('payment_failed', self.wallet, key, reason) 1128 log = self.logs[key] 1129 return success, log 1130 1131 async def pay_to_node( 1132 self, *, 1133 node_pubkey: bytes, 1134 payment_hash: bytes, 1135 payment_secret: Optional[bytes], 1136 amount_to_pay: int, # in msat 1137 min_cltv_expiry: int, 1138 r_tags, 1139 invoice_features: int, 1140 attempts: int = 1, 1141 full_path: LNPaymentPath = None, 1142 fwd_trampoline_onion=None, 1143 fwd_trampoline_fee=None, 1144 fwd_trampoline_cltv_delta=None) -> None: 1145 1146 if fwd_trampoline_onion: 1147 # todo: compare to the fee of the actual route we found 1148 if fwd_trampoline_fee < 1000: 1149 raise OnionRoutingFailure(code=OnionFailureCode.TRAMPOLINE_FEE_INSUFFICIENT, data=b'') 1150 if fwd_trampoline_cltv_delta < 576: 1151 raise OnionRoutingFailure(code=OnionFailureCode.TRAMPOLINE_EXPIRY_TOO_SOON, data=b'') 1152 1153 self.logs[payment_hash.hex()] = log = [] 1154 trampoline_fee_level = self.INITIAL_TRAMPOLINE_FEE_LEVEL 1155 use_two_trampolines = True # only used for pay to legacy 1156 1157 amount_inflight = 0 # what we sent in htlcs (that receiver gets, without fees) 1158 while True: 1159 amount_to_send = amount_to_pay - amount_inflight 1160 if amount_to_send > 0: 1161 # 1. create a set of routes for remaining amount. 1162 # note: path-finding runs in a separate thread so that we don't block the asyncio loop 1163 # graph updates might occur during the computation 1164 routes = self.create_routes_for_payment( 1165 amount_msat=amount_to_send, 1166 final_total_msat=amount_to_pay, 1167 invoice_pubkey=node_pubkey, 1168 min_cltv_expiry=min_cltv_expiry, 1169 r_tags=r_tags, 1170 invoice_features=invoice_features, 1171 full_path=full_path, 1172 payment_hash=payment_hash, 1173 payment_secret=payment_secret, 1174 trampoline_fee_level=trampoline_fee_level, 1175 use_two_trampolines=use_two_trampolines, 1176 fwd_trampoline_onion=fwd_trampoline_onion 1177 ) 1178 # 2. send htlcs 1179 async for route, amount_msat, total_msat, amount_receiver_msat, cltv_delta, bucket_payment_secret, trampoline_onion in routes: 1180 amount_inflight += amount_receiver_msat 1181 if amount_inflight > amount_to_pay: # safety belts 1182 raise Exception(f"amount_inflight={amount_inflight} > amount_to_pay={amount_to_pay}") 1183 await self.pay_to_route( 1184 route=route, 1185 amount_msat=amount_msat, 1186 total_msat=total_msat, 1187 amount_receiver_msat=amount_receiver_msat, 1188 payment_hash=payment_hash, 1189 payment_secret=bucket_payment_secret, 1190 min_cltv_expiry=cltv_delta, 1191 trampoline_onion=trampoline_onion) 1192 util.trigger_callback('invoice_status', self.wallet, payment_hash.hex()) 1193 # 3. await a queue 1194 self.logger.info(f"amount inflight {amount_inflight}") 1195 htlc_log = await self.sent_htlcs[payment_hash].get() 1196 amount_inflight -= htlc_log.amount_msat 1197 if amount_inflight < 0: 1198 raise Exception(f"amount_inflight={amount_inflight} < 0") 1199 log.append(htlc_log) 1200 if htlc_log.success: 1201 if self.network.path_finder: 1202 # TODO: report every route to liquidity hints for mpp 1203 # in the case of success, we report channels of the 1204 # route as being able to send the same amount in the future, 1205 # as we assume to not know the capacity 1206 self.network.path_finder.update_liquidity_hints(htlc_log.route, htlc_log.amount_msat) 1207 # remove inflight htlcs from liquidity hints 1208 self.network.path_finder.update_inflight_htlcs(htlc_log.route, add_htlcs=False) 1209 return 1210 # htlc failed 1211 if len(log) >= attempts: 1212 raise PaymentFailure('Giving up after %d attempts'%len(log)) 1213 # if we get a tmp channel failure, it might work to split the amount and try more routes 1214 # if we get a channel update, we might retry the same route and amount 1215 route = htlc_log.route 1216 sender_idx = htlc_log.sender_idx 1217 failure_msg = htlc_log.failure_msg 1218 code, data = failure_msg.code, failure_msg.data 1219 self.logger.info(f"UPDATE_FAIL_HTLC. code={repr(code)}. " 1220 f"decoded_data={failure_msg.decode_data()}. data={data.hex()!r}") 1221 self.logger.info(f"error reported by {bh2u(route[sender_idx].node_id)}") 1222 if code == OnionFailureCode.MPP_TIMEOUT: 1223 raise PaymentFailure(failure_msg.code_name()) 1224 # trampoline 1225 if not self.channel_db: 1226 # FIXME The trampoline nodes in the path are chosen randomly. 1227 # Some of the errors might depend on how we have chosen them. 1228 # Having more attempts is currently useful in part because of the randomness, 1229 # instead we should give feedback to create_routes_for_payment. 1230 if code in (OnionFailureCode.TRAMPOLINE_FEE_INSUFFICIENT, 1231 OnionFailureCode.TRAMPOLINE_EXPIRY_TOO_SOON): 1232 # todo: parse the node parameters here (not returned by eclair yet) 1233 trampoline_fee_level += 1 1234 continue 1235 elif use_two_trampolines: 1236 use_two_trampolines = False 1237 elif code in (OnionFailureCode.UNKNOWN_NEXT_PEER, 1238 OnionFailureCode.TEMPORARY_NODE_FAILURE): 1239 continue 1240 else: 1241 raise PaymentFailure(failure_msg.code_name()) 1242 else: 1243 self.handle_error_code_from_failed_htlc( 1244 route=route, sender_idx=sender_idx, failure_msg=failure_msg, amount=htlc_log.amount_msat) 1245 1246 async def pay_to_route( 1247 self, *, 1248 route: LNPaymentRoute, 1249 amount_msat: int, 1250 total_msat: int, 1251 amount_receiver_msat:int, 1252 payment_hash: bytes, 1253 payment_secret: Optional[bytes], 1254 min_cltv_expiry: int, 1255 trampoline_onion: bytes = None) -> None: 1256 1257 # send a single htlc 1258 short_channel_id = route[0].short_channel_id 1259 chan = self.get_channel_by_short_id(short_channel_id) 1260 peer = self._peers.get(route[0].node_id) 1261 if not peer: 1262 raise PaymentFailure('Dropped peer') 1263 await peer.initialized 1264 htlc = peer.pay( 1265 route=route, 1266 chan=chan, 1267 amount_msat=amount_msat, 1268 total_msat=total_msat, 1269 payment_hash=payment_hash, 1270 min_final_cltv_expiry=min_cltv_expiry, 1271 payment_secret=payment_secret, 1272 trampoline_onion=trampoline_onion) 1273 1274 key = (payment_hash, short_channel_id, htlc.htlc_id) 1275 self.sent_htlcs_routes[key] = route, payment_secret, amount_msat, total_msat, amount_receiver_msat 1276 # if we sent MPP to a trampoline, add item to sent_buckets 1277 if not self.channel_db and amount_msat != total_msat: 1278 if payment_secret not in self.sent_buckets: 1279 self.sent_buckets[payment_secret] = (0, 0) 1280 amount_sent, amount_failed = self.sent_buckets[payment_secret] 1281 amount_sent += amount_receiver_msat 1282 self.sent_buckets[payment_secret] = amount_sent, amount_failed 1283 if self.network.path_finder: 1284 # add inflight htlcs to liquidity hints 1285 self.network.path_finder.update_inflight_htlcs(route, add_htlcs=True) 1286 util.trigger_callback('htlc_added', chan, htlc, SENT) 1287 1288 def handle_error_code_from_failed_htlc( 1289 self, 1290 *, 1291 route: LNPaymentRoute, 1292 sender_idx: int, 1293 failure_msg: OnionRoutingFailure, 1294 amount: int) -> None: 1295 1296 assert self.channel_db # cannot be in trampoline mode 1297 assert self.network.path_finder 1298 1299 # remove inflight htlcs from liquidity hints 1300 self.network.path_finder.update_inflight_htlcs(route, add_htlcs=False) 1301 1302 code, data = failure_msg.code, failure_msg.data 1303 # TODO can we use lnmsg.OnionWireSerializer here? 1304 # TODO update onion_wire.csv 1305 # handle some specific error codes 1306 failure_codes = { 1307 OnionFailureCode.TEMPORARY_CHANNEL_FAILURE: 0, 1308 OnionFailureCode.AMOUNT_BELOW_MINIMUM: 8, 1309 OnionFailureCode.FEE_INSUFFICIENT: 8, 1310 OnionFailureCode.INCORRECT_CLTV_EXPIRY: 4, 1311 OnionFailureCode.EXPIRY_TOO_SOON: 0, 1312 OnionFailureCode.CHANNEL_DISABLED: 2, 1313 } 1314 1315 # determine a fallback channel to blacklist if we don't get the erring 1316 # channel via the payload 1317 if sender_idx is None: 1318 raise PaymentFailure(failure_msg.code_name()) 1319 try: 1320 fallback_channel = route[sender_idx + 1].short_channel_id 1321 except IndexError: 1322 raise PaymentFailure(f'payment destination reported error: {failure_msg.code_name()}') from None 1323 1324 # TODO: handle unknown next peer? 1325 # handle failure codes that include a channel update 1326 if code in failure_codes: 1327 offset = failure_codes[code] 1328 channel_update_len = int.from_bytes(data[offset:offset+2], byteorder="big") 1329 channel_update_as_received = data[offset+2: offset+2+channel_update_len] 1330 payload = self._decode_channel_update_msg(channel_update_as_received) 1331 1332 if payload is None: 1333 self.logger.info(f'could not decode channel_update for failed htlc: ' 1334 f'{channel_update_as_received.hex()}') 1335 self.network.path_finder.liquidity_hints.add_to_blacklist(fallback_channel) 1336 else: 1337 # apply the channel update or get blacklisted 1338 blacklist, update = self._handle_chanupd_from_failed_htlc( 1339 payload, route=route, sender_idx=sender_idx) 1340 1341 # we interpret a temporary channel failure as a liquidity issue 1342 # in the channel and update our liquidity hints accordingly 1343 if code == OnionFailureCode.TEMPORARY_CHANNEL_FAILURE: 1344 self.network.path_finder.update_liquidity_hints( 1345 route, 1346 amount, 1347 failing_channel=ShortChannelID(payload['short_channel_id'])) 1348 elif blacklist: 1349 self.network.path_finder.liquidity_hints.add_to_blacklist( 1350 payload['short_channel_id']) 1351 1352 # if we can't decide on some action, we are stuck 1353 if not (blacklist or update): 1354 raise PaymentFailure(failure_msg.code_name()) 1355 # for errors that do not include a channel update 1356 else: 1357 self.network.path_finder.liquidity_hints.add_to_blacklist(fallback_channel) 1358 1359 def _handle_chanupd_from_failed_htlc(self, payload, *, route, sender_idx) -> Tuple[bool, bool]: 1360 blacklist = False 1361 update = False 1362 try: 1363 r = self.channel_db.add_channel_update(payload, verify=True) 1364 except InvalidGossipMsg: 1365 return True, False # blacklist 1366 short_channel_id = ShortChannelID(payload['short_channel_id']) 1367 if r == UpdateStatus.GOOD: 1368 self.logger.info(f"applied channel update to {short_channel_id}") 1369 # TODO: add test for this 1370 # FIXME: this does not work for our own unannounced channels. 1371 for chan in self.channels.values(): 1372 if chan.short_channel_id == short_channel_id: 1373 chan.set_remote_update(payload) 1374 update = True 1375 elif r == UpdateStatus.ORPHANED: 1376 # maybe it is a private channel (and data in invoice was outdated) 1377 self.logger.info(f"Could not find {short_channel_id}. maybe update is for private channel?") 1378 start_node_id = route[sender_idx].node_id 1379 update = self.channel_db.add_channel_update_for_private_channel(payload, start_node_id) 1380 blacklist = not update 1381 elif r == UpdateStatus.EXPIRED: 1382 blacklist = True 1383 elif r == UpdateStatus.DEPRECATED: 1384 self.logger.info(f'channel update is not more recent.') 1385 blacklist = True 1386 elif r == UpdateStatus.UNCHANGED: 1387 blacklist = True 1388 return blacklist, update 1389 1390 @classmethod 1391 def _decode_channel_update_msg(cls, chan_upd_msg: bytes) -> Optional[Dict[str, Any]]: 1392 channel_update_as_received = chan_upd_msg 1393 channel_update_typed = (258).to_bytes(length=2, byteorder="big") + channel_update_as_received 1394 # note: some nodes put channel updates in error msgs with the leading msg_type already there. 1395 # we try decoding both ways here. 1396 try: 1397 message_type, payload = decode_msg(channel_update_typed) 1398 if payload['chain_hash'] != constants.net.rev_genesis_bytes(): raise Exception() 1399 payload['raw'] = channel_update_typed 1400 return payload 1401 except: # FIXME: too broad 1402 try: 1403 message_type, payload = decode_msg(channel_update_as_received) 1404 if payload['chain_hash'] != constants.net.rev_genesis_bytes(): raise Exception() 1405 payload['raw'] = channel_update_as_received 1406 return payload 1407 except: 1408 return None 1409 1410 @staticmethod 1411 def _check_invoice(invoice: str, *, amount_msat: int = None) -> LnAddr: 1412 addr = lndecode(invoice) 1413 if addr.is_expired(): 1414 raise InvoiceError(_("This invoice has expired")) 1415 if amount_msat: # replace amt in invoice. main usecase is paying zero amt invoices 1416 existing_amt_msat = addr.get_amount_msat() 1417 if existing_amt_msat and amount_msat < existing_amt_msat: 1418 raise Exception("cannot pay lower amt than what is originally in LN invoice") 1419 addr.amount = Decimal(amount_msat) / COIN / 1000 1420 if addr.amount is None: 1421 raise InvoiceError(_("Missing amount")) 1422 if addr.get_min_final_cltv_expiry() > lnutil.NBLOCK_CLTV_EXPIRY_TOO_FAR_INTO_FUTURE: 1423 raise InvoiceError("{}\n{}".format( 1424 _("Invoice wants us to risk locking funds for unreasonably long."), 1425 f"min_final_cltv_expiry: {addr.get_min_final_cltv_expiry()}")) 1426 return addr 1427 1428 def is_trampoline_peer(self, node_id: bytes) -> bool: 1429 # until trampoline is advertised in lnfeatures, check against hardcoded list 1430 if is_hardcoded_trampoline(node_id): 1431 return True 1432 peer = self._peers.get(node_id) 1433 if peer and peer.their_features.supports(LnFeatures.OPTION_TRAMPOLINE_ROUTING_OPT): 1434 return True 1435 return False 1436 1437 def suggest_peer(self) -> Optional[bytes]: 1438 if self.channel_db: 1439 return self.lnrater.suggest_peer() 1440 else: 1441 return random.choice(list(hardcoded_trampoline_nodes().values())).pubkey 1442 1443 async def create_routes_for_payment( 1444 self, *, 1445 amount_msat: int, # part of payment amount we want routes for now 1446 final_total_msat: int, # total payment amount final receiver will get 1447 invoice_pubkey, 1448 min_cltv_expiry, 1449 r_tags, 1450 invoice_features: int, 1451 payment_hash, 1452 payment_secret, 1453 trampoline_fee_level: int, 1454 use_two_trampolines: bool, 1455 fwd_trampoline_onion = None, 1456 full_path: LNPaymentPath = None) -> AsyncGenerator[Tuple[LNPaymentRoute, int], None]: 1457 1458 """Creates multiple routes for splitting a payment over the available 1459 private channels. 1460 1461 We first try to conduct the payment over a single channel. If that fails 1462 and mpp is supported by the receiver, we will split the payment.""" 1463 # It could happen that the pathfinding uses a channel 1464 # in the graph multiple times, meaning we could exhaust 1465 # its capacity. This could be dealt with by temporarily 1466 # iteratively blacklisting channels for this mpp attempt. 1467 invoice_features = LnFeatures(invoice_features) 1468 trampoline_features = LnFeatures.VAR_ONION_OPT 1469 local_height = self.network.get_local_height() 1470 active_channels = [chan for chan in self.channels.values() if chan.is_active() and not chan.is_frozen_for_sending()] 1471 try: 1472 # try to send over a single channel 1473 if not self.channel_db: 1474 for chan in active_channels: 1475 if not self.is_trampoline_peer(chan.node_id): 1476 continue 1477 if chan.node_id == invoice_pubkey: 1478 trampoline_onion = None 1479 trampoline_payment_secret = payment_secret 1480 trampoline_total_msat = final_total_msat 1481 amount_with_fees = amount_msat 1482 cltv_delta = min_cltv_expiry 1483 else: 1484 trampoline_onion, amount_with_fees, cltv_delta = create_trampoline_route_and_onion( 1485 amount_msat=amount_msat, 1486 total_msat=final_total_msat, 1487 min_cltv_expiry=min_cltv_expiry, 1488 my_pubkey=self.node_keypair.pubkey, 1489 invoice_pubkey=invoice_pubkey, 1490 invoice_features=invoice_features, 1491 node_id=chan.node_id, 1492 r_tags=r_tags, 1493 payment_hash=payment_hash, 1494 payment_secret=payment_secret, 1495 local_height=local_height, 1496 trampoline_fee_level=trampoline_fee_level, 1497 use_two_trampolines=use_two_trampolines) 1498 trampoline_payment_secret = os.urandom(32) 1499 trampoline_total_msat = amount_with_fees 1500 if chan.available_to_spend(LOCAL, strict=True) < amount_with_fees: 1501 continue 1502 route = [ 1503 RouteEdge( 1504 start_node=self.node_keypair.pubkey, 1505 end_node=chan.node_id, 1506 short_channel_id=chan.short_channel_id, 1507 fee_base_msat=0, 1508 fee_proportional_millionths=0, 1509 cltv_expiry_delta=0, 1510 node_features=trampoline_features) 1511 ] 1512 yield route, amount_with_fees, trampoline_total_msat, amount_msat, cltv_delta, trampoline_payment_secret, trampoline_onion 1513 break 1514 else: 1515 raise NoPathFound() 1516 else: 1517 route = await run_in_thread( 1518 partial( 1519 self.create_route_for_payment, 1520 amount_msat=amount_msat, 1521 invoice_pubkey=invoice_pubkey, 1522 min_cltv_expiry=min_cltv_expiry, 1523 r_tags=r_tags, 1524 invoice_features=invoice_features, 1525 channels=active_channels, 1526 full_path=full_path 1527 ) 1528 ) 1529 yield route, amount_msat, final_total_msat, amount_msat, min_cltv_expiry, payment_secret, fwd_trampoline_onion 1530 except NoPathFound: 1531 if not invoice_features.supports(LnFeatures.BASIC_MPP_OPT): 1532 raise 1533 channels_with_funds = { 1534 (chan.channel_id, chan.node_id): int(chan.available_to_spend(HTLCOwner.LOCAL)) 1535 for chan in active_channels} 1536 self.logger.info(f"channels_with_funds: {channels_with_funds}") 1537 # for trampoline mpp payments we have to restrict ourselves to pay 1538 # to a single node due to some incompatibility in Eclair, see: 1539 # https://github.com/ACINQ/eclair/issues/1723 1540 use_singe_node = not self.channel_db and constants.net is constants.BitcoinMainnet 1541 split_configurations = suggest_splits(amount_msat, channels_with_funds, single_node=use_singe_node) 1542 self.logger.info(f'suggest_split {amount_msat} returned {len(split_configurations)} configurations') 1543 1544 for s in split_configurations: 1545 self.logger.info(f"trying split configuration: {s[0].values()} rating: {s[1]}") 1546 try: 1547 if not self.channel_db: 1548 buckets = defaultdict(list) 1549 for (chan_id, _), part_amount_msat in s[0].items(): 1550 chan = self.channels[chan_id] 1551 if part_amount_msat: 1552 buckets[chan.node_id].append((chan_id, part_amount_msat)) 1553 for node_id, bucket in buckets.items(): 1554 bucket_amount_msat = sum([x[1] for x in bucket]) 1555 trampoline_onion, bucket_amount_with_fees, bucket_cltv_delta = create_trampoline_route_and_onion( 1556 amount_msat=bucket_amount_msat, 1557 total_msat=final_total_msat, 1558 min_cltv_expiry=min_cltv_expiry, 1559 my_pubkey=self.node_keypair.pubkey, 1560 invoice_pubkey=invoice_pubkey, 1561 invoice_features=invoice_features, 1562 node_id=node_id, 1563 r_tags=r_tags, 1564 payment_hash=payment_hash, 1565 payment_secret=payment_secret, 1566 local_height=local_height, 1567 trampoline_fee_level=trampoline_fee_level, 1568 use_two_trampolines=use_two_trampolines) 1569 # node_features is only used to determine is_tlv 1570 bucket_payment_secret = os.urandom(32) 1571 bucket_fees = bucket_amount_with_fees - bucket_amount_msat 1572 self.logger.info(f'bucket_fees {bucket_fees}') 1573 for chan_id, part_amount_msat in bucket: 1574 chan = self.channels[chan_id] 1575 margin = chan.available_to_spend(LOCAL, strict=True) - part_amount_msat 1576 delta_fee = min(bucket_fees, margin) 1577 part_amount_msat_with_fees = part_amount_msat + delta_fee 1578 bucket_fees -= delta_fee 1579 route = [ 1580 RouteEdge( 1581 start_node=self.node_keypair.pubkey, 1582 end_node=node_id, 1583 short_channel_id=chan.short_channel_id, 1584 fee_base_msat=0, 1585 fee_proportional_millionths=0, 1586 cltv_expiry_delta=0, 1587 node_features=trampoline_features) 1588 ] 1589 self.logger.info(f'adding route {part_amount_msat} {delta_fee} {margin}') 1590 yield route, part_amount_msat_with_fees, bucket_amount_with_fees, part_amount_msat, bucket_cltv_delta, bucket_payment_secret, trampoline_onion 1591 if bucket_fees != 0: 1592 self.logger.info('not enough margin to pay trampoline fee') 1593 raise NoPathFound() 1594 else: 1595 for (chan_id, _), part_amount_msat in s[0].items(): 1596 if part_amount_msat: 1597 channel = self.channels[chan_id] 1598 route = await run_in_thread( 1599 partial( 1600 self.create_route_for_payment, 1601 amount_msat=part_amount_msat, 1602 invoice_pubkey=invoice_pubkey, 1603 min_cltv_expiry=min_cltv_expiry, 1604 r_tags=r_tags, 1605 invoice_features=invoice_features, 1606 channels=[channel], 1607 full_path=None 1608 ) 1609 ) 1610 yield route, part_amount_msat, final_total_msat, part_amount_msat, min_cltv_expiry, payment_secret, fwd_trampoline_onion 1611 self.logger.info(f"found acceptable split configuration: {list(s[0].values())} rating: {s[1]}") 1612 break 1613 except NoPathFound: 1614 continue 1615 else: 1616 raise NoPathFound() 1617 1618 @profiler 1619 def create_route_for_payment( 1620 self, *, 1621 amount_msat: int, 1622 invoice_pubkey: bytes, 1623 min_cltv_expiry: int, 1624 r_tags, 1625 invoice_features: int, 1626 channels: List[Channel], 1627 full_path: Optional[LNPaymentPath]) -> LNPaymentRoute: 1628 1629 scid_to_my_channels = { 1630 chan.short_channel_id: chan for chan in channels 1631 if chan.short_channel_id is not None 1632 } 1633 # Collect all private edges from route hints. 1634 # Note: if some route hints are multiple edges long, and these paths cross each other, 1635 # we allow our path finding to cross the paths; i.e. the route hints are not isolated. 1636 private_route_edges = {} # type: Dict[ShortChannelID, RouteEdge] 1637 for private_path in r_tags: 1638 # we need to shift the node pubkey by one towards the destination: 1639 private_path_nodes = [edge[0] for edge in private_path][1:] + [invoice_pubkey] 1640 private_path_rest = [edge[1:] for edge in private_path] 1641 start_node = private_path[0][0] 1642 for end_node, edge_rest in zip(private_path_nodes, private_path_rest): 1643 short_channel_id, fee_base_msat, fee_proportional_millionths, cltv_expiry_delta = edge_rest 1644 short_channel_id = ShortChannelID(short_channel_id) 1645 # if we have a routing policy for this edge in the db, that takes precedence, 1646 # as it is likely from a previous failure 1647 channel_policy = self.channel_db.get_policy_for_node( 1648 short_channel_id=short_channel_id, 1649 node_id=start_node, 1650 my_channels=scid_to_my_channels) 1651 if channel_policy: 1652 fee_base_msat = channel_policy.fee_base_msat 1653 fee_proportional_millionths = channel_policy.fee_proportional_millionths 1654 cltv_expiry_delta = channel_policy.cltv_expiry_delta 1655 node_info = self.channel_db.get_node_info_for_node_id(node_id=end_node) 1656 route_edge = RouteEdge( 1657 start_node=start_node, 1658 end_node=end_node, 1659 short_channel_id=short_channel_id, 1660 fee_base_msat=fee_base_msat, 1661 fee_proportional_millionths=fee_proportional_millionths, 1662 cltv_expiry_delta=cltv_expiry_delta, 1663 node_features=node_info.features if node_info else 0) 1664 private_route_edges[route_edge.short_channel_id] = route_edge 1665 start_node = end_node 1666 # now find a route, end to end: between us and the recipient 1667 try: 1668 route = self.network.path_finder.find_route( 1669 nodeA=self.node_keypair.pubkey, 1670 nodeB=invoice_pubkey, 1671 invoice_amount_msat=amount_msat, 1672 path=full_path, 1673 my_channels=scid_to_my_channels, 1674 private_route_edges=private_route_edges) 1675 except NoChannelPolicy as e: 1676 raise NoPathFound() from e 1677 if not route: 1678 raise NoPathFound() 1679 # test sanity 1680 if not is_route_sane_to_use(route, amount_msat, min_cltv_expiry): 1681 self.logger.info(f"rejecting insane route {route}") 1682 raise NoPathFound() 1683 assert len(route) > 0 1684 if route[-1].end_node != invoice_pubkey: 1685 raise LNPathInconsistent("last node_id != invoice pubkey") 1686 # add features from invoice 1687 route[-1].node_features |= invoice_features 1688 return route 1689 1690 def add_request(self, amount_sat, message, expiry) -> str: 1691 coro = self._add_request_coro(amount_sat, message, expiry) 1692 fut = asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop) 1693 try: 1694 return fut.result(timeout=5) 1695 except concurrent.futures.TimeoutError: 1696 raise Exception(_("add invoice timed out")) 1697 1698 @log_exceptions 1699 async def create_invoice( 1700 self, *, 1701 amount_msat: Optional[int], 1702 message: str, 1703 expiry: int, 1704 write_to_disk: bool = True, 1705 ) -> Tuple[LnAddr, str]: 1706 1707 timestamp = int(time.time()) 1708 routing_hints = await self._calc_routing_hints_for_invoice(amount_msat) 1709 if not routing_hints: 1710 self.logger.info( 1711 "Warning. No routing hints added to invoice. " 1712 "Other clients will likely not be able to send to us.") 1713 # if not all hints are trampoline, do not create trampoline invoice 1714 invoice_features = self.features.for_invoice() 1715 trampoline_hints = [] 1716 for r in routing_hints: 1717 node_id, short_channel_id, fee_base_msat, fee_proportional_millionths, cltv_expiry_delta = r[1][0] 1718 if len(r[1])== 1 and self.is_trampoline_peer(node_id): 1719 trampoline_hints.append(('t', (node_id, fee_base_msat, fee_proportional_millionths, cltv_expiry_delta))) 1720 payment_preimage = os.urandom(32) 1721 payment_hash = sha256(payment_preimage) 1722 info = PaymentInfo(payment_hash, amount_msat, RECEIVED, PR_UNPAID) 1723 amount_btc = amount_msat/Decimal(COIN*1000) if amount_msat else None 1724 if expiry == 0: 1725 expiry = LN_EXPIRY_NEVER 1726 lnaddr = LnAddr( 1727 paymenthash=payment_hash, 1728 amount=amount_btc, 1729 tags=[ 1730 ('d', message), 1731 ('c', MIN_FINAL_CLTV_EXPIRY_FOR_INVOICE), 1732 ('x', expiry), 1733 ('9', invoice_features)] 1734 + routing_hints 1735 + trampoline_hints, 1736 date=timestamp, 1737 payment_secret=derive_payment_secret_from_payment_preimage(payment_preimage)) 1738 invoice = lnencode(lnaddr, self.node_keypair.privkey) 1739 self.save_preimage(payment_hash, payment_preimage, write_to_disk=False) 1740 self.save_payment_info(info, write_to_disk=False) 1741 if write_to_disk: 1742 self.wallet.save_db() 1743 return lnaddr, invoice 1744 1745 async def _add_request_coro(self, amount_sat: Optional[int], message, expiry: int) -> str: 1746 amount_msat = amount_sat * 1000 if amount_sat is not None else None 1747 lnaddr, invoice = await self.create_invoice( 1748 amount_msat=amount_msat, 1749 message=message, 1750 expiry=expiry, 1751 write_to_disk=False, 1752 ) 1753 key = bh2u(lnaddr.paymenthash) 1754 req = LNInvoice.from_bech32(invoice) 1755 self.wallet.add_payment_request(req, write_to_disk=False) 1756 self.wallet.set_label(key, message) 1757 self.wallet.save_db() 1758 return key 1759 1760 def save_preimage(self, payment_hash: bytes, preimage: bytes, *, write_to_disk: bool = True): 1761 assert sha256(preimage) == payment_hash 1762 self.preimages[bh2u(payment_hash)] = bh2u(preimage) 1763 if write_to_disk: 1764 self.wallet.save_db() 1765 1766 def get_preimage(self, payment_hash: bytes) -> Optional[bytes]: 1767 r = self.preimages.get(bh2u(payment_hash)) 1768 return bfh(r) if r else None 1769 1770 def get_payment_info(self, payment_hash: bytes) -> Optional[PaymentInfo]: 1771 """returns None if payment_hash is a payment we are forwarding""" 1772 key = payment_hash.hex() 1773 with self.lock: 1774 if key in self.payments: 1775 amount_msat, direction, status = self.payments[key] 1776 return PaymentInfo(payment_hash, amount_msat, direction, status) 1777 1778 def save_payment_info(self, info: PaymentInfo, *, write_to_disk: bool = True) -> None: 1779 key = info.payment_hash.hex() 1780 assert info.status in SAVED_PR_STATUS 1781 with self.lock: 1782 self.payments[key] = info.amount_msat, info.direction, info.status 1783 if write_to_disk: 1784 self.wallet.save_db() 1785 1786 def check_received_mpp_htlc(self, payment_secret, short_channel_id, htlc: UpdateAddHtlc, expected_msat: int) -> Optional[bool]: 1787 """ return MPP status: True (accepted), False (expired) or None """ 1788 payment_hash = htlc.payment_hash 1789 is_expired, is_accepted, htlc_set = self.received_mpp_htlcs.get(payment_secret, (False, False, set())) 1790 if self.get_payment_status(payment_hash) == PR_PAID: 1791 # payment_status is persisted 1792 is_accepted = True 1793 is_expired = False 1794 key = (short_channel_id, htlc) 1795 if key not in htlc_set: 1796 htlc_set.add(key) 1797 if not is_accepted and not is_expired: 1798 total = sum([_htlc.amount_msat for scid, _htlc in htlc_set]) 1799 first_timestamp = min([_htlc.timestamp for scid, _htlc in htlc_set]) 1800 if self.stopping_soon: 1801 is_expired = True # try to time out pending HTLCs before shutting down 1802 elif time.time() - first_timestamp > self.MPP_EXPIRY: 1803 is_expired = True 1804 elif total == expected_msat: 1805 is_accepted = True 1806 if is_accepted or is_expired: 1807 htlc_set.remove(key) 1808 if len(htlc_set) > 0: 1809 self.received_mpp_htlcs[payment_secret] = is_expired, is_accepted, htlc_set 1810 elif payment_secret in self.received_mpp_htlcs: 1811 self.received_mpp_htlcs.pop(payment_secret) 1812 return True if is_accepted else (False if is_expired else None) 1813 1814 def get_payment_status(self, payment_hash: bytes) -> int: 1815 info = self.get_payment_info(payment_hash) 1816 return info.status if info else PR_UNPAID 1817 1818 def get_invoice_status(self, invoice: LNInvoice) -> int: 1819 key = invoice.rhash 1820 log = self.logs[key] 1821 if key in self.inflight_payments: 1822 return PR_INFLIGHT 1823 # status may be PR_FAILED 1824 status = self.get_payment_status(bfh(key)) 1825 if status == PR_UNPAID and log: 1826 status = PR_FAILED 1827 return status 1828 1829 def set_invoice_status(self, key: str, status: int) -> None: 1830 if status == PR_INFLIGHT: 1831 self.inflight_payments.add(key) 1832 elif key in self.inflight_payments: 1833 self.inflight_payments.remove(key) 1834 if status in SAVED_PR_STATUS: 1835 self.set_payment_status(bfh(key), status) 1836 util.trigger_callback('invoice_status', self.wallet, key) 1837 1838 def set_request_status(self, payment_hash: bytes, status: int) -> None: 1839 if self.get_payment_status(payment_hash) != status: 1840 self.set_payment_status(payment_hash, status) 1841 util.trigger_callback('request_status', self.wallet, payment_hash.hex(), status) 1842 1843 def set_payment_status(self, payment_hash: bytes, status: int) -> None: 1844 info = self.get_payment_info(payment_hash) 1845 if info is None: 1846 # if we are forwarding 1847 return 1848 info = info._replace(status=status) 1849 self.save_payment_info(info) 1850 1851 def htlc_fulfilled(self, chan, payment_hash: bytes, htlc_id:int): 1852 util.trigger_callback('htlc_fulfilled', payment_hash, chan.channel_id) 1853 q = self.sent_htlcs.get(payment_hash) 1854 if q: 1855 route, payment_secret, amount_msat, bucket_msat, amount_receiver_msat = self.sent_htlcs_routes[(payment_hash, chan.short_channel_id, htlc_id)] 1856 htlc_log = HtlcLog( 1857 success=True, 1858 route=route, 1859 amount_msat=amount_receiver_msat) 1860 q.put_nowait(htlc_log) 1861 else: 1862 key = payment_hash.hex() 1863 self.set_invoice_status(key, PR_PAID) 1864 util.trigger_callback('payment_succeeded', self.wallet, key) 1865 1866 def htlc_failed( 1867 self, 1868 chan: Channel, 1869 payment_hash: bytes, 1870 htlc_id: int, 1871 error_bytes: Optional[bytes], 1872 failure_message: Optional['OnionRoutingFailure']): 1873 1874 util.trigger_callback('htlc_failed', payment_hash, chan.channel_id) 1875 q = self.sent_htlcs.get(payment_hash) 1876 if q: 1877 # detect if it is part of a bucket 1878 # if yes, wait until the bucket completely failed 1879 key = (payment_hash, chan.short_channel_id, htlc_id) 1880 route, payment_secret, amount_msat, bucket_msat, amount_receiver_msat = self.sent_htlcs_routes[key] 1881 if error_bytes: 1882 # TODO "decode_onion_error" might raise, catch and maybe blacklist/penalise someone? 1883 try: 1884 failure_message, sender_idx = chan.decode_onion_error(error_bytes, route, htlc_id) 1885 except Exception as e: 1886 sender_idx = None 1887 failure_message = OnionRoutingFailure(-1, str(e)) 1888 else: 1889 # probably got "update_fail_malformed_htlc". well... who to penalise now? 1890 assert failure_message is not None 1891 sender_idx = None 1892 self.logger.info(f"htlc_failed {failure_message}") 1893 1894 # check sent_buckets if we use trampoline 1895 if not self.channel_db and payment_secret in self.sent_buckets: 1896 amount_sent, amount_failed = self.sent_buckets[payment_secret] 1897 amount_failed += amount_receiver_msat 1898 self.sent_buckets[payment_secret] = amount_sent, amount_failed 1899 if amount_sent != amount_failed: 1900 self.logger.info('bucket still active...') 1901 return 1902 self.logger.info('bucket failed') 1903 amount_receiver_msat = amount_sent 1904 1905 htlc_log = HtlcLog( 1906 success=False, 1907 route=route, 1908 amount_msat=amount_receiver_msat, 1909 error_bytes=error_bytes, 1910 failure_msg=failure_message, 1911 sender_idx=sender_idx) 1912 q.put_nowait(htlc_log) 1913 else: 1914 self.logger.info(f"received unknown htlc_failed, probably from previous session") 1915 key = payment_hash.hex() 1916 self.set_invoice_status(key, PR_UNPAID) 1917 util.trigger_callback('payment_failed', self.wallet, key, '') 1918 1919 async def _calc_routing_hints_for_invoice(self, amount_msat: Optional[int]): 1920 """calculate routing hints (BOLT-11 'r' field)""" 1921 routing_hints = [] 1922 channels = list(self.channels.values()) 1923 # do minimal filtering of channels. 1924 # we include channels that cannot *right now* receive (e.g. peer disconnected or balance insufficient) 1925 channels = [chan for chan in channels 1926 if (chan.is_open() and not chan.is_frozen_for_receiving())] 1927 # Filter out channels that have very low receive capacity compared to invoice amt. 1928 # Even with MPP, below a certain threshold, including these channels probably 1929 # hurts more than help, as they lead to many failed attempts for the sender. 1930 channels = [chan for chan in channels 1931 if chan.available_to_spend(REMOTE) > (amount_msat or 0) * 0.05] 1932 # cap max channels to include to keep QR code reasonably scannable 1933 channels = sorted(channels, key=lambda chan: (not chan.is_active(), -chan.available_to_spend(REMOTE))) 1934 channels = channels[:15] 1935 random.shuffle(channels) # let's not leak channel order 1936 scid_to_my_channels = {chan.short_channel_id: chan for chan in channels 1937 if chan.short_channel_id is not None} 1938 for chan in channels: 1939 chan_id = chan.short_channel_id 1940 assert isinstance(chan_id, bytes), chan_id 1941 channel_info = get_mychannel_info(chan_id, scid_to_my_channels) 1942 # note: as a fallback, if we don't have a channel update for the 1943 # incoming direction of our private channel, we fill the invoice with garbage. 1944 # the sender should still be able to pay us, but will incur an extra round trip 1945 # (they will get the channel update from the onion error) 1946 # at least, that's the theory. https://github.com/lightningnetwork/lnd/issues/2066 1947 fee_base_msat = fee_proportional_millionths = 0 1948 cltv_expiry_delta = 1 # lnd won't even try with zero 1949 missing_info = True 1950 if channel_info: 1951 policy = get_mychannel_policy(channel_info.short_channel_id, chan.node_id, scid_to_my_channels) 1952 if policy: 1953 fee_base_msat = policy.fee_base_msat 1954 fee_proportional_millionths = policy.fee_proportional_millionths 1955 cltv_expiry_delta = policy.cltv_expiry_delta 1956 missing_info = False 1957 if missing_info: 1958 self.logger.info( 1959 f"Warning. Missing channel update for our channel {chan_id}; " 1960 f"filling invoice with incorrect data.") 1961 routing_hints.append(('r', [( 1962 chan.node_id, 1963 chan_id, 1964 fee_base_msat, 1965 fee_proportional_millionths, 1966 cltv_expiry_delta)])) 1967 return routing_hints 1968 1969 def delete_payment(self, payment_hash_hex: str): 1970 try: 1971 with self.lock: 1972 del self.payments[payment_hash_hex] 1973 except KeyError: 1974 return 1975 self.wallet.save_db() 1976 1977 def get_balance(self): 1978 with self.lock: 1979 return Decimal(sum( 1980 chan.balance(LOCAL) if not chan.is_closed() else 0 1981 for chan in self.channels.values())) / 1000 1982 1983 def num_sats_can_send(self) -> Decimal: 1984 can_send = 0 1985 with self.lock: 1986 if self.channels: 1987 for c in self.channels.values(): 1988 if c.is_active() and not c.is_frozen_for_sending(): 1989 can_send += c.available_to_spend(LOCAL) 1990 # Here we have to guess a fee, because some callers (submarine swaps) 1991 # use this method to initiate a payment, which would otherwise fail. 1992 fee_base_msat = TRAMPOLINE_FEES[3]['fee_base_msat'] 1993 fee_proportional_millionths = TRAMPOLINE_FEES[3]['fee_proportional_millionths'] 1994 # inverse of fee_for_edge_msat 1995 can_send_minus_fees = (can_send - fee_base_msat) * 1_000_000 // ( 1_000_000 + fee_proportional_millionths) 1996 can_send_minus_fees = max(0, can_send_minus_fees) 1997 return Decimal(can_send_minus_fees) / 1000 1998 1999 def num_sats_can_receive(self) -> Decimal: 2000 with self.lock: 2001 channels = [ 2002 c for c in self.channels.values() 2003 if c.is_active() and not c.is_frozen_for_receiving() 2004 ] 2005 can_receive = sum([c.available_to_spend(REMOTE) for c in channels]) if channels else 0 2006 return Decimal(can_receive) / 1000 2007 2008 def num_sats_can_receive_no_mpp(self) -> Decimal: 2009 with self.lock: 2010 channels = [ 2011 c for c in self.channels.values() 2012 if c.is_active() and not c.is_frozen_for_receiving() 2013 ] 2014 can_receive = max([c.available_to_spend(REMOTE) for c in channels]) if channels else 0 2015 return Decimal(can_receive) / 1000 2016 2017 def can_pay_invoice(self, invoice: LNInvoice) -> bool: 2018 return invoice.get_amount_sat() <= self.num_sats_can_send() 2019 2020 def can_receive_invoice(self, invoice: LNInvoice) -> bool: 2021 return invoice.get_amount_sat() <= self.num_sats_can_receive() 2022 2023 async def close_channel(self, chan_id): 2024 chan = self._channels[chan_id] 2025 peer = self._peers[chan.node_id] 2026 return await peer.close_channel(chan_id) 2027 2028 async def force_close_channel(self, chan_id): 2029 # returns txid or raises 2030 chan = self._channels[chan_id] 2031 tx = chan.force_close_tx() 2032 await self.network.broadcast_transaction(tx) 2033 chan.set_state(ChannelState.FORCE_CLOSING) 2034 return tx.txid() 2035 2036 async def try_force_closing(self, chan_id): 2037 # fails silently but sets the state, so that we will retry later 2038 chan = self._channels[chan_id] 2039 tx = chan.force_close_tx() 2040 chan.set_state(ChannelState.FORCE_CLOSING) 2041 await self.network.try_broadcasting(tx, 'force-close') 2042 2043 def remove_channel(self, chan_id): 2044 chan = self.channels[chan_id] 2045 assert chan.can_be_deleted() 2046 with self.lock: 2047 self._channels.pop(chan_id) 2048 self.db.get('channels').pop(chan_id.hex()) 2049 for addr in chan.get_wallet_addresses_channel_might_want_reserved(): 2050 self.wallet.set_reserved_state_of_address(addr, reserved=False) 2051 2052 util.trigger_callback('channels_updated', self.wallet) 2053 util.trigger_callback('wallet_updated', self.wallet) 2054 2055 @ignore_exceptions 2056 @log_exceptions 2057 async def reestablish_peer_for_given_channel(self, chan: Channel) -> None: 2058 now = time.time() 2059 peer_addresses = [] 2060 if not self.channel_db: 2061 addr = trampolines_by_id().get(chan.node_id) 2062 if addr: 2063 peer_addresses.append(addr) 2064 else: 2065 # will try last good address first, from gossip 2066 last_good_addr = self.channel_db.get_last_good_address(chan.node_id) 2067 if last_good_addr: 2068 peer_addresses.append(last_good_addr) 2069 # will try addresses for node_id from gossip 2070 addrs_from_gossip = self.channel_db.get_node_addresses(chan.node_id) or [] 2071 for host, port, ts in addrs_from_gossip: 2072 peer_addresses.append(LNPeerAddr(host, port, chan.node_id)) 2073 # will try addresses stored in channel storage 2074 peer_addresses += list(chan.get_peer_addresses()) 2075 # Done gathering addresses. 2076 # Now select first one that has not failed recently. 2077 for peer in peer_addresses: 2078 if self._can_retry_addr(peer, urgent=True, now=now): 2079 await self._add_peer(peer.host, peer.port, peer.pubkey) 2080 return 2081 2082 async def reestablish_peers_and_channels(self): 2083 while True: 2084 await asyncio.sleep(1) 2085 if self.stopping_soon: 2086 return 2087 for chan in self.channels.values(): 2088 if chan.is_closed(): 2089 continue 2090 # reestablish 2091 if not chan.should_try_to_reestablish_peer(): 2092 continue 2093 peer = self._peers.get(chan.node_id, None) 2094 if peer: 2095 await peer.taskgroup.spawn(peer.reestablish_channel(chan)) 2096 else: 2097 await self.taskgroup.spawn(self.reestablish_peer_for_given_channel(chan)) 2098 2099 def current_feerate_per_kw(self): 2100 from .simple_config import FEE_LN_ETA_TARGET, FEERATE_FALLBACK_STATIC_FEE, FEERATE_REGTEST_HARDCODED 2101 if constants.net is constants.BitcoinRegtest: 2102 return FEERATE_REGTEST_HARDCODED // 4 2103 feerate_per_kvbyte = self.network.config.eta_target_to_fee(FEE_LN_ETA_TARGET) 2104 if feerate_per_kvbyte is None: 2105 feerate_per_kvbyte = FEERATE_FALLBACK_STATIC_FEE 2106 return max(253, feerate_per_kvbyte // 4) 2107 2108 def create_channel_backup(self, channel_id): 2109 chan = self._channels[channel_id] 2110 # do not backup old-style channels 2111 assert chan.is_static_remotekey_enabled() 2112 peer_addresses = list(chan.get_peer_addresses()) 2113 peer_addr = peer_addresses[0] 2114 return ImportedChannelBackupStorage( 2115 node_id = chan.node_id, 2116 privkey = self.node_keypair.privkey, 2117 funding_txid = chan.funding_outpoint.txid, 2118 funding_index = chan.funding_outpoint.output_index, 2119 funding_address = chan.get_funding_address(), 2120 host = peer_addr.host, 2121 port = peer_addr.port, 2122 is_initiator = chan.constraints.is_initiator, 2123 channel_seed = chan.config[LOCAL].channel_seed, 2124 local_delay = chan.config[LOCAL].to_self_delay, 2125 remote_delay = chan.config[REMOTE].to_self_delay, 2126 remote_revocation_pubkey = chan.config[REMOTE].revocation_basepoint.pubkey, 2127 remote_payment_pubkey = chan.config[REMOTE].payment_basepoint.pubkey) 2128 2129 def export_channel_backup(self, channel_id): 2130 xpub = self.wallet.get_fingerprint() 2131 backup_bytes = self.create_channel_backup(channel_id).to_bytes() 2132 assert backup_bytes == ImportedChannelBackupStorage.from_bytes(backup_bytes).to_bytes(), "roundtrip failed" 2133 encrypted = pw_encode_with_version_and_mac(backup_bytes, xpub) 2134 assert backup_bytes == pw_decode_with_version_and_mac(encrypted, xpub), "encrypt failed" 2135 return 'channel_backup:' + encrypted 2136 2137 async def request_force_close(self, channel_id: bytes, *, connect_str=None) -> None: 2138 if channel_id in self.channels: 2139 chan = self.channels[channel_id] 2140 peer = self._peers.get(chan.node_id) 2141 if not peer: 2142 raise Exception('Peer not found') 2143 chan.should_request_force_close = True 2144 peer.close_and_cleanup() 2145 elif connect_str: 2146 peer = await self.add_peer(connect_str) 2147 await peer.trigger_force_close(channel_id) 2148 elif channel_id in self.channel_backups: 2149 await self._request_force_close_from_backup(channel_id) 2150 else: 2151 raise Exception(f'Unknown channel {channel_id.hex()}') 2152 2153 def import_channel_backup(self, data): 2154 assert data.startswith('channel_backup:') 2155 encrypted = data[15:] 2156 xpub = self.wallet.get_fingerprint() 2157 decrypted = pw_decode_with_version_and_mac(encrypted, xpub) 2158 cb_storage = ImportedChannelBackupStorage.from_bytes(decrypted) 2159 channel_id = cb_storage.channel_id() 2160 if channel_id.hex() in self.db.get_dict("channels"): 2161 raise Exception('Channel already in wallet') 2162 self.logger.info(f'importing channel backup: {channel_id.hex()}') 2163 d = self.db.get_dict("imported_channel_backups") 2164 d[channel_id.hex()] = cb_storage 2165 with self.lock: 2166 cb = ChannelBackup(cb_storage, sweep_address=self.sweep_address, lnworker=self) 2167 self._channel_backups[channel_id] = cb 2168 self.wallet.save_db() 2169 util.trigger_callback('channels_updated', self.wallet) 2170 self.lnwatcher.add_channel(cb.funding_outpoint.to_str(), cb.get_funding_address()) 2171 2172 def has_conflicting_backup_with(self, remote_node_id: bytes): 2173 """ Returns whether we have an active channel with this node on another device, using same local node id. """ 2174 channel_backup_peers = [ 2175 cb.node_id for cb in self.channel_backups.values() 2176 if (not cb.is_closed() and cb.get_local_pubkey() == self.node_keypair.pubkey)] 2177 return any(remote_node_id.startswith(cb_peer_nodeid) for cb_peer_nodeid in channel_backup_peers) 2178 2179 def remove_channel_backup(self, channel_id): 2180 chan = self.channel_backups[channel_id] 2181 assert chan.can_be_deleted() 2182 onchain_backups = self.db.get_dict("onchain_channel_backups") 2183 imported_backups = self.db.get_dict("imported_channel_backups") 2184 if channel_id.hex() in onchain_backups: 2185 onchain_backups.pop(channel_id.hex()) 2186 elif channel_id.hex() in imported_backups: 2187 imported_backups.pop(channel_id.hex()) 2188 else: 2189 raise Exception('Channel not found') 2190 with self.lock: 2191 self._channel_backups.pop(channel_id) 2192 self.wallet.save_db() 2193 util.trigger_callback('channels_updated', self.wallet) 2194 2195 @log_exceptions 2196 async def _request_force_close_from_backup(self, channel_id: bytes): 2197 cb = self.channel_backups.get(channel_id) 2198 if not cb: 2199 raise Exception(f'channel backup not found {self.channel_backups}') 2200 cb = cb.cb # storage 2201 self.logger.info(f'requesting channel force close: {channel_id.hex()}') 2202 if isinstance(cb, ImportedChannelBackupStorage): 2203 node_id = cb.node_id 2204 privkey = cb.privkey 2205 addresses = [(cb.host, cb.port, 0)] 2206 # TODO also try network addresses from gossip db (as it might have changed) 2207 else: 2208 assert isinstance(cb, OnchainChannelBackupStorage) 2209 if not self.channel_db: 2210 raise Exception('Enable gossip first') 2211 node_id = self.network.channel_db.get_node_by_prefix(cb.node_id_prefix) 2212 privkey = self.node_keypair.privkey 2213 addresses = self.network.channel_db.get_node_addresses(node_id) 2214 if not addresses: 2215 raise Exception('Peer not found in gossip database') 2216 for host, port, timestamp in addresses: 2217 peer_addr = LNPeerAddr(host, port, node_id) 2218 transport = LNTransport(privkey, peer_addr, proxy=self.network.proxy) 2219 peer = Peer(self, node_id, transport, is_channel_backup=True) 2220 try: 2221 async with TaskGroup(wait=any) as group: 2222 await group.spawn(peer._message_loop()) 2223 await group.spawn(peer.trigger_force_close(channel_id)) 2224 return 2225 except Exception as e: 2226 self.logger.info(f'failed to connect {host} {e}') 2227 continue 2228 # TODO close/cleanup the transport 2229 else: 2230 raise Exception('failed to connect') 2231 2232 def maybe_add_backup_from_tx(self, tx): 2233 funding_address = None 2234 node_id_prefix = None 2235 for i, o in enumerate(tx.outputs()): 2236 script_type = get_script_type_from_output_script(o.scriptpubkey) 2237 if script_type == 'p2wsh': 2238 funding_index = i 2239 funding_address = o.address 2240 for o2 in tx.outputs(): 2241 if o2.scriptpubkey.startswith(bytes([opcodes.OP_RETURN])): 2242 encrypted_data = o2.scriptpubkey[2:] 2243 data = self.decrypt_cb_data(encrypted_data, funding_address) 2244 if data.startswith(CB_MAGIC_BYTES): 2245 node_id_prefix = data[4:] 2246 if node_id_prefix is None: 2247 return 2248 funding_txid = tx.txid() 2249 cb_storage = OnchainChannelBackupStorage( 2250 node_id_prefix = node_id_prefix, 2251 funding_txid = funding_txid, 2252 funding_index = funding_index, 2253 funding_address = funding_address, 2254 is_initiator = True) 2255 channel_id = cb_storage.channel_id().hex() 2256 if channel_id in self.db.get_dict("channels"): 2257 return 2258 self.logger.info(f"adding backup from tx") 2259 d = self.db.get_dict("onchain_channel_backups") 2260 d[channel_id] = cb_storage 2261 cb = ChannelBackup(cb_storage, sweep_address=self.sweep_address, lnworker=self) 2262 self.wallet.save_db() 2263 with self.lock: 2264 self._channel_backups[bfh(channel_id)] = cb 2265 util.trigger_callback('channels_updated', self.wallet) 2266 self.lnwatcher.add_channel(cb.funding_outpoint.to_str(), cb.get_funding_address()) 2267