1#!/usr/local/bin/python3.8 2# 3# Copyright (C) 2018 The Electrum developers 4# Distributed under the MIT software license, see the accompanying 5# file LICENCE or http://www.opensource.org/licenses/mit-license.php 6 7import zlib 8from collections import OrderedDict, defaultdict 9import asyncio 10import os 11import time 12from typing import Tuple, Dict, TYPE_CHECKING, Optional, Union, Set 13from datetime import datetime 14import functools 15 16import aiorpcx 17from aiorpcx import TaskGroup 18 19from .crypto import sha256, sha256d 20from . import bitcoin, util 21from . import ecc 22from .ecc import sig_string_from_r_and_s, der_sig_from_sig_string 23from . import constants 24from .util import (bh2u, bfh, log_exceptions, ignore_exceptions, chunks, SilentTaskGroup, 25 UnrelatedTransactionException) 26from . import transaction 27from .bitcoin import make_op_return 28from .transaction import PartialTxOutput, match_script_against_template 29from .logging import Logger 30from .lnonion import (new_onion_packet, OnionFailureCode, calc_hops_data_for_payment, 31 process_onion_packet, OnionPacket, construct_onion_error, OnionRoutingFailure, 32 ProcessedOnionPacket, UnsupportedOnionPacketVersion, InvalidOnionMac, InvalidOnionPubkey, 33 OnionFailureCodeMetaFlag) 34from .lnchannel import Channel, RevokeAndAck, RemoteCtnTooFarInFuture, ChannelState, PeerState 35from . import lnutil 36from .lnutil import (Outpoint, LocalConfig, RECEIVED, UpdateAddHtlc, 37 RemoteConfig, OnlyPubkeyKeypair, ChannelConstraints, RevocationStore, 38 funding_output_script, get_per_commitment_secret_from_seed, 39 secret_to_pubkey, PaymentFailure, LnFeatures, 40 LOCAL, REMOTE, HTLCOwner, 41 ln_compare_features, privkey_to_pubkey, MIN_FINAL_CLTV_EXPIRY_ACCEPTED, 42 LightningPeerConnectionClosed, HandshakeFailed, 43 RemoteMisbehaving, ShortChannelID, 44 IncompatibleLightningFeatures, derive_payment_secret_from_payment_preimage, 45 LN_MAX_FUNDING_SAT, calc_fees_for_commitment_tx, 46 UpfrontShutdownScriptViolation) 47from .lnutil import FeeUpdate, channel_id_from_funding_tx 48from .lntransport import LNTransport, LNTransportBase 49from .lnmsg import encode_msg, decode_msg, UnknownOptionalMsgType 50from .interface import GracefulDisconnect 51from .lnrouter import fee_for_edge_msat 52from .lnutil import ln_dummy_address 53from .json_db import StoredDict 54from .invoices import PR_PAID 55 56if TYPE_CHECKING: 57 from .lnworker import LNGossip, LNWallet 58 from .lnrouter import LNPaymentRoute 59 from .transaction import PartialTransaction 60 61 62LN_P2P_NETWORK_TIMEOUT = 20 63 64 65class Peer(Logger): 66 LOGGING_SHORTCUT = 'P' 67 68 ORDERED_MESSAGES = ( 69 'accept_channel', 'funding_signed', 'funding_created', 'accept_channel', 'channel_reestablish', 'closing_signed') 70 SPAMMY_MESSAGES = ( 71 'ping', 'pong', 'channel_announcement', 'node_announcement', 'channel_update',) 72 73 def __init__( 74 self, 75 lnworker: Union['LNGossip', 'LNWallet'], 76 pubkey: bytes, 77 transport: LNTransportBase, 78 *, is_channel_backup= False): 79 80 self.is_channel_backup = is_channel_backup 81 self._sent_init = False # type: bool 82 self._received_init = False # type: bool 83 self.initialized = asyncio.Future() 84 self.got_disconnected = asyncio.Event() 85 self.querying = asyncio.Event() 86 self.transport = transport 87 self.pubkey = pubkey # remote pubkey 88 self.lnworker = lnworker 89 self.privkey = self.transport.privkey # local privkey 90 self.features = self.lnworker.features # type: LnFeatures 91 self.their_features = LnFeatures(0) # type: LnFeatures 92 self.node_ids = [self.pubkey, privkey_to_pubkey(self.privkey)] 93 assert self.node_ids[0] != self.node_ids[1] 94 self.network = lnworker.network 95 self.ping_time = 0 96 self.reply_channel_range = asyncio.Queue() 97 # gossip uses a single queue to preserve message order 98 self.gossip_queue = asyncio.Queue() 99 self.ordered_message_queues = defaultdict(asyncio.Queue) # for messsage that are ordered 100 self.temp_id_to_id = {} # to forward error messages 101 self.funding_created_sent = set() # for channels in PREOPENING 102 self.funding_signed_sent = set() # for channels in PREOPENING 103 self.shutdown_received = {} # chan_id -> asyncio.Future() 104 self.announcement_signatures = defaultdict(asyncio.Queue) 105 self.orphan_channel_updates = OrderedDict() # type: OrderedDict[ShortChannelID, dict] 106 Logger.__init__(self) 107 self.taskgroup = SilentTaskGroup() 108 # HTLCs offered by REMOTE, that we started removing but are still active: 109 self.received_htlcs_pending_removal = set() # type: Set[Tuple[Channel, int]] 110 self.received_htlc_removed_event = asyncio.Event() 111 self._htlc_switch_iterstart_event = asyncio.Event() 112 self._htlc_switch_iterdone_event = asyncio.Event() 113 114 def send_message(self, message_name: str, **kwargs): 115 assert type(message_name) is str 116 if message_name not in self.SPAMMY_MESSAGES: 117 self.logger.debug(f"Sending {message_name.upper()}") 118 if message_name.upper() != "INIT" and not self.is_initialized(): 119 raise Exception("tried to send message before we are initialized") 120 raw_msg = encode_msg(message_name, **kwargs) 121 self._store_raw_msg_if_local_update(raw_msg, message_name=message_name, channel_id=kwargs.get("channel_id")) 122 self.transport.send_bytes(raw_msg) 123 124 def _store_raw_msg_if_local_update(self, raw_msg: bytes, *, message_name: str, channel_id: Optional[bytes]): 125 is_commitment_signed = message_name == "commitment_signed" 126 if not (message_name.startswith("update_") or is_commitment_signed): 127 return 128 assert channel_id 129 chan = self.get_channel_by_id(channel_id) 130 if not chan: 131 raise Exception(f"channel {channel_id.hex()} not found for peer {self.pubkey.hex()}") 132 chan.hm.store_local_update_raw_msg(raw_msg, is_commitment_signed=is_commitment_signed) 133 if is_commitment_signed: 134 # saving now, to ensure replaying updates works (in case of channel reestablishment) 135 self.lnworker.save_channel(chan) 136 137 def maybe_set_initialized(self): 138 if self.initialized.done(): 139 return 140 if self._sent_init and self._received_init: 141 self.initialized.set_result(True) 142 143 def is_initialized(self) -> bool: 144 return (self.initialized.done() 145 and not self.initialized.cancelled() 146 and self.initialized.exception() is None 147 and self.initialized.result() is True) 148 149 async def initialize(self): 150 if isinstance(self.transport, LNTransport): 151 await self.transport.handshake() 152 features = self.features.for_init_message() 153 b = int.bit_length(features) 154 flen = b // 8 + int(bool(b % 8)) 155 self.send_message( 156 "init", gflen=0, flen=flen, 157 features=features, 158 init_tlvs={ 159 'networks': 160 {'chains': constants.net.rev_genesis_bytes()} 161 }) 162 self._sent_init = True 163 self.maybe_set_initialized() 164 165 @property 166 def channels(self) -> Dict[bytes, Channel]: 167 return self.lnworker.channels_for_peer(self.pubkey) 168 169 def get_channel_by_id(self, channel_id: bytes) -> Optional[Channel]: 170 # note: this is faster than self.channels.get(channel_id) 171 chan = self.lnworker.get_channel_by_id(channel_id) 172 if not chan: 173 return None 174 if chan.node_id != self.pubkey: 175 return None 176 return chan 177 178 def diagnostic_name(self): 179 return self.lnworker.__class__.__name__ + ', ' + self.transport.name() 180 181 def ping_if_required(self): 182 if time.time() - self.ping_time > 120: 183 self.send_message('ping', num_pong_bytes=4, byteslen=4) 184 self.ping_time = time.time() 185 186 def process_message(self, message): 187 try: 188 message_type, payload = decode_msg(message) 189 except UnknownOptionalMsgType as e: 190 self.logger.info(f"received unknown message from peer. ignoring: {e!r}") 191 return 192 if message_type not in self.SPAMMY_MESSAGES: 193 self.logger.debug(f"Received {message_type.upper()}") 194 # only process INIT if we are a backup 195 if self.is_channel_backup is True and message_type != 'init': 196 return 197 if message_type in self.ORDERED_MESSAGES: 198 chan_id = payload.get('channel_id') or payload["temporary_channel_id"] 199 self.ordered_message_queues[chan_id].put_nowait((message_type, payload)) 200 else: 201 if message_type != 'error' and 'channel_id' in payload: 202 chan = self.get_channel_by_id(payload['channel_id']) 203 if chan is None: 204 raise Exception('Got unknown '+ message_type) 205 args = (chan, payload) 206 else: 207 args = (payload,) 208 try: 209 f = getattr(self, 'on_' + message_type) 210 except AttributeError: 211 #self.logger.info("Received '%s'" % message_type.upper(), payload) 212 return 213 # raw message is needed to check signature 214 if message_type in ['node_announcement', 'channel_announcement', 'channel_update']: 215 payload['raw'] = message 216 execution_result = f(*args) 217 if asyncio.iscoroutinefunction(f): 218 asyncio.ensure_future(self.taskgroup.spawn(execution_result)) 219 220 def on_error(self, payload): 221 self.logger.info(f"remote peer sent error [DO NOT TRUST THIS MESSAGE]: {payload['data'].decode('ascii')}") 222 chan_id = payload.get("channel_id") 223 if chan_id in self.temp_id_to_id: 224 chan_id = self.temp_id_to_id[chan_id] 225 self.ordered_message_queues[chan_id].put_nowait((None, {'error':payload['data']})) 226 227 def on_ping(self, payload): 228 l = payload['num_pong_bytes'] 229 self.send_message('pong', byteslen=l) 230 231 def on_pong(self, payload): 232 pass 233 234 async def wait_for_message(self, expected_name, channel_id): 235 q = self.ordered_message_queues[channel_id] 236 name, payload = await asyncio.wait_for(q.get(), LN_P2P_NETWORK_TIMEOUT) 237 if payload.get('error'): 238 raise Exception('Remote peer reported error [DO NOT TRUST THIS MESSAGE]: ' + repr(payload.get('error'))) 239 if name != expected_name: 240 raise Exception(f"Received unexpected '{name}'") 241 return payload 242 243 def on_init(self, payload): 244 if self._received_init: 245 self.logger.info("ALREADY INITIALIZED BUT RECEIVED INIT") 246 return 247 self.their_features = LnFeatures(int.from_bytes(payload['features'], byteorder="big")) 248 their_globalfeatures = int.from_bytes(payload['globalfeatures'], byteorder="big") 249 self.their_features |= their_globalfeatures 250 # check transitive dependencies for received features 251 if not self.their_features.validate_transitive_dependencies(): 252 raise GracefulDisconnect("remote did not set all dependencies for the features they sent") 253 # check if features are compatible, and set self.features to what we negotiated 254 try: 255 self.features = ln_compare_features(self.features, self.their_features) 256 except IncompatibleLightningFeatures as e: 257 self.initialized.set_exception(e) 258 raise GracefulDisconnect(f"{str(e)}") 259 # check that they are on the same chain as us, if provided 260 their_networks = payload["init_tlvs"].get("networks") 261 if their_networks: 262 their_chains = list(chunks(their_networks["chains"], 32)) 263 if constants.net.rev_genesis_bytes() not in their_chains: 264 raise GracefulDisconnect(f"no common chain found with remote. (they sent: {their_chains})") 265 # all checks passed 266 self.lnworker.on_peer_successfully_established(self) 267 self._received_init = True 268 self.maybe_set_initialized() 269 270 def on_node_announcement(self, payload): 271 if self.lnworker.channel_db: 272 self.gossip_queue.put_nowait(('node_announcement', payload)) 273 274 def on_channel_announcement(self, payload): 275 if self.lnworker.channel_db: 276 self.gossip_queue.put_nowait(('channel_announcement', payload)) 277 278 def on_channel_update(self, payload): 279 self.maybe_save_remote_update(payload) 280 if self.lnworker.channel_db: 281 self.gossip_queue.put_nowait(('channel_update', payload)) 282 283 def maybe_save_remote_update(self, payload): 284 if not self.channels: 285 return 286 for chan in self.channels.values(): 287 if chan.short_channel_id == payload['short_channel_id']: 288 chan.set_remote_update(payload) 289 self.logger.info("saved remote_update") 290 break 291 else: 292 # Save (some bounded number of) orphan channel updates for later 293 # as it might be for our own direct channel with this peer 294 # (and we might not yet know the short channel id for that) 295 # Background: this code is here to deal with a bug in LND, 296 # see https://github.com/lightningnetwork/lnd/issues/3651 297 # and https://github.com/lightningnetwork/lightning-rfc/pull/657 298 # This code assumes gossip_queries is set. BOLT7: "if the 299 # gossip_queries feature is negotiated, [a node] MUST NOT 300 # send gossip it did not generate itself" 301 short_channel_id = ShortChannelID(payload['short_channel_id']) 302 self.logger.info(f'received orphan channel update {short_channel_id}') 303 self.orphan_channel_updates[short_channel_id] = payload 304 while len(self.orphan_channel_updates) > 25: 305 self.orphan_channel_updates.popitem(last=False) 306 307 def on_announcement_signatures(self, chan: Channel, payload): 308 if chan.config[LOCAL].was_announced: 309 h, local_node_sig, local_bitcoin_sig = self.send_announcement_signatures(chan) 310 else: 311 self.announcement_signatures[chan.channel_id].put_nowait(payload) 312 313 def handle_disconnect(func): 314 @functools.wraps(func) 315 async def wrapper_func(self, *args, **kwargs): 316 try: 317 return await func(self, *args, **kwargs) 318 except GracefulDisconnect as e: 319 self.logger.log(e.log_level, f"Disconnecting: {repr(e)}") 320 except (LightningPeerConnectionClosed, IncompatibleLightningFeatures, 321 aiorpcx.socks.SOCKSError) as e: 322 self.logger.info(f"Disconnecting: {repr(e)}") 323 finally: 324 self.close_and_cleanup() 325 return wrapper_func 326 327 @ignore_exceptions # do not kill outer taskgroup 328 @log_exceptions 329 @handle_disconnect 330 async def main_loop(self): 331 async with self.taskgroup as group: 332 await group.spawn(self._message_loop()) 333 await group.spawn(self.htlc_switch()) 334 await group.spawn(self.query_gossip()) 335 await group.spawn(self.process_gossip()) 336 337 async def process_gossip(self): 338 while True: 339 await asyncio.sleep(5) 340 if not self.network.lngossip: 341 continue 342 chan_anns = [] 343 chan_upds = [] 344 node_anns = [] 345 while True: 346 name, payload = await self.gossip_queue.get() 347 if name == 'channel_announcement': 348 chan_anns.append(payload) 349 elif name == 'channel_update': 350 chan_upds.append(payload) 351 elif name == 'node_announcement': 352 node_anns.append(payload) 353 else: 354 raise Exception('unknown message') 355 if self.gossip_queue.empty(): 356 break 357 if self.network.lngossip: 358 await self.network.lngossip.process_gossip(chan_anns, node_anns, chan_upds) 359 360 async def query_gossip(self): 361 try: 362 await asyncio.wait_for(self.initialized, LN_P2P_NETWORK_TIMEOUT) 363 except Exception as e: 364 raise GracefulDisconnect(f"Failed to initialize: {e!r}") from e 365 if self.lnworker == self.lnworker.network.lngossip: 366 try: 367 ids, complete = await asyncio.wait_for(self.get_channel_range(), LN_P2P_NETWORK_TIMEOUT) 368 except asyncio.TimeoutError as e: 369 raise GracefulDisconnect("query_channel_range timed out") from e 370 self.logger.info('Received {} channel ids. (complete: {})'.format(len(ids), complete)) 371 await self.lnworker.add_new_ids(ids) 372 while True: 373 todo = self.lnworker.get_ids_to_query() 374 if not todo: 375 await asyncio.sleep(1) 376 continue 377 await self.get_short_channel_ids(todo) 378 379 async def get_channel_range(self): 380 first_block = constants.net.BLOCK_HEIGHT_FIRST_LIGHTNING_CHANNELS 381 num_blocks = self.lnworker.network.get_local_height() - first_block 382 self.query_channel_range(first_block, num_blocks) 383 intervals = [] 384 ids = set() 385 # note: implementations behave differently... 386 # "sane implementation that follows BOLT-07" example: 387 # query_channel_range. <<< first_block 497000, num_blocks 79038 388 # on_reply_channel_range. >>> first_block 497000, num_blocks 39516, num_ids 4648, complete True 389 # on_reply_channel_range. >>> first_block 536516, num_blocks 19758, num_ids 5734, complete True 390 # on_reply_channel_range. >>> first_block 556274, num_blocks 9879, num_ids 13712, complete True 391 # on_reply_channel_range. >>> first_block 566153, num_blocks 9885, num_ids 18114, complete True 392 # lnd example: 393 # query_channel_range. <<< first_block 497000, num_blocks 79038 394 # on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False 395 # on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False 396 # on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False 397 # on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False 398 # on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 5344, complete True 399 while True: 400 index, num, complete, _ids = await self.reply_channel_range.get() 401 ids.update(_ids) 402 intervals.append((index, index+num)) 403 intervals.sort() 404 while len(intervals) > 1: 405 a,b = intervals[0] 406 c,d = intervals[1] 407 if not (a <= c and a <= b and c <= d): 408 raise Exception(f"insane reply_channel_range intervals {(a,b,c,d)}") 409 if b >= c: 410 intervals = [(a,d)] + intervals[2:] 411 else: 412 break 413 if len(intervals) == 1 and complete: 414 a, b = intervals[0] 415 if a <= first_block and b >= first_block + num_blocks: 416 break 417 return ids, complete 418 419 def request_gossip(self, timestamp=0): 420 if timestamp == 0: 421 self.logger.info('requesting whole channel graph') 422 else: 423 self.logger.info(f'requesting channel graph since {datetime.fromtimestamp(timestamp).ctime()}') 424 self.send_message( 425 'gossip_timestamp_filter', 426 chain_hash=constants.net.rev_genesis_bytes(), 427 first_timestamp=timestamp, 428 timestamp_range=b'\xff'*4) 429 430 def query_channel_range(self, first_block, num_blocks): 431 self.logger.info(f'query channel range {first_block} {num_blocks}') 432 self.send_message( 433 'query_channel_range', 434 chain_hash=constants.net.rev_genesis_bytes(), 435 first_blocknum=first_block, 436 number_of_blocks=num_blocks) 437 438 def decode_short_ids(self, encoded): 439 if encoded[0] == 0: 440 decoded = encoded[1:] 441 elif encoded[0] == 1: 442 decoded = zlib.decompress(encoded[1:]) 443 else: 444 raise Exception(f'decode_short_ids: unexpected first byte: {encoded[0]}') 445 ids = [decoded[i:i+8] for i in range(0, len(decoded), 8)] 446 return ids 447 448 def on_reply_channel_range(self, payload): 449 first = payload['first_blocknum'] 450 num = payload['number_of_blocks'] 451 complete = bool(int.from_bytes(payload['complete'], 'big')) 452 encoded = payload['encoded_short_ids'] 453 ids = self.decode_short_ids(encoded) 454 #self.logger.info(f"on_reply_channel_range. >>> first_block {first}, num_blocks {num}, num_ids {len(ids)}, complete {repr(payload['complete'])}") 455 self.reply_channel_range.put_nowait((first, num, complete, ids)) 456 457 async def get_short_channel_ids(self, ids): 458 self.logger.info(f'Querying {len(ids)} short_channel_ids') 459 assert not self.querying.is_set() 460 self.query_short_channel_ids(ids) 461 await self.querying.wait() 462 self.querying.clear() 463 464 def query_short_channel_ids(self, ids, compressed=True): 465 ids = sorted(ids) 466 s = b''.join(ids) 467 encoded = zlib.compress(s) if compressed else s 468 prefix = b'\x01' if compressed else b'\x00' 469 self.send_message( 470 'query_short_channel_ids', 471 chain_hash=constants.net.rev_genesis_bytes(), 472 len=1+len(encoded), 473 encoded_short_ids=prefix+encoded) 474 475 async def _message_loop(self): 476 try: 477 await asyncio.wait_for(self.initialize(), LN_P2P_NETWORK_TIMEOUT) 478 except (OSError, asyncio.TimeoutError, HandshakeFailed) as e: 479 raise GracefulDisconnect(f'initialize failed: {repr(e)}') from e 480 async for msg in self.transport.read_messages(): 481 self.process_message(msg) 482 await asyncio.sleep(.01) 483 484 def on_reply_short_channel_ids_end(self, payload): 485 self.querying.set() 486 487 def close_and_cleanup(self): 488 # note: This method might get called multiple times! 489 # E.g. if you call close_and_cleanup() to cause a disconnection from the peer, 490 # it will get called a second time in handle_disconnect(). 491 try: 492 if self.transport: 493 self.transport.close() 494 except: 495 pass 496 self.lnworker.peer_closed(self) 497 self.got_disconnected.set() 498 499 def is_static_remotekey(self): 500 return self.features.supports(LnFeatures.OPTION_STATIC_REMOTEKEY_OPT) 501 502 def is_upfront_shutdown_script(self): 503 return self.features.supports(LnFeatures.OPTION_UPFRONT_SHUTDOWN_SCRIPT_OPT) 504 505 def upfront_shutdown_script_from_payload(self, payload, msg_identifier: str) -> Optional[bytes]: 506 if msg_identifier not in ['accept', 'open']: 507 raise ValueError("msg_identifier must be either 'accept' or 'open'") 508 509 uss_tlv = payload[msg_identifier + '_channel_tlvs'].get( 510 'upfront_shutdown_script') 511 512 if uss_tlv and self.is_upfront_shutdown_script(): 513 upfront_shutdown_script = uss_tlv['shutdown_scriptpubkey'] 514 else: 515 upfront_shutdown_script = b'' 516 self.logger.info(f"upfront shutdown script received: {upfront_shutdown_script}") 517 return upfront_shutdown_script 518 519 def make_local_config(self, funding_sat: int, push_msat: int, initiator: HTLCOwner) -> LocalConfig: 520 channel_seed = os.urandom(32) 521 initial_msat = funding_sat * 1000 - push_msat if initiator == LOCAL else push_msat 522 523 static_remotekey = None 524 # sending empty bytes as the upfront_shutdown_script will give us the 525 # flexibility to decide an address at closing time 526 upfront_shutdown_script = b'' 527 528 if self.is_static_remotekey(): 529 wallet = self.lnworker.wallet 530 assert wallet.txin_type == 'p2wpkh' 531 addr = wallet.get_new_sweep_address_for_channel() 532 static_remotekey = bfh(wallet.get_public_key(addr)) 533 else: 534 static_remotekey = None 535 dust_limit_sat = bitcoin.DUST_LIMIT_DEFAULT_SAT_LEGACY 536 reserve_sat = max(funding_sat // 100, dust_limit_sat) 537 # for comparison of defaults, see 538 # https://github.com/ACINQ/eclair/blob/afa378fbb73c265da44856b4ad0f2128a88ae6c6/eclair-core/src/main/resources/reference.conf#L66 539 # https://github.com/ElementsProject/lightning/blob/0056dd75572a8857cff36fcbdb1a2295a1ac9253/lightningd/options.c#L657 540 # https://github.com/lightningnetwork/lnd/blob/56b61078c5b2be007d318673a5f3b40c6346883a/config.go#L81 541 local_config = LocalConfig.from_seed( 542 channel_seed=channel_seed, 543 static_remotekey=static_remotekey, 544 upfront_shutdown_script=upfront_shutdown_script, 545 to_self_delay=self.network.config.get('lightning_to_self_delay', 7 * 144), 546 dust_limit_sat=dust_limit_sat, 547 max_htlc_value_in_flight_msat=funding_sat * 1000, 548 max_accepted_htlcs=30, 549 initial_msat=initial_msat, 550 reserve_sat=reserve_sat, 551 funding_locked_received=False, 552 was_announced=False, 553 current_commitment_signature=None, 554 current_htlc_signatures=b'', 555 htlc_minimum_msat=1, 556 ) 557 local_config.validate_params(funding_sat=funding_sat) 558 return local_config 559 560 def temporarily_reserve_funding_tx_change_address(func): 561 # During the channel open flow, if we initiated, we might have used a change address 562 # of ours in the funding tx. The funding tx is not part of the wallet history 563 # at that point yet, but we should already consider this change address as 'used'. 564 @functools.wraps(func) 565 async def wrapper(self: 'Peer', *args, **kwargs): 566 funding_tx = kwargs['funding_tx'] # type: PartialTransaction 567 wallet = self.lnworker.wallet 568 change_addresses = [txout.address for txout in funding_tx.outputs() 569 if wallet.is_change(txout.address)] 570 for addr in change_addresses: 571 wallet.set_reserved_state_of_address(addr, reserved=True) 572 try: 573 return await func(self, *args, **kwargs) 574 finally: 575 for addr in change_addresses: 576 self.lnworker.wallet.set_reserved_state_of_address(addr, reserved=False) 577 return wrapper 578 579 @log_exceptions 580 @temporarily_reserve_funding_tx_change_address 581 async def channel_establishment_flow( 582 self, *, 583 funding_tx: 'PartialTransaction', 584 funding_sat: int, 585 push_msat: int, 586 temp_channel_id: bytes 587 ) -> Tuple[Channel, 'PartialTransaction']: 588 """Implements the channel opening flow. 589 590 -> open_channel message 591 <- accept_channel message 592 -> funding_created message 593 <- funding_signed message 594 595 Channel configurations are initialized in this method. 596 """ 597 # will raise if init fails 598 await asyncio.wait_for(self.initialized, LN_P2P_NETWORK_TIMEOUT) 599 # trampoline is not yet in features 600 if not self.lnworker.channel_db and not self.lnworker.is_trampoline_peer(self.pubkey): 601 raise Exception('Not a trampoline node: ' + str(self.their_features)) 602 603 if funding_sat > LN_MAX_FUNDING_SAT: 604 raise Exception( 605 f"MUST set funding_satoshis to less than 2^24 satoshi. " 606 f"{funding_sat} sat > {LN_MAX_FUNDING_SAT}") 607 if push_msat > 1000 * funding_sat: 608 raise Exception( 609 f"MUST set push_msat to equal or less than 1000 * funding_satoshis: " 610 f"{push_msat} msat > {1000 * funding_sat} msat") 611 if funding_sat < lnutil.MIN_FUNDING_SAT: 612 raise Exception(f"funding_sat too low: {funding_sat} < {lnutil.MIN_FUNDING_SAT}") 613 614 feerate = self.lnworker.current_feerate_per_kw() 615 local_config = self.make_local_config(funding_sat, push_msat, LOCAL) 616 617 # for the first commitment transaction 618 per_commitment_secret_first = get_per_commitment_secret_from_seed( 619 local_config.per_commitment_secret_seed, 620 RevocationStore.START_INDEX 621 ) 622 per_commitment_point_first = secret_to_pubkey( 623 int.from_bytes(per_commitment_secret_first, 'big')) 624 self.send_message( 625 "open_channel", 626 temporary_channel_id=temp_channel_id, 627 chain_hash=constants.net.rev_genesis_bytes(), 628 funding_satoshis=funding_sat, 629 push_msat=push_msat, 630 dust_limit_satoshis=local_config.dust_limit_sat, 631 feerate_per_kw=feerate, 632 max_accepted_htlcs=local_config.max_accepted_htlcs, 633 funding_pubkey=local_config.multisig_key.pubkey, 634 revocation_basepoint=local_config.revocation_basepoint.pubkey, 635 htlc_basepoint=local_config.htlc_basepoint.pubkey, 636 payment_basepoint=local_config.payment_basepoint.pubkey, 637 delayed_payment_basepoint=local_config.delayed_basepoint.pubkey, 638 first_per_commitment_point=per_commitment_point_first, 639 to_self_delay=local_config.to_self_delay, 640 max_htlc_value_in_flight_msat=local_config.max_htlc_value_in_flight_msat, 641 channel_flags=0x00, # not willing to announce channel 642 channel_reserve_satoshis=local_config.reserve_sat, 643 htlc_minimum_msat=local_config.htlc_minimum_msat, 644 open_channel_tlvs={ 645 'upfront_shutdown_script': 646 {'shutdown_scriptpubkey': local_config.upfront_shutdown_script} 647 } 648 ) 649 650 # <- accept_channel 651 payload = await self.wait_for_message('accept_channel', temp_channel_id) 652 remote_per_commitment_point = payload['first_per_commitment_point'] 653 funding_txn_minimum_depth = payload['minimum_depth'] 654 if funding_txn_minimum_depth <= 0: 655 raise Exception(f"minimum depth too low, {funding_txn_minimum_depth}") 656 if funding_txn_minimum_depth > 30: 657 raise Exception(f"minimum depth too high, {funding_txn_minimum_depth}") 658 659 upfront_shutdown_script = self.upfront_shutdown_script_from_payload( 660 payload, 'accept') 661 662 remote_config = RemoteConfig( 663 payment_basepoint=OnlyPubkeyKeypair(payload['payment_basepoint']), 664 multisig_key=OnlyPubkeyKeypair(payload["funding_pubkey"]), 665 htlc_basepoint=OnlyPubkeyKeypair(payload['htlc_basepoint']), 666 delayed_basepoint=OnlyPubkeyKeypair(payload['delayed_payment_basepoint']), 667 revocation_basepoint=OnlyPubkeyKeypair(payload['revocation_basepoint']), 668 to_self_delay=payload['to_self_delay'], 669 dust_limit_sat=payload['dust_limit_satoshis'], 670 max_htlc_value_in_flight_msat=payload['max_htlc_value_in_flight_msat'], 671 max_accepted_htlcs=payload["max_accepted_htlcs"], 672 initial_msat=push_msat, 673 reserve_sat=payload["channel_reserve_satoshis"], 674 htlc_minimum_msat=payload['htlc_minimum_msat'], 675 next_per_commitment_point=remote_per_commitment_point, 676 current_per_commitment_point=None, 677 upfront_shutdown_script=upfront_shutdown_script 678 ) 679 remote_config.validate_params(funding_sat=funding_sat) 680 # if channel_reserve_satoshis is less than dust_limit_satoshis within the open_channel message: 681 # MUST reject the channel. 682 if remote_config.reserve_sat < local_config.dust_limit_sat: 683 raise Exception("violated constraint: remote_config.reserve_sat < local_config.dust_limit_sat") 684 # if channel_reserve_satoshis from the open_channel message is less than dust_limit_satoshis: 685 # MUST reject the channel. 686 if local_config.reserve_sat < remote_config.dust_limit_sat: 687 raise Exception("violated constraint: local_config.reserve_sat < remote_config.dust_limit_sat") 688 689 # -> funding created 690 # replace dummy output in funding tx 691 redeem_script = funding_output_script(local_config, remote_config) 692 funding_address = bitcoin.redeem_script_to_address('p2wsh', redeem_script) 693 funding_output = PartialTxOutput.from_address_and_value(funding_address, funding_sat) 694 dummy_output = PartialTxOutput.from_address_and_value(ln_dummy_address(), funding_sat) 695 if dummy_output not in funding_tx.outputs(): raise Exception("LN dummy output (err 1)") 696 funding_tx._outputs.remove(dummy_output) 697 if dummy_output in funding_tx.outputs(): raise Exception("LN dummy output (err 2)") 698 funding_tx.add_outputs([funding_output]) 699 # find and encrypt op_return data associated to funding_address 700 has_onchain_backup = self.lnworker and self.lnworker.has_recoverable_channels() 701 if has_onchain_backup: 702 backup_data = self.lnworker.cb_data(self.pubkey) 703 dummy_scriptpubkey = make_op_return(backup_data) 704 for o in funding_tx.outputs(): 705 if o.scriptpubkey == dummy_scriptpubkey: 706 encrypted_data = self.lnworker.encrypt_cb_data(backup_data, funding_address) 707 assert len(encrypted_data) == len(backup_data) 708 o.scriptpubkey = make_op_return(encrypted_data) 709 break 710 else: 711 raise Exception('op_return output not found in funding tx') 712 # must not be malleable 713 funding_tx.set_rbf(False) 714 if not funding_tx.is_segwit(): 715 raise Exception('Funding transaction is not segwit') 716 funding_txid = funding_tx.txid() 717 assert funding_txid 718 funding_index = funding_tx.outputs().index(funding_output) 719 # build remote commitment transaction 720 channel_id, funding_txid_bytes = channel_id_from_funding_tx(funding_txid, funding_index) 721 outpoint = Outpoint(funding_txid, funding_index) 722 constraints = ChannelConstraints( 723 capacity=funding_sat, 724 is_initiator=True, 725 funding_txn_minimum_depth=funding_txn_minimum_depth 726 ) 727 storage = self.create_channel_storage( 728 channel_id, outpoint, local_config, remote_config, constraints) 729 chan = Channel( 730 storage, 731 sweep_address=self.lnworker.sweep_address, 732 lnworker=self.lnworker, 733 initial_feerate=feerate 734 ) 735 chan.storage['funding_inputs'] = [txin.prevout.to_json() for txin in funding_tx.inputs()] 736 chan.storage['has_onchain_backup'] = has_onchain_backup 737 if isinstance(self.transport, LNTransport): 738 chan.add_or_update_peer_addr(self.transport.peer_addr) 739 sig_64, _ = chan.sign_next_commitment() 740 self.temp_id_to_id[temp_channel_id] = channel_id 741 742 self.send_message("funding_created", 743 temporary_channel_id=temp_channel_id, 744 funding_txid=funding_txid_bytes, 745 funding_output_index=funding_index, 746 signature=sig_64) 747 self.funding_created_sent.add(channel_id) 748 749 # <- funding signed 750 payload = await self.wait_for_message('funding_signed', channel_id) 751 self.logger.info('received funding_signed') 752 remote_sig = payload['signature'] 753 chan.receive_new_commitment(remote_sig, []) 754 chan.open_with_first_pcp(remote_per_commitment_point, remote_sig) 755 chan.set_state(ChannelState.OPENING) 756 self.lnworker.add_new_channel(chan) 757 return chan, funding_tx 758 759 def create_channel_storage(self, channel_id, outpoint, local_config, remote_config, constraints): 760 chan_dict = { 761 "node_id": self.pubkey.hex(), 762 "channel_id": channel_id.hex(), 763 "short_channel_id": None, 764 "funding_outpoint": outpoint, 765 "remote_config": remote_config, 766 "local_config": local_config, 767 "constraints": constraints, 768 "remote_update": None, 769 "state": ChannelState.PREOPENING.name, 770 'onion_keys': {}, 771 'data_loss_protect_remote_pcp': {}, 772 "log": {}, 773 "revocation_store": {}, 774 "static_remotekey_enabled": self.is_static_remotekey(), # stored because it cannot be "downgraded", per BOLT2 775 } 776 return StoredDict(chan_dict, self.lnworker.db if self.lnworker else None, []) 777 778 async def on_open_channel(self, payload): 779 """Implements the channel acceptance flow. 780 781 <- open_channel message 782 -> accept_channel message 783 <- funding_created message 784 -> funding_signed message 785 786 Channel configurations are initialized in this method. 787 """ 788 if self.lnworker.has_recoverable_channels(): 789 # FIXME: we might want to keep the connection open 790 raise Exception('not accepting channels') 791 # <- open_channel 792 if payload['chain_hash'] != constants.net.rev_genesis_bytes(): 793 raise Exception('wrong chain_hash') 794 funding_sat = payload['funding_satoshis'] 795 push_msat = payload['push_msat'] 796 feerate = payload['feerate_per_kw'] # note: we are not validating this 797 temp_chan_id = payload['temporary_channel_id'] 798 local_config = self.make_local_config(funding_sat, push_msat, REMOTE) 799 if funding_sat > LN_MAX_FUNDING_SAT: 800 raise Exception( 801 f"MUST set funding_satoshis to less than 2^24 satoshi. " 802 f"{funding_sat} sat > {LN_MAX_FUNDING_SAT}") 803 if push_msat > 1000 * funding_sat: 804 raise Exception( 805 f"MUST set push_msat to equal or less than 1000 * funding_satoshis: " 806 f"{push_msat} msat > {1000 * funding_sat} msat") 807 if funding_sat < lnutil.MIN_FUNDING_SAT: 808 raise Exception(f"funding_sat too low: {funding_sat} < {lnutil.MIN_FUNDING_SAT}") 809 810 upfront_shutdown_script = self.upfront_shutdown_script_from_payload( 811 payload, 'open') 812 813 remote_config = RemoteConfig( 814 payment_basepoint=OnlyPubkeyKeypair(payload['payment_basepoint']), 815 multisig_key=OnlyPubkeyKeypair(payload['funding_pubkey']), 816 htlc_basepoint=OnlyPubkeyKeypair(payload['htlc_basepoint']), 817 delayed_basepoint=OnlyPubkeyKeypair(payload['delayed_payment_basepoint']), 818 revocation_basepoint=OnlyPubkeyKeypair(payload['revocation_basepoint']), 819 to_self_delay=payload['to_self_delay'], 820 dust_limit_sat=payload['dust_limit_satoshis'], 821 max_htlc_value_in_flight_msat=payload['max_htlc_value_in_flight_msat'], 822 max_accepted_htlcs=payload['max_accepted_htlcs'], 823 initial_msat=funding_sat * 1000 - push_msat, 824 reserve_sat=payload['channel_reserve_satoshis'], 825 htlc_minimum_msat=payload['htlc_minimum_msat'], 826 next_per_commitment_point=payload['first_per_commitment_point'], 827 current_per_commitment_point=None, 828 upfront_shutdown_script=upfront_shutdown_script, 829 ) 830 831 remote_config.validate_params(funding_sat=funding_sat) 832 # The receiving node MUST fail the channel if: 833 # the funder's amount for the initial commitment transaction is not 834 # sufficient for full fee payment. 835 if remote_config.initial_msat < calc_fees_for_commitment_tx( 836 num_htlcs=0, 837 feerate=feerate, 838 is_local_initiator=False)[REMOTE]: 839 raise Exception( 840 "the funder's amount for the initial commitment transaction " 841 "is not sufficient for full fee payment") 842 # The receiving node MUST fail the channel if: 843 # both to_local and to_remote amounts for the initial commitment transaction are 844 # less than or equal to channel_reserve_satoshis (see BOLT 3). 845 if (local_config.initial_msat <= 1000 * payload['channel_reserve_satoshis'] 846 and remote_config.initial_msat <= 1000 * payload['channel_reserve_satoshis']): 847 raise Exception( 848 "both to_local and to_remote amounts for the initial commitment " 849 "transaction are less than or equal to channel_reserve_satoshis") 850 # note: we ignore payload['channel_flags'], which e.g. contains 'announce_channel'. 851 # Notably if the remote sets 'announce_channel' to True, we will ignore that too, 852 # but we will not play along with actually announcing the channel (so we keep it private). 853 854 # -> accept channel 855 # for the first commitment transaction 856 per_commitment_secret_first = get_per_commitment_secret_from_seed( 857 local_config.per_commitment_secret_seed, 858 RevocationStore.START_INDEX 859 ) 860 per_commitment_point_first = secret_to_pubkey( 861 int.from_bytes(per_commitment_secret_first, 'big')) 862 min_depth = 3 863 self.send_message( 864 'accept_channel', 865 temporary_channel_id=temp_chan_id, 866 dust_limit_satoshis=local_config.dust_limit_sat, 867 max_htlc_value_in_flight_msat=local_config.max_htlc_value_in_flight_msat, 868 channel_reserve_satoshis=local_config.reserve_sat, 869 htlc_minimum_msat=local_config.htlc_minimum_msat, 870 minimum_depth=min_depth, 871 to_self_delay=local_config.to_self_delay, 872 max_accepted_htlcs=local_config.max_accepted_htlcs, 873 funding_pubkey=local_config.multisig_key.pubkey, 874 revocation_basepoint=local_config.revocation_basepoint.pubkey, 875 payment_basepoint=local_config.payment_basepoint.pubkey, 876 delayed_payment_basepoint=local_config.delayed_basepoint.pubkey, 877 htlc_basepoint=local_config.htlc_basepoint.pubkey, 878 first_per_commitment_point=per_commitment_point_first, 879 accept_channel_tlvs={ 880 'upfront_shutdown_script': 881 {'shutdown_scriptpubkey': local_config.upfront_shutdown_script} 882 } 883 ) 884 885 # <- funding created 886 funding_created = await self.wait_for_message('funding_created', temp_chan_id) 887 888 # -> funding signed 889 funding_idx = funding_created['funding_output_index'] 890 funding_txid = bh2u(funding_created['funding_txid'][::-1]) 891 channel_id, funding_txid_bytes = channel_id_from_funding_tx(funding_txid, funding_idx) 892 constraints = ChannelConstraints( 893 capacity=funding_sat, 894 is_initiator=False, 895 funding_txn_minimum_depth=min_depth 896 ) 897 outpoint = Outpoint(funding_txid, funding_idx) 898 chan_dict = self.create_channel_storage( 899 channel_id, outpoint, local_config, remote_config, constraints) 900 chan = Channel( 901 chan_dict, 902 sweep_address=self.lnworker.sweep_address, 903 lnworker=self.lnworker, 904 initial_feerate=feerate 905 ) 906 chan.storage['init_timestamp'] = int(time.time()) 907 if isinstance(self.transport, LNTransport): 908 chan.add_or_update_peer_addr(self.transport.peer_addr) 909 remote_sig = funding_created['signature'] 910 chan.receive_new_commitment(remote_sig, []) 911 sig_64, _ = chan.sign_next_commitment() 912 self.send_message('funding_signed', 913 channel_id=channel_id, 914 signature=sig_64, 915 ) 916 self.funding_signed_sent.add(chan.channel_id) 917 chan.open_with_first_pcp(payload['first_per_commitment_point'], remote_sig) 918 chan.set_state(ChannelState.OPENING) 919 self.lnworker.add_new_channel(chan) 920 921 async def trigger_force_close(self, channel_id: bytes): 922 await self.initialized 923 latest_point = secret_to_pubkey(42) # we need a valid point (BOLT2) 924 self.send_message( 925 "channel_reestablish", 926 channel_id=channel_id, 927 next_commitment_number=0, 928 next_revocation_number=0, 929 your_last_per_commitment_secret=0, 930 my_current_per_commitment_point=latest_point) 931 932 async def reestablish_channel(self, chan: Channel): 933 await self.initialized 934 chan_id = chan.channel_id 935 if chan.should_request_force_close: 936 await self.trigger_force_close(chan_id) 937 chan.should_request_force_close = False 938 return 939 assert ChannelState.PREOPENING < chan.get_state() < ChannelState.FORCE_CLOSING 940 if chan.peer_state != PeerState.DISCONNECTED: 941 self.logger.info(f'reestablish_channel was called but channel {chan.get_id_for_log()} ' 942 f'already in peer_state {chan.peer_state!r}') 943 return 944 chan.peer_state = PeerState.REESTABLISHING 945 util.trigger_callback('channel', self.lnworker.wallet, chan) 946 # BOLT-02: "A node [...] upon disconnection [...] MUST reverse any uncommitted updates sent by the other side" 947 chan.hm.discard_unsigned_remote_updates() 948 # ctns 949 oldest_unrevoked_local_ctn = chan.get_oldest_unrevoked_ctn(LOCAL) 950 latest_local_ctn = chan.get_latest_ctn(LOCAL) 951 next_local_ctn = chan.get_next_ctn(LOCAL) 952 oldest_unrevoked_remote_ctn = chan.get_oldest_unrevoked_ctn(REMOTE) 953 latest_remote_ctn = chan.get_latest_ctn(REMOTE) 954 next_remote_ctn = chan.get_next_ctn(REMOTE) 955 assert self.features.supports(LnFeatures.OPTION_DATA_LOSS_PROTECT_OPT) 956 # send message 957 if chan.is_static_remotekey_enabled(): 958 latest_secret, latest_point = chan.get_secret_and_point(LOCAL, 0) 959 else: 960 latest_secret, latest_point = chan.get_secret_and_point(LOCAL, latest_local_ctn) 961 if oldest_unrevoked_remote_ctn == 0: 962 last_rev_secret = 0 963 else: 964 last_rev_index = oldest_unrevoked_remote_ctn - 1 965 last_rev_secret = chan.revocation_store.retrieve_secret(RevocationStore.START_INDEX - last_rev_index) 966 self.send_message( 967 "channel_reestablish", 968 channel_id=chan_id, 969 next_commitment_number=next_local_ctn, 970 next_revocation_number=oldest_unrevoked_remote_ctn, 971 your_last_per_commitment_secret=last_rev_secret, 972 my_current_per_commitment_point=latest_point) 973 self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): sent channel_reestablish with ' 974 f'(next_local_ctn={next_local_ctn}, ' 975 f'oldest_unrevoked_remote_ctn={oldest_unrevoked_remote_ctn})') 976 while True: 977 try: 978 msg = await self.wait_for_message('channel_reestablish', chan_id) 979 break 980 except asyncio.TimeoutError: 981 self.logger.info('waiting to receive channel_reestablish...') 982 continue 983 except Exception as e: 984 # do not kill connection, because we might have other channels with that peer 985 self.logger.info(f'channel_reestablish failed, {e}') 986 return 987 their_next_local_ctn = msg["next_commitment_number"] 988 their_oldest_unrevoked_remote_ctn = msg["next_revocation_number"] 989 their_local_pcp = msg.get("my_current_per_commitment_point") 990 their_claim_of_our_last_per_commitment_secret = msg.get("your_last_per_commitment_secret") 991 self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): received channel_reestablish with ' 992 f'(their_next_local_ctn={their_next_local_ctn}, ' 993 f'their_oldest_unrevoked_remote_ctn={their_oldest_unrevoked_remote_ctn})') 994 # sanity checks of received values 995 if their_next_local_ctn < 0: 996 raise RemoteMisbehaving(f"channel reestablish: their_next_local_ctn < 0") 997 if their_oldest_unrevoked_remote_ctn < 0: 998 raise RemoteMisbehaving(f"channel reestablish: their_oldest_unrevoked_remote_ctn < 0") 999 # Replay un-acked local updates (including commitment_signed) byte-for-byte. 1000 # If we have sent them a commitment signature that they "lost" (due to disconnect), 1001 # we need to make sure we replay the same local updates, as otherwise they could 1002 # end up with two (or more) signed valid commitment transactions at the same ctn. 1003 # Multiple valid ctxs at the same ctn is a major headache for pre-signing spending txns, 1004 # e.g. for watchtowers, hence we must ensure these ctxs coincide. 1005 # We replay the local updates even if they were not yet committed. 1006 unacked = chan.hm.get_unacked_local_updates() 1007 n_replayed_msgs = 0 1008 for ctn, messages in unacked.items(): 1009 if ctn < their_next_local_ctn: 1010 # They claim to have received these messages and the corresponding 1011 # commitment_signed, hence we must not replay them. 1012 continue 1013 for raw_upd_msg in messages: 1014 self.transport.send_bytes(raw_upd_msg) 1015 n_replayed_msgs += 1 1016 self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): replayed {n_replayed_msgs} unacked messages') 1017 1018 we_are_ahead = False 1019 they_are_ahead = False 1020 # compare remote ctns 1021 if next_remote_ctn != their_next_local_ctn: 1022 if their_next_local_ctn == latest_remote_ctn and chan.hm.is_revack_pending(REMOTE): 1023 # We replayed the local updates (see above), which should have contained a commitment_signed 1024 # (due to is_revack_pending being true), and this should have remedied this situation. 1025 pass 1026 else: 1027 self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): " 1028 f"expected remote ctn {next_remote_ctn}, got {their_next_local_ctn}") 1029 if their_next_local_ctn < next_remote_ctn: 1030 we_are_ahead = True 1031 else: 1032 they_are_ahead = True 1033 # compare local ctns 1034 if oldest_unrevoked_local_ctn != their_oldest_unrevoked_remote_ctn: 1035 if oldest_unrevoked_local_ctn - 1 == their_oldest_unrevoked_remote_ctn: 1036 # A node: 1037 # if next_revocation_number is equal to the commitment number of the last revoke_and_ack 1038 # the receiving node sent, AND the receiving node hasn't already received a closing_signed: 1039 # MUST re-send the revoke_and_ack. 1040 last_secret, last_point = chan.get_secret_and_point(LOCAL, oldest_unrevoked_local_ctn - 1) 1041 next_secret, next_point = chan.get_secret_and_point(LOCAL, oldest_unrevoked_local_ctn + 1) 1042 self.send_message( 1043 "revoke_and_ack", 1044 channel_id=chan.channel_id, 1045 per_commitment_secret=last_secret, 1046 next_per_commitment_point=next_point) 1047 else: 1048 self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): " 1049 f"expected local ctn {oldest_unrevoked_local_ctn}, got {their_oldest_unrevoked_remote_ctn}") 1050 if their_oldest_unrevoked_remote_ctn < oldest_unrevoked_local_ctn: 1051 we_are_ahead = True 1052 else: 1053 they_are_ahead = True 1054 # option_data_loss_protect 1055 def are_datalossprotect_fields_valid() -> bool: 1056 if their_local_pcp is None or their_claim_of_our_last_per_commitment_secret is None: 1057 return False 1058 if their_oldest_unrevoked_remote_ctn > 0: 1059 our_pcs, __ = chan.get_secret_and_point(LOCAL, their_oldest_unrevoked_remote_ctn - 1) 1060 else: 1061 assert their_oldest_unrevoked_remote_ctn == 0 1062 our_pcs = bytes(32) 1063 if our_pcs != their_claim_of_our_last_per_commitment_secret: 1064 self.logger.error(f"channel_reestablish ({chan.get_id_for_log()}): " 1065 f"(DLP) local PCS mismatch: {bh2u(our_pcs)} != {bh2u(their_claim_of_our_last_per_commitment_secret)}") 1066 return False 1067 if chan.is_static_remotekey_enabled(): 1068 return True 1069 try: 1070 __, our_remote_pcp = chan.get_secret_and_point(REMOTE, their_next_local_ctn - 1) 1071 except RemoteCtnTooFarInFuture: 1072 pass 1073 else: 1074 if our_remote_pcp != their_local_pcp: 1075 self.logger.error(f"channel_reestablish ({chan.get_id_for_log()}): " 1076 f"(DLP) remote PCP mismatch: {bh2u(our_remote_pcp)} != {bh2u(their_local_pcp)}") 1077 return False 1078 return True 1079 1080 if not are_datalossprotect_fields_valid(): 1081 raise RemoteMisbehaving("channel_reestablish: data loss protect fields invalid") 1082 1083 if they_are_ahead: 1084 self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): " 1085 f"remote is ahead of us! They should force-close. Remote PCP: {bh2u(their_local_pcp)}") 1086 # data_loss_protect_remote_pcp is used in lnsweep 1087 chan.set_data_loss_protect_remote_pcp(their_next_local_ctn - 1, their_local_pcp) 1088 self.lnworker.save_channel(chan) 1089 chan.peer_state = PeerState.BAD 1090 return 1091 elif we_are_ahead: 1092 self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): we are ahead of remote! trying to force-close.") 1093 await self.lnworker.try_force_closing(chan_id) 1094 return 1095 1096 chan.peer_state = PeerState.GOOD 1097 if chan.is_funded() and their_next_local_ctn == next_local_ctn == 1: 1098 self.send_funding_locked(chan) 1099 # checks done 1100 if chan.is_funded() and chan.config[LOCAL].funding_locked_received: 1101 self.mark_open(chan) 1102 util.trigger_callback('channel', self.lnworker.wallet, chan) 1103 # if we have sent a previous shutdown, it must be retransmitted (Bolt2) 1104 if chan.get_state() == ChannelState.SHUTDOWN: 1105 await self.send_shutdown(chan) 1106 1107 def send_funding_locked(self, chan: Channel): 1108 channel_id = chan.channel_id 1109 per_commitment_secret_index = RevocationStore.START_INDEX - 1 1110 per_commitment_point_second = secret_to_pubkey(int.from_bytes( 1111 get_per_commitment_secret_from_seed(chan.config[LOCAL].per_commitment_secret_seed, per_commitment_secret_index), 'big')) 1112 # note: if funding_locked was not yet received, we might send it multiple times 1113 self.send_message("funding_locked", channel_id=channel_id, next_per_commitment_point=per_commitment_point_second) 1114 if chan.is_funded() and chan.config[LOCAL].funding_locked_received: 1115 self.mark_open(chan) 1116 1117 def on_funding_locked(self, chan: Channel, payload): 1118 self.logger.info(f"on_funding_locked. channel: {bh2u(chan.channel_id)}") 1119 if not chan.config[LOCAL].funding_locked_received: 1120 their_next_point = payload["next_per_commitment_point"] 1121 chan.config[REMOTE].next_per_commitment_point = their_next_point 1122 chan.config[LOCAL].funding_locked_received = True 1123 self.lnworker.save_channel(chan) 1124 if chan.is_funded(): 1125 self.mark_open(chan) 1126 1127 def on_network_update(self, chan: Channel, funding_tx_depth: int): 1128 """ 1129 Only called when the channel is OPEN. 1130 1131 Runs on the Network thread. 1132 """ 1133 if not chan.config[LOCAL].was_announced and funding_tx_depth >= 6: 1134 # don't announce our channels 1135 # FIXME should this be a field in chan.local_state maybe? 1136 return 1137 chan.config[LOCAL].was_announced = True 1138 self.lnworker.save_channel(chan) 1139 coro = self.handle_announcements(chan) 1140 asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop) 1141 1142 @log_exceptions 1143 async def handle_announcements(self, chan: Channel): 1144 h, local_node_sig, local_bitcoin_sig = self.send_announcement_signatures(chan) 1145 announcement_signatures_msg = await self.announcement_signatures[chan.channel_id].get() 1146 remote_node_sig = announcement_signatures_msg["node_signature"] 1147 remote_bitcoin_sig = announcement_signatures_msg["bitcoin_signature"] 1148 if not ecc.verify_signature(chan.config[REMOTE].multisig_key.pubkey, remote_bitcoin_sig, h): 1149 raise Exception("bitcoin_sig invalid in announcement_signatures") 1150 if not ecc.verify_signature(self.pubkey, remote_node_sig, h): 1151 raise Exception("node_sig invalid in announcement_signatures") 1152 1153 node_sigs = [remote_node_sig, local_node_sig] 1154 bitcoin_sigs = [remote_bitcoin_sig, local_bitcoin_sig] 1155 bitcoin_keys = [chan.config[REMOTE].multisig_key.pubkey, chan.config[LOCAL].multisig_key.pubkey] 1156 1157 if self.node_ids[0] > self.node_ids[1]: 1158 node_sigs.reverse() 1159 bitcoin_sigs.reverse() 1160 node_ids = list(reversed(self.node_ids)) 1161 bitcoin_keys.reverse() 1162 else: 1163 node_ids = self.node_ids 1164 1165 self.send_message("channel_announcement", 1166 node_signatures_1=node_sigs[0], 1167 node_signatures_2=node_sigs[1], 1168 bitcoin_signature_1=bitcoin_sigs[0], 1169 bitcoin_signature_2=bitcoin_sigs[1], 1170 len=0, 1171 #features not set (defaults to zeros) 1172 chain_hash=constants.net.rev_genesis_bytes(), 1173 short_channel_id=chan.short_channel_id, 1174 node_id_1=node_ids[0], 1175 node_id_2=node_ids[1], 1176 bitcoin_key_1=bitcoin_keys[0], 1177 bitcoin_key_2=bitcoin_keys[1] 1178 ) 1179 1180 def mark_open(self, chan: Channel): 1181 assert chan.is_funded() 1182 # only allow state transition from "FUNDED" to "OPEN" 1183 old_state = chan.get_state() 1184 if old_state == ChannelState.OPEN: 1185 return 1186 if old_state != ChannelState.FUNDED: 1187 self.logger.info(f"cannot mark open ({chan.get_id_for_log()}), current state: {repr(old_state)}") 1188 return 1189 assert chan.config[LOCAL].funding_locked_received 1190 chan.set_state(ChannelState.OPEN) 1191 util.trigger_callback('channel', self.lnworker.wallet, chan) 1192 # peer may have sent us a channel update for the incoming direction previously 1193 pending_channel_update = self.orphan_channel_updates.get(chan.short_channel_id) 1194 if pending_channel_update: 1195 chan.set_remote_update(pending_channel_update) 1196 self.logger.info(f"CHANNEL OPENING COMPLETED ({chan.get_id_for_log()})") 1197 forwarding_enabled = self.network.config.get('lightning_forward_payments', False) 1198 if forwarding_enabled: 1199 # send channel_update of outgoing edge to peer, 1200 # so that channel can be used to to receive payments 1201 self.logger.info(f"sending channel update for outgoing edge ({chan.get_id_for_log()})") 1202 chan_upd = chan.get_outgoing_gossip_channel_update() 1203 self.transport.send_bytes(chan_upd) 1204 1205 def send_announcement_signatures(self, chan: Channel): 1206 chan_ann = chan.construct_channel_announcement_without_sigs() 1207 preimage = chan_ann[256+2:] 1208 msg_hash = sha256d(preimage) 1209 bitcoin_signature = ecc.ECPrivkey(chan.config[LOCAL].multisig_key.privkey).sign(msg_hash, sig_string_from_r_and_s) 1210 node_signature = ecc.ECPrivkey(self.privkey).sign(msg_hash, sig_string_from_r_and_s) 1211 self.send_message("announcement_signatures", 1212 channel_id=chan.channel_id, 1213 short_channel_id=chan.short_channel_id, 1214 node_signature=node_signature, 1215 bitcoin_signature=bitcoin_signature 1216 ) 1217 return msg_hash, node_signature, bitcoin_signature 1218 1219 def on_update_fail_htlc(self, chan: Channel, payload): 1220 htlc_id = payload["id"] 1221 reason = payload["reason"] 1222 self.logger.info(f"on_update_fail_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}") 1223 chan.receive_fail_htlc(htlc_id, error_bytes=reason) # TODO handle exc and maybe fail channel (e.g. bad htlc_id) 1224 self.maybe_send_commitment(chan) 1225 1226 def maybe_send_commitment(self, chan: Channel): 1227 # REMOTE should revoke first before we can sign a new ctx 1228 if chan.hm.is_revack_pending(REMOTE): 1229 return 1230 # if there are no changes, we will not (and must not) send a new commitment 1231 if not chan.has_pending_changes(REMOTE): 1232 return 1233 self.logger.info(f'send_commitment. chan {chan.short_channel_id}. ctn: {chan.get_next_ctn(REMOTE)}.') 1234 sig_64, htlc_sigs = chan.sign_next_commitment() 1235 self.send_message("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=len(htlc_sigs), htlc_signature=b"".join(htlc_sigs)) 1236 1237 def pay(self, *, 1238 route: 'LNPaymentRoute', 1239 chan: Channel, 1240 amount_msat: int, 1241 total_msat: int, 1242 payment_hash: bytes, 1243 min_final_cltv_expiry: int, 1244 payment_secret: bytes = None, 1245 trampoline_onion=None) -> UpdateAddHtlc: 1246 1247 assert amount_msat > 0, "amount_msat is not greater zero" 1248 assert len(route) > 0 1249 if not chan.can_send_update_add_htlc(): 1250 raise PaymentFailure("Channel cannot send update_add_htlc") 1251 # add features learned during "init" for direct neighbour: 1252 route[0].node_features |= self.features 1253 local_height = self.network.get_local_height() 1254 final_cltv = local_height + min_final_cltv_expiry 1255 hops_data, amount_msat, cltv = calc_hops_data_for_payment( 1256 route, 1257 amount_msat, 1258 final_cltv, 1259 total_msat=total_msat, 1260 payment_secret=payment_secret) 1261 num_hops = len(hops_data) 1262 self.logger.info(f"lnpeer.pay len(route)={len(route)}") 1263 for i in range(len(route)): 1264 self.logger.info(f" {i}: edge={route[i].short_channel_id} hop_data={hops_data[i]!r}") 1265 assert final_cltv <= cltv, (final_cltv, cltv) 1266 session_key = os.urandom(32) # session_key 1267 # if we are forwarding a trampoline payment, add trampoline onion 1268 if trampoline_onion: 1269 self.logger.info(f'adding trampoline onion to final payload') 1270 trampoline_payload = hops_data[num_hops-2].payload 1271 trampoline_payload["trampoline_onion_packet"] = { 1272 "version": trampoline_onion.version, 1273 "public_key": trampoline_onion.public_key, 1274 "hops_data": trampoline_onion.hops_data, 1275 "hmac": trampoline_onion.hmac 1276 } 1277 # create onion packet 1278 payment_path_pubkeys = [x.node_id for x in route] 1279 onion = new_onion_packet(payment_path_pubkeys, session_key, hops_data, associated_data=payment_hash) # must use another sessionkey 1280 self.logger.info(f"starting payment. len(route)={len(hops_data)}.") 1281 # create htlc 1282 if cltv > local_height + lnutil.NBLOCK_CLTV_EXPIRY_TOO_FAR_INTO_FUTURE: 1283 raise PaymentFailure(f"htlc expiry too far into future. (in {cltv-local_height} blocks)") 1284 htlc = UpdateAddHtlc(amount_msat=amount_msat, payment_hash=payment_hash, cltv_expiry=cltv, timestamp=int(time.time())) 1285 htlc = chan.add_htlc(htlc) 1286 chan.set_onion_key(htlc.htlc_id, session_key) # should it be the outer onion secret? 1287 self.logger.info(f"starting payment. htlc: {htlc}") 1288 self.send_message( 1289 "update_add_htlc", 1290 channel_id=chan.channel_id, 1291 id=htlc.htlc_id, 1292 cltv_expiry=htlc.cltv_expiry, 1293 amount_msat=htlc.amount_msat, 1294 payment_hash=htlc.payment_hash, 1295 onion_routing_packet=onion.to_bytes()) 1296 self.maybe_send_commitment(chan) 1297 return htlc 1298 1299 def send_revoke_and_ack(self, chan: Channel): 1300 self.logger.info(f'send_revoke_and_ack. chan {chan.short_channel_id}. ctn: {chan.get_oldest_unrevoked_ctn(LOCAL)}') 1301 rev = chan.revoke_current_commitment() 1302 self.lnworker.save_channel(chan) 1303 self.send_message("revoke_and_ack", 1304 channel_id=chan.channel_id, 1305 per_commitment_secret=rev.per_commitment_secret, 1306 next_per_commitment_point=rev.next_per_commitment_point) 1307 self.maybe_send_commitment(chan) 1308 1309 def on_commitment_signed(self, chan: Channel, payload): 1310 if chan.peer_state == PeerState.BAD: 1311 return 1312 self.logger.info(f'on_commitment_signed. chan {chan.short_channel_id}. ctn: {chan.get_next_ctn(LOCAL)}.') 1313 # make sure there were changes to the ctx, otherwise the remote peer is misbehaving 1314 if not chan.has_pending_changes(LOCAL): 1315 # TODO if feerate changed A->B->A; so there were updates but the value is identical, 1316 # then it might be legal to send a commitment_signature 1317 # see https://github.com/lightningnetwork/lightning-rfc/pull/618 1318 raise RemoteMisbehaving('received commitment_signed without pending changes') 1319 # REMOTE should wait until we have revoked 1320 if chan.hm.is_revack_pending(LOCAL): 1321 raise RemoteMisbehaving('received commitment_signed before we revoked previous ctx') 1322 data = payload["htlc_signature"] 1323 htlc_sigs = list(chunks(data, 64)) 1324 chan.receive_new_commitment(payload["signature"], htlc_sigs) 1325 self.send_revoke_and_ack(chan) 1326 1327 def on_update_fulfill_htlc(self, chan: Channel, payload): 1328 preimage = payload["payment_preimage"] 1329 payment_hash = sha256(preimage) 1330 htlc_id = payload["id"] 1331 self.logger.info(f"on_update_fulfill_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}") 1332 chan.receive_htlc_settle(preimage, htlc_id) # TODO handle exc and maybe fail channel (e.g. bad htlc_id) 1333 self.lnworker.save_preimage(payment_hash, preimage) 1334 self.maybe_send_commitment(chan) 1335 1336 def on_update_fail_malformed_htlc(self, chan: Channel, payload): 1337 htlc_id = payload["id"] 1338 failure_code = payload["failure_code"] 1339 self.logger.info(f"on_update_fail_malformed_htlc. chan {chan.get_id_for_log()}. " 1340 f"htlc_id {htlc_id}. failure_code={failure_code}") 1341 if failure_code & OnionFailureCodeMetaFlag.BADONION == 0: 1342 asyncio.ensure_future(self.lnworker.try_force_closing(chan.channel_id)) 1343 raise RemoteMisbehaving(f"received update_fail_malformed_htlc with unexpected failure code: {failure_code}") 1344 reason = OnionRoutingFailure(code=failure_code, data=payload["sha256_of_onion"]) 1345 chan.receive_fail_htlc(htlc_id, error_bytes=None, reason=reason) 1346 self.maybe_send_commitment(chan) 1347 1348 def on_update_add_htlc(self, chan: Channel, payload): 1349 payment_hash = payload["payment_hash"] 1350 htlc_id = payload["id"] 1351 cltv_expiry = payload["cltv_expiry"] 1352 amount_msat_htlc = payload["amount_msat"] 1353 onion_packet = payload["onion_routing_packet"] 1354 htlc = UpdateAddHtlc( 1355 amount_msat=amount_msat_htlc, 1356 payment_hash=payment_hash, 1357 cltv_expiry=cltv_expiry, 1358 timestamp=int(time.time()), 1359 htlc_id=htlc_id) 1360 self.logger.info(f"on_update_add_htlc. chan {chan.short_channel_id}. htlc={str(htlc)}") 1361 if chan.get_state() != ChannelState.OPEN: 1362 raise RemoteMisbehaving(f"received update_add_htlc while chan.get_state() != OPEN. state was {chan.get_state()!r}") 1363 if cltv_expiry > bitcoin.NLOCKTIME_BLOCKHEIGHT_MAX: 1364 asyncio.ensure_future(self.lnworker.try_force_closing(chan.channel_id)) 1365 raise RemoteMisbehaving(f"received update_add_htlc with cltv_expiry > BLOCKHEIGHT_MAX. value was {cltv_expiry}") 1366 # add htlc 1367 chan.receive_htlc(htlc, onion_packet) 1368 util.trigger_callback('htlc_added', chan, htlc, RECEIVED) 1369 1370 def maybe_forward_htlc( 1371 self, *, 1372 htlc: UpdateAddHtlc, 1373 processed_onion: ProcessedOnionPacket) -> Tuple[bytes, int]: 1374 1375 # Forward HTLC 1376 # FIXME: there are critical safety checks MISSING here 1377 # - for example; atm we forward first and then persist "forwarding_info", 1378 # so if we segfault in-between and restart, we might forward an HTLC twice... 1379 # (same for trampoline forwarding) 1380 forwarding_enabled = self.network.config.get('lightning_forward_payments', False) 1381 if not forwarding_enabled: 1382 self.logger.info(f"forwarding is disabled. failing htlc.") 1383 raise OnionRoutingFailure(code=OnionFailureCode.PERMANENT_CHANNEL_FAILURE, data=b'') 1384 chain = self.network.blockchain() 1385 if chain.is_tip_stale(): 1386 raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'') 1387 try: 1388 next_chan_scid = processed_onion.hop_data.payload["short_channel_id"]["short_channel_id"] 1389 except: 1390 raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00') 1391 next_chan = self.lnworker.get_channel_by_short_id(next_chan_scid) 1392 local_height = chain.height() 1393 if next_chan is None: 1394 self.logger.info(f"cannot forward htlc. cannot find next_chan {next_chan_scid}") 1395 raise OnionRoutingFailure(code=OnionFailureCode.UNKNOWN_NEXT_PEER, data=b'') 1396 outgoing_chan_upd = next_chan.get_outgoing_gossip_channel_update()[2:] 1397 outgoing_chan_upd_len = len(outgoing_chan_upd).to_bytes(2, byteorder="big") 1398 outgoing_chan_upd_message = outgoing_chan_upd_len + outgoing_chan_upd 1399 if not next_chan.can_send_update_add_htlc(): 1400 self.logger.info(f"cannot forward htlc. next_chan {next_chan_scid} cannot send ctx updates. " 1401 f"chan state {next_chan.get_state()!r}, peer state: {next_chan.peer_state!r}") 1402 raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_CHANNEL_FAILURE, data=outgoing_chan_upd_message) 1403 try: 1404 next_amount_msat_htlc = processed_onion.hop_data.payload["amt_to_forward"]["amt_to_forward"] 1405 except: 1406 raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00') 1407 if not next_chan.can_pay(next_amount_msat_htlc): 1408 self.logger.info(f"cannot forward htlc due to transient errors (likely due to insufficient funds)") 1409 raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_CHANNEL_FAILURE, data=outgoing_chan_upd_message) 1410 try: 1411 next_cltv_expiry = processed_onion.hop_data.payload["outgoing_cltv_value"]["outgoing_cltv_value"] 1412 except: 1413 raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00') 1414 if htlc.cltv_expiry - next_cltv_expiry < next_chan.forwarding_cltv_expiry_delta: 1415 data = htlc.cltv_expiry.to_bytes(4, byteorder="big") + outgoing_chan_upd_message 1416 raise OnionRoutingFailure(code=OnionFailureCode.INCORRECT_CLTV_EXPIRY, data=data) 1417 if htlc.cltv_expiry - lnutil.MIN_FINAL_CLTV_EXPIRY_ACCEPTED <= local_height \ 1418 or next_cltv_expiry <= local_height: 1419 raise OnionRoutingFailure(code=OnionFailureCode.EXPIRY_TOO_SOON, data=outgoing_chan_upd_message) 1420 if max(htlc.cltv_expiry, next_cltv_expiry) > local_height + lnutil.NBLOCK_CLTV_EXPIRY_TOO_FAR_INTO_FUTURE: 1421 raise OnionRoutingFailure(code=OnionFailureCode.EXPIRY_TOO_FAR, data=b'') 1422 forwarding_fees = fee_for_edge_msat( 1423 forwarded_amount_msat=next_amount_msat_htlc, 1424 fee_base_msat=next_chan.forwarding_fee_base_msat, 1425 fee_proportional_millionths=next_chan.forwarding_fee_proportional_millionths) 1426 if htlc.amount_msat - next_amount_msat_htlc < forwarding_fees: 1427 data = next_amount_msat_htlc.to_bytes(8, byteorder="big") + outgoing_chan_upd_message 1428 raise OnionRoutingFailure(code=OnionFailureCode.FEE_INSUFFICIENT, data=data) 1429 self.logger.info(f'forwarding htlc to {next_chan.node_id}') 1430 next_htlc = UpdateAddHtlc( 1431 amount_msat=next_amount_msat_htlc, 1432 payment_hash=htlc.payment_hash, 1433 cltv_expiry=next_cltv_expiry, 1434 timestamp=int(time.time())) 1435 next_htlc = next_chan.add_htlc(next_htlc) 1436 next_peer = self.lnworker.peers[next_chan.node_id] 1437 try: 1438 next_peer.send_message( 1439 "update_add_htlc", 1440 channel_id=next_chan.channel_id, 1441 id=next_htlc.htlc_id, 1442 cltv_expiry=next_cltv_expiry, 1443 amount_msat=next_amount_msat_htlc, 1444 payment_hash=next_htlc.payment_hash, 1445 onion_routing_packet=processed_onion.next_packet.to_bytes() 1446 ) 1447 except BaseException as e: 1448 self.logger.info(f"failed to forward htlc: error sending message. {e}") 1449 raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_CHANNEL_FAILURE, data=outgoing_chan_upd_message) 1450 return next_chan_scid, next_htlc.htlc_id 1451 1452 def maybe_forward_trampoline( 1453 self, *, 1454 chan: Channel, 1455 htlc: UpdateAddHtlc, 1456 trampoline_onion: ProcessedOnionPacket): 1457 1458 forwarding_enabled = self.network.config.get('lightning_forward_payments', False) 1459 forwarding_trampoline_enabled = self.network.config.get('lightning_forward_trampoline_payments', False) 1460 if not (forwarding_enabled and forwarding_trampoline_enabled): 1461 self.logger.info(f"trampoline forwarding is disabled. failing htlc.") 1462 raise OnionRoutingFailure(code=OnionFailureCode.PERMANENT_CHANNEL_FAILURE, data=b'') 1463 1464 payload = trampoline_onion.hop_data.payload 1465 payment_hash = htlc.payment_hash 1466 payment_secret = os.urandom(32) 1467 try: 1468 outgoing_node_id = payload["outgoing_node_id"]["outgoing_node_id"] 1469 amt_to_forward = payload["amt_to_forward"]["amt_to_forward"] 1470 cltv_from_onion = payload["outgoing_cltv_value"]["outgoing_cltv_value"] 1471 if "invoice_features" in payload: 1472 self.logger.info('forward_trampoline: legacy') 1473 next_trampoline_onion = None 1474 invoice_features = payload["invoice_features"]["invoice_features"] 1475 invoice_routing_info = payload["invoice_routing_info"]["invoice_routing_info"] 1476 # TODO use invoice_routing_info 1477 else: 1478 self.logger.info('forward_trampoline: end-to-end') 1479 invoice_features = LnFeatures.BASIC_MPP_OPT 1480 next_trampoline_onion = trampoline_onion.next_packet 1481 except Exception as e: 1482 self.logger.exception('') 1483 raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00') 1484 1485 # these are the fee/cltv paid by the sender 1486 # pay_to_node will raise if they are not sufficient 1487 trampoline_cltv_delta = htlc.cltv_expiry - cltv_from_onion 1488 trampoline_fee = htlc.amount_msat - amt_to_forward 1489 1490 @log_exceptions 1491 async def forward_trampoline_payment(): 1492 try: 1493 await self.lnworker.pay_to_node( 1494 node_pubkey=outgoing_node_id, 1495 payment_hash=payment_hash, 1496 payment_secret=payment_secret, 1497 amount_to_pay=amt_to_forward, 1498 min_cltv_expiry=cltv_from_onion, 1499 r_tags=[], 1500 invoice_features=invoice_features, 1501 fwd_trampoline_onion=next_trampoline_onion, 1502 fwd_trampoline_fee=trampoline_fee, 1503 fwd_trampoline_cltv_delta=trampoline_cltv_delta, 1504 attempts=1) 1505 except OnionRoutingFailure as e: 1506 # FIXME: cannot use payment_hash as key 1507 self.lnworker.trampoline_forwarding_failures[payment_hash] = e 1508 except PaymentFailure as e: 1509 # FIXME: adapt the error code 1510 error_reason = OnionRoutingFailure(code=OnionFailureCode.UNKNOWN_NEXT_PEER, data=b'') 1511 self.lnworker.trampoline_forwarding_failures[payment_hash] = error_reason 1512 1513 asyncio.ensure_future(forward_trampoline_payment()) 1514 1515 def maybe_fulfill_htlc( 1516 self, *, 1517 chan: Channel, 1518 htlc: UpdateAddHtlc, 1519 processed_onion: ProcessedOnionPacket, 1520 is_trampoline: bool = False) -> Tuple[Optional[bytes], Optional[OnionPacket]]: 1521 1522 """As a final recipient of an HTLC, decide if we should fulfill it. 1523 Return (preimage, trampoline_onion_packet) with at most a single element not None 1524 """ 1525 def log_fail_reason(reason: str): 1526 self.logger.info(f"maybe_fulfill_htlc. will FAIL HTLC: chan {chan.short_channel_id}. " 1527 f"{reason}. htlc={str(htlc)}. onion_payload={processed_onion.hop_data.payload}") 1528 1529 try: 1530 amt_to_forward = processed_onion.hop_data.payload["amt_to_forward"]["amt_to_forward"] 1531 except: 1532 log_fail_reason(f"'amt_to_forward' missing from onion") 1533 raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00') 1534 1535 # Check that our blockchain tip is sufficiently recent so that we have an approx idea of the height. 1536 # We should not release the preimage for an HTLC that its sender could already time out as 1537 # then they might try to force-close and it becomes a race. 1538 chain = self.network.blockchain() 1539 if chain.is_tip_stale(): 1540 log_fail_reason(f"our chain tip is stale") 1541 raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'') 1542 local_height = chain.height() 1543 exc_incorrect_or_unknown_pd = OnionRoutingFailure( 1544 code=OnionFailureCode.INCORRECT_OR_UNKNOWN_PAYMENT_DETAILS, 1545 data=amt_to_forward.to_bytes(8, byteorder="big") + local_height.to_bytes(4, byteorder="big")) 1546 if local_height + MIN_FINAL_CLTV_EXPIRY_ACCEPTED > htlc.cltv_expiry: 1547 log_fail_reason(f"htlc.cltv_expiry is unreasonably close") 1548 raise exc_incorrect_or_unknown_pd 1549 try: 1550 cltv_from_onion = processed_onion.hop_data.payload["outgoing_cltv_value"]["outgoing_cltv_value"] 1551 except: 1552 log_fail_reason(f"'outgoing_cltv_value' missing from onion") 1553 raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00') 1554 1555 if not is_trampoline: 1556 if cltv_from_onion != htlc.cltv_expiry: 1557 log_fail_reason(f"cltv_from_onion != htlc.cltv_expiry") 1558 raise OnionRoutingFailure( 1559 code=OnionFailureCode.FINAL_INCORRECT_CLTV_EXPIRY, 1560 data=htlc.cltv_expiry.to_bytes(4, byteorder="big")) 1561 try: 1562 total_msat = processed_onion.hop_data.payload["payment_data"]["total_msat"] 1563 except: 1564 total_msat = amt_to_forward # fall back to "amt_to_forward" 1565 1566 if not is_trampoline and amt_to_forward != htlc.amount_msat: 1567 log_fail_reason(f"amt_to_forward != htlc.amount_msat") 1568 raise OnionRoutingFailure( 1569 code=OnionFailureCode.FINAL_INCORRECT_HTLC_AMOUNT, 1570 data=htlc.amount_msat.to_bytes(8, byteorder="big")) 1571 1572 try: 1573 payment_secret_from_onion = processed_onion.hop_data.payload["payment_data"]["payment_secret"] 1574 except: 1575 if total_msat > amt_to_forward: 1576 # payment_secret is required for MPP 1577 log_fail_reason(f"'payment_secret' missing from onion") 1578 raise exc_incorrect_or_unknown_pd 1579 # TODO fail here if invoice has set PAYMENT_SECRET_REQ 1580 payment_secret_from_onion = None 1581 1582 if total_msat > amt_to_forward: 1583 mpp_status = self.lnworker.check_received_mpp_htlc(payment_secret_from_onion, chan.short_channel_id, htlc, total_msat) 1584 if mpp_status is None: 1585 return None, None 1586 if mpp_status is False: 1587 log_fail_reason(f"MPP_TIMEOUT") 1588 raise OnionRoutingFailure(code=OnionFailureCode.MPP_TIMEOUT, data=b'') 1589 assert mpp_status is True 1590 1591 # if there is a trampoline_onion, maybe_fulfill_htlc will be called again 1592 if processed_onion.trampoline_onion_packet: 1593 # TODO: we should check that all trampoline_onions are the same 1594 return None, processed_onion.trampoline_onion_packet 1595 1596 # TODO don't accept payments twice for same invoice 1597 # TODO check invoice expiry 1598 info = self.lnworker.get_payment_info(htlc.payment_hash) 1599 if info is None: 1600 log_fail_reason(f"no payment_info found for RHASH {htlc.payment_hash.hex()}") 1601 raise exc_incorrect_or_unknown_pd 1602 preimage = self.lnworker.get_preimage(htlc.payment_hash) 1603 if payment_secret_from_onion: 1604 if payment_secret_from_onion != derive_payment_secret_from_payment_preimage(preimage): 1605 log_fail_reason(f'incorrect payment secret {payment_secret_from_onion.hex()} != {derive_payment_secret_from_payment_preimage(preimage).hex()}') 1606 raise exc_incorrect_or_unknown_pd 1607 invoice_msat = info.amount_msat 1608 if not (invoice_msat is None or invoice_msat <= total_msat <= 2 * invoice_msat): 1609 log_fail_reason(f"total_msat={total_msat} too different from invoice_msat={invoice_msat}") 1610 raise exc_incorrect_or_unknown_pd 1611 self.logger.info(f"maybe_fulfill_htlc. will FULFILL HTLC: chan {chan.short_channel_id}. htlc={str(htlc)}") 1612 self.lnworker.set_request_status(htlc.payment_hash, PR_PAID) 1613 return preimage, None 1614 1615 def fulfill_htlc(self, chan: Channel, htlc_id: int, preimage: bytes): 1616 self.logger.info(f"_fulfill_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}") 1617 assert chan.can_send_ctx_updates(), f"cannot send updates: {chan.short_channel_id}" 1618 assert chan.hm.is_htlc_irrevocably_added_yet(htlc_proposer=REMOTE, htlc_id=htlc_id) 1619 self.received_htlcs_pending_removal.add((chan, htlc_id)) 1620 chan.settle_htlc(preimage, htlc_id) 1621 self.send_message( 1622 "update_fulfill_htlc", 1623 channel_id=chan.channel_id, 1624 id=htlc_id, 1625 payment_preimage=preimage) 1626 1627 def fail_htlc(self, *, chan: Channel, htlc_id: int, error_bytes: bytes): 1628 self.logger.info(f"fail_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}.") 1629 assert chan.can_send_ctx_updates(), f"cannot send updates: {chan.short_channel_id}" 1630 self.received_htlcs_pending_removal.add((chan, htlc_id)) 1631 chan.fail_htlc(htlc_id) 1632 self.send_message( 1633 "update_fail_htlc", 1634 channel_id=chan.channel_id, 1635 id=htlc_id, 1636 len=len(error_bytes), 1637 reason=error_bytes) 1638 1639 def fail_malformed_htlc(self, *, chan: Channel, htlc_id: int, reason: OnionRoutingFailure): 1640 self.logger.info(f"fail_malformed_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}.") 1641 assert chan.can_send_ctx_updates(), f"cannot send updates: {chan.short_channel_id}" 1642 if not (reason.code & OnionFailureCodeMetaFlag.BADONION and len(reason.data) == 32): 1643 raise Exception(f"unexpected reason when sending 'update_fail_malformed_htlc': {reason!r}") 1644 self.received_htlcs_pending_removal.add((chan, htlc_id)) 1645 chan.fail_htlc(htlc_id) 1646 self.send_message( 1647 "update_fail_malformed_htlc", 1648 channel_id=chan.channel_id, 1649 id=htlc_id, 1650 sha256_of_onion=reason.data, 1651 failure_code=reason.code) 1652 1653 def on_revoke_and_ack(self, chan: Channel, payload): 1654 if chan.peer_state == PeerState.BAD: 1655 return 1656 self.logger.info(f'on_revoke_and_ack. chan {chan.short_channel_id}. ctn: {chan.get_oldest_unrevoked_ctn(REMOTE)}') 1657 rev = RevokeAndAck(payload["per_commitment_secret"], payload["next_per_commitment_point"]) 1658 chan.receive_revocation(rev) 1659 self.lnworker.save_channel(chan) 1660 self.maybe_send_commitment(chan) 1661 1662 def on_update_fee(self, chan: Channel, payload): 1663 feerate = payload["feerate_per_kw"] 1664 chan.update_fee(feerate, False) 1665 1666 async def maybe_update_fee(self, chan: Channel): 1667 """ 1668 called when our fee estimates change 1669 """ 1670 if not chan.can_send_ctx_updates(): 1671 return 1672 feerate_per_kw = self.lnworker.current_feerate_per_kw() 1673 if not chan.constraints.is_initiator: 1674 if constants.net is not constants.BitcoinRegtest: 1675 chan_feerate = chan.get_latest_feerate(LOCAL) 1676 ratio = chan_feerate / feerate_per_kw 1677 if ratio < 0.5: 1678 # Note that we trust the Electrum server about fee rates 1679 # Thus, automated force-closing might not be a good idea 1680 # Maybe we should display something in the GUI instead 1681 self.logger.warning( 1682 f"({chan.get_id_for_log()}) feerate is {chan_feerate} sat/kw, " 1683 f"current recommended feerate is {feerate_per_kw} sat/kw, consider force closing!") 1684 return 1685 chan_fee = chan.get_next_feerate(REMOTE) 1686 if feerate_per_kw < chan_fee / 2: 1687 self.logger.info("FEES HAVE FALLEN") 1688 elif feerate_per_kw > chan_fee * 2: 1689 self.logger.info("FEES HAVE RISEN") 1690 elif chan.get_oldest_unrevoked_ctn(REMOTE) == 0: 1691 # workaround eclair issue https://github.com/ACINQ/eclair/issues/1730 1692 self.logger.info("updating fee to bump remote ctn") 1693 if feerate_per_kw == chan_fee: 1694 feerate_per_kw += 1 1695 else: 1696 return 1697 self.logger.info(f"(chan: {chan.get_id_for_log()}) current pending feerate {chan_fee}. " 1698 f"new feerate {feerate_per_kw}") 1699 chan.update_fee(feerate_per_kw, True) 1700 self.send_message( 1701 "update_fee", 1702 channel_id=chan.channel_id, 1703 feerate_per_kw=feerate_per_kw) 1704 self.maybe_send_commitment(chan) 1705 1706 @log_exceptions 1707 async def close_channel(self, chan_id: bytes): 1708 chan = self.channels[chan_id] 1709 self.shutdown_received[chan_id] = asyncio.Future() 1710 await self.send_shutdown(chan) 1711 payload = await self.shutdown_received[chan_id] 1712 try: 1713 txid = await self._shutdown(chan, payload, is_local=True) 1714 self.logger.info(f'({chan.get_id_for_log()}) Channel closed {txid}') 1715 except asyncio.TimeoutError: 1716 txid = chan.unconfirmed_closing_txid 1717 self.logger.info(f'({chan.get_id_for_log()}) did not send closing_signed, {txid}') 1718 if txid is None: 1719 raise Exception('The remote peer did not send their final signature. The channel may not have been be closed') 1720 return txid 1721 1722 async def on_shutdown(self, chan: Channel, payload): 1723 their_scriptpubkey = payload['scriptpubkey'] 1724 their_upfront_scriptpubkey = chan.config[REMOTE].upfront_shutdown_script 1725 # BOLT-02 check if they use the upfront shutdown script they advertized 1726 if their_upfront_scriptpubkey: 1727 if not (their_scriptpubkey == their_upfront_scriptpubkey): 1728 raise UpfrontShutdownScriptViolation("remote didn't use upfront shutdown script it commited to in channel opening") 1729 # BOLT-02 restrict the scriptpubkey to some templates: 1730 if not (match_script_against_template(their_scriptpubkey, transaction.SCRIPTPUBKEY_TEMPLATE_WITNESS_V0) 1731 or match_script_against_template(their_scriptpubkey, transaction.SCRIPTPUBKEY_TEMPLATE_P2SH) 1732 or match_script_against_template(their_scriptpubkey, transaction.SCRIPTPUBKEY_TEMPLATE_P2PKH)): 1733 raise Exception(f'scriptpubkey in received shutdown message does not conform to any template: {their_scriptpubkey.hex()}') 1734 chan_id = chan.channel_id 1735 if chan_id in self.shutdown_received: 1736 self.shutdown_received[chan_id].set_result(payload) 1737 else: 1738 chan = self.channels[chan_id] 1739 await self.send_shutdown(chan) 1740 txid = await self._shutdown(chan, payload, is_local=False) 1741 self.logger.info(f'({chan.get_id_for_log()}) Channel closed by remote peer {txid}') 1742 1743 def can_send_shutdown(self, chan: Channel): 1744 if chan.get_state() >= ChannelState.OPENING: 1745 return True 1746 if chan.constraints.is_initiator and chan.channel_id in self.funding_created_sent: 1747 return True 1748 if not chan.constraints.is_initiator and chan.channel_id in self.funding_signed_sent: 1749 return True 1750 return False 1751 1752 async def send_shutdown(self, chan: Channel): 1753 if not self.can_send_shutdown(chan): 1754 raise Exception('cannot send shutdown') 1755 if chan.config[LOCAL].upfront_shutdown_script: 1756 scriptpubkey = chan.config[LOCAL].upfront_shutdown_script 1757 else: 1758 scriptpubkey = bfh(bitcoin.address_to_script(chan.sweep_address)) 1759 assert scriptpubkey 1760 # wait until no more pending updates (bolt2) 1761 chan.set_can_send_ctx_updates(False) 1762 while chan.has_pending_changes(REMOTE): 1763 await asyncio.sleep(0.1) 1764 self.send_message('shutdown', channel_id=chan.channel_id, len=len(scriptpubkey), scriptpubkey=scriptpubkey) 1765 chan.set_state(ChannelState.SHUTDOWN) 1766 # can fullfill or fail htlcs. cannot add htlcs, because state != OPEN 1767 chan.set_can_send_ctx_updates(True) 1768 1769 @log_exceptions 1770 async def _shutdown(self, chan: Channel, payload, *, is_local: bool): 1771 # wait until no HTLCs remain in either commitment transaction 1772 while len(chan.hm.htlcs(LOCAL)) + len(chan.hm.htlcs(REMOTE)) > 0: 1773 self.logger.info(f'(chan: {chan.short_channel_id}) waiting for htlcs to settle...') 1774 await asyncio.sleep(1) 1775 # if no HTLCs remain, we must not send updates 1776 chan.set_can_send_ctx_updates(False) 1777 their_scriptpubkey = payload['scriptpubkey'] 1778 if chan.config[LOCAL].upfront_shutdown_script: 1779 our_scriptpubkey = chan.config[LOCAL].upfront_shutdown_script 1780 else: 1781 our_scriptpubkey = bfh(bitcoin.address_to_script(chan.sweep_address)) 1782 assert our_scriptpubkey 1783 # estimate fee of closing tx 1784 our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=0) 1785 fee_rate = self.network.config.fee_per_kb() 1786 our_fee = fee_rate * closing_tx.estimated_size() // 1000 1787 # BOLT2: The sending node MUST set fee less than or equal to the base fee of the final ctx 1788 max_fee = chan.get_latest_fee(LOCAL if is_local else REMOTE) 1789 our_fee = min(our_fee, max_fee) 1790 drop_remote = False 1791 def send_closing_signed(): 1792 our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=our_fee, drop_remote=drop_remote) 1793 self.send_message('closing_signed', channel_id=chan.channel_id, fee_satoshis=our_fee, signature=our_sig) 1794 def verify_signature(tx, sig): 1795 their_pubkey = chan.config[REMOTE].multisig_key.pubkey 1796 preimage_hex = tx.serialize_preimage(0) 1797 pre_hash = sha256d(bfh(preimage_hex)) 1798 return ecc.verify_signature(their_pubkey, sig, pre_hash) 1799 # the funder sends the first 'closing_signed' message 1800 if chan.constraints.is_initiator: 1801 send_closing_signed() 1802 # negotiate fee 1803 while True: 1804 # FIXME: the remote SHOULD send closing_signed, but some don't. 1805 cs_payload = await self.wait_for_message('closing_signed', chan.channel_id) 1806 their_fee = cs_payload['fee_satoshis'] 1807 if their_fee > max_fee: 1808 raise Exception(f'the proposed fee exceeds the base fee of the latest commitment transaction {is_local, their_fee, max_fee}') 1809 their_sig = cs_payload['signature'] 1810 # verify their sig: they might have dropped their output 1811 our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=their_fee, drop_remote=False) 1812 if verify_signature(closing_tx, their_sig): 1813 drop_remote = False 1814 else: 1815 our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=their_fee, drop_remote=True) 1816 if verify_signature(closing_tx, their_sig): 1817 drop_remote = True 1818 else: 1819 raise Exception('failed to verify their signature') 1820 # Agree if difference is lower or equal to one (see below) 1821 if abs(our_fee - their_fee) < 2: 1822 our_fee = their_fee 1823 break 1824 # this will be "strictly between" (as in BOLT2) previous values because of the above 1825 our_fee = (our_fee + their_fee) // 2 1826 # another round 1827 send_closing_signed() 1828 # the non-funder replies 1829 if not chan.constraints.is_initiator: 1830 send_closing_signed() 1831 # add signatures 1832 closing_tx.add_signature_to_txin( 1833 txin_idx=0, 1834 signing_pubkey=chan.config[LOCAL].multisig_key.pubkey.hex(), 1835 sig=bh2u(der_sig_from_sig_string(our_sig) + b'\x01')) 1836 closing_tx.add_signature_to_txin( 1837 txin_idx=0, 1838 signing_pubkey=chan.config[REMOTE].multisig_key.pubkey.hex(), 1839 sig=bh2u(der_sig_from_sig_string(their_sig) + b'\x01')) 1840 # save local transaction and set state 1841 try: 1842 self.lnworker.wallet.add_transaction(closing_tx) 1843 except UnrelatedTransactionException: 1844 pass # this can happen if (~all the balance goes to REMOTE) 1845 chan.set_state(ChannelState.CLOSING) 1846 # broadcast 1847 await self.network.try_broadcasting(closing_tx, 'closing') 1848 return closing_tx.txid() 1849 1850 async def htlc_switch(self): 1851 await self.initialized 1852 while True: 1853 self._htlc_switch_iterdone_event.set() 1854 self._htlc_switch_iterdone_event.clear() 1855 await asyncio.sleep(0.1) # TODO maybe make this partly event-driven 1856 self._htlc_switch_iterstart_event.set() 1857 self._htlc_switch_iterstart_event.clear() 1858 self.ping_if_required() 1859 self._maybe_cleanup_received_htlcs_pending_removal() 1860 for chan_id, chan in self.channels.items(): 1861 if not chan.can_send_ctx_updates(): 1862 continue 1863 self.maybe_send_commitment(chan) 1864 done = set() 1865 unfulfilled = chan.hm.log.get('unfulfilled_htlcs', {}) 1866 for htlc_id, (local_ctn, remote_ctn, onion_packet_hex, forwarding_info) in unfulfilled.items(): 1867 if not chan.hm.is_htlc_irrevocably_added_yet(htlc_proposer=REMOTE, htlc_id=htlc_id): 1868 continue 1869 htlc = chan.hm.get_htlc_by_id(REMOTE, htlc_id) 1870 error_reason = None # type: Optional[OnionRoutingFailure] 1871 error_bytes = None # type: Optional[bytes] 1872 preimage = None 1873 fw_info = None 1874 onion_packet_bytes = bytes.fromhex(onion_packet_hex) 1875 onion_packet = None 1876 try: 1877 onion_packet = OnionPacket.from_bytes(onion_packet_bytes) 1878 except OnionRoutingFailure as e: 1879 error_reason = e 1880 else: 1881 try: 1882 preimage, fw_info, error_bytes = self.process_unfulfilled_htlc( 1883 chan=chan, 1884 htlc=htlc, 1885 forwarding_info=forwarding_info, 1886 onion_packet_bytes=onion_packet_bytes, 1887 onion_packet=onion_packet) 1888 except OnionRoutingFailure as e: 1889 error_bytes = construct_onion_error(e, onion_packet, our_onion_private_key=self.privkey) 1890 if fw_info: 1891 unfulfilled[htlc_id] = local_ctn, remote_ctn, onion_packet_hex, fw_info 1892 elif preimage or error_reason or error_bytes: 1893 if preimage: 1894 if not self.lnworker.enable_htlc_settle: 1895 continue 1896 self.fulfill_htlc(chan, htlc.htlc_id, preimage) 1897 elif error_bytes: 1898 self.fail_htlc( 1899 chan=chan, 1900 htlc_id=htlc.htlc_id, 1901 error_bytes=error_bytes) 1902 else: 1903 self.fail_malformed_htlc( 1904 chan=chan, 1905 htlc_id=htlc.htlc_id, 1906 reason=error_reason) 1907 done.add(htlc_id) 1908 # cleanup 1909 for htlc_id in done: 1910 unfulfilled.pop(htlc_id) 1911 1912 def _maybe_cleanup_received_htlcs_pending_removal(self) -> None: 1913 done = set() 1914 for chan, htlc_id in self.received_htlcs_pending_removal: 1915 if chan.hm.is_htlc_irrevocably_removed_yet(htlc_proposer=REMOTE, htlc_id=htlc_id): 1916 done.add((chan, htlc_id)) 1917 if done: 1918 for key in done: 1919 self.received_htlcs_pending_removal.remove(key) 1920 self.received_htlc_removed_event.set() 1921 self.received_htlc_removed_event.clear() 1922 1923 async def wait_one_htlc_switch_iteration(self) -> None: 1924 """Waits until the HTLC switch does a full iteration or the peer disconnects, 1925 whichever happens first. 1926 """ 1927 async def htlc_switch_iteration(): 1928 await self._htlc_switch_iterstart_event.wait() 1929 await self._htlc_switch_iterdone_event.wait() 1930 1931 async with TaskGroup(wait=any) as group: 1932 await group.spawn(htlc_switch_iteration()) 1933 await group.spawn(self.got_disconnected.wait()) 1934 1935 def process_unfulfilled_htlc( 1936 self, *, 1937 chan: Channel, 1938 htlc: UpdateAddHtlc, 1939 forwarding_info: Tuple[str, int], 1940 onion_packet_bytes: bytes, 1941 onion_packet: OnionPacket) -> Tuple[Optional[bytes], Union[bool, None, Tuple[str, int]], Optional[bytes]]: 1942 """ 1943 return (preimage, fw_info, error_bytes) with at most a single element that is not None 1944 raise an OnionRoutingFailure if we need to fail the htlc 1945 """ 1946 payment_hash = htlc.payment_hash 1947 processed_onion = self.process_onion_packet( 1948 onion_packet, 1949 payment_hash=payment_hash, 1950 onion_packet_bytes=onion_packet_bytes) 1951 if processed_onion.are_we_final: 1952 # either we are final recipient; or if trampoline, see cases below 1953 preimage, trampoline_onion_packet = self.maybe_fulfill_htlc( 1954 chan=chan, 1955 htlc=htlc, 1956 processed_onion=processed_onion) 1957 if trampoline_onion_packet: 1958 # trampoline- recipient or forwarding 1959 if not forwarding_info: 1960 trampoline_onion = self.process_onion_packet( 1961 trampoline_onion_packet, 1962 payment_hash=htlc.payment_hash, 1963 onion_packet_bytes=onion_packet_bytes, 1964 is_trampoline=True) 1965 if trampoline_onion.are_we_final: 1966 # trampoline- we are final recipient of HTLC 1967 preimage, _ = self.maybe_fulfill_htlc( 1968 chan=chan, 1969 htlc=htlc, 1970 processed_onion=trampoline_onion, 1971 is_trampoline=True) 1972 else: 1973 # trampoline- HTLC we are supposed to forward, but haven't forwarded yet 1974 if not self.lnworker.enable_htlc_forwarding: 1975 return None, None, None 1976 self.maybe_forward_trampoline( 1977 chan=chan, 1978 htlc=htlc, 1979 trampoline_onion=trampoline_onion) 1980 # return True so that this code gets executed only once 1981 return None, True, None 1982 else: 1983 # trampoline- HTLC we are supposed to forward, and have already forwarded 1984 preimage = self.lnworker.get_preimage(payment_hash) 1985 error_reason = self.lnworker.trampoline_forwarding_failures.pop(payment_hash, None) 1986 if error_reason: 1987 self.logger.info(f'trampoline forwarding failure: {error_reason.code_name()}') 1988 raise error_reason 1989 1990 elif not forwarding_info: 1991 # HTLC we are supposed to forward, but haven't forwarded yet 1992 if not self.lnworker.enable_htlc_forwarding: 1993 return None, None, None 1994 next_chan_id, next_htlc_id = self.maybe_forward_htlc( 1995 htlc=htlc, 1996 processed_onion=processed_onion) 1997 fw_info = (next_chan_id.hex(), next_htlc_id) 1998 return None, fw_info, None 1999 else: 2000 # HTLC we are supposed to forward, and have already forwarded 2001 preimage = self.lnworker.get_preimage(payment_hash) 2002 next_chan_id_hex, htlc_id = forwarding_info 2003 next_chan = self.lnworker.get_channel_by_short_id(bytes.fromhex(next_chan_id_hex)) 2004 if next_chan: 2005 error_bytes, error_reason = next_chan.pop_fail_htlc_reason(htlc_id) 2006 if error_bytes: 2007 return None, None, error_bytes 2008 if error_reason: 2009 raise error_reason 2010 if preimage: 2011 return preimage, None, None 2012 return None, None, None 2013 2014 def process_onion_packet( 2015 self, 2016 onion_packet: OnionPacket, *, 2017 payment_hash: bytes, 2018 onion_packet_bytes: bytes, 2019 is_trampoline: bool = False) -> ProcessedOnionPacket: 2020 2021 failure_data = sha256(onion_packet_bytes) 2022 try: 2023 processed_onion = process_onion_packet( 2024 onion_packet, 2025 associated_data=payment_hash, 2026 our_onion_private_key=self.privkey, 2027 is_trampoline=is_trampoline) 2028 except UnsupportedOnionPacketVersion: 2029 raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_VERSION, data=failure_data) 2030 except InvalidOnionPubkey: 2031 raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_KEY, data=failure_data) 2032 except InvalidOnionMac: 2033 raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_HMAC, data=failure_data) 2034 except Exception as e: 2035 self.logger.info(f"error processing onion packet: {e!r}") 2036 raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_VERSION, data=failure_data) 2037 if self.network.config.get('test_fail_malformed_htlc'): 2038 raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_VERSION, data=failure_data) 2039 if self.network.config.get('test_fail_htlcs_with_temp_node_failure'): 2040 raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'') 2041 return processed_onion 2042