1#!/usr/local/bin/python3.8
2#
3# Copyright (C) 2018 The Electrum developers
4# Distributed under the MIT software license, see the accompanying
5# file LICENCE or http://www.opensource.org/licenses/mit-license.php
6
7import zlib
8from collections import OrderedDict, defaultdict
9import asyncio
10import os
11import time
12from typing import Tuple, Dict, TYPE_CHECKING, Optional, Union, Set
13from datetime import datetime
14import functools
15
16import aiorpcx
17from aiorpcx import TaskGroup
18
19from .crypto import sha256, sha256d
20from . import bitcoin, util
21from . import ecc
22from .ecc import sig_string_from_r_and_s, der_sig_from_sig_string
23from . import constants
24from .util import (bh2u, bfh, log_exceptions, ignore_exceptions, chunks, SilentTaskGroup,
25                   UnrelatedTransactionException)
26from . import transaction
27from .bitcoin import make_op_return
28from .transaction import PartialTxOutput, match_script_against_template
29from .logging import Logger
30from .lnonion import (new_onion_packet, OnionFailureCode, calc_hops_data_for_payment,
31                      process_onion_packet, OnionPacket, construct_onion_error, OnionRoutingFailure,
32                      ProcessedOnionPacket, UnsupportedOnionPacketVersion, InvalidOnionMac, InvalidOnionPubkey,
33                      OnionFailureCodeMetaFlag)
34from .lnchannel import Channel, RevokeAndAck, RemoteCtnTooFarInFuture, ChannelState, PeerState
35from . import lnutil
36from .lnutil import (Outpoint, LocalConfig, RECEIVED, UpdateAddHtlc,
37                     RemoteConfig, OnlyPubkeyKeypair, ChannelConstraints, RevocationStore,
38                     funding_output_script, get_per_commitment_secret_from_seed,
39                     secret_to_pubkey, PaymentFailure, LnFeatures,
40                     LOCAL, REMOTE, HTLCOwner,
41                     ln_compare_features, privkey_to_pubkey, MIN_FINAL_CLTV_EXPIRY_ACCEPTED,
42                     LightningPeerConnectionClosed, HandshakeFailed,
43                     RemoteMisbehaving, ShortChannelID,
44                     IncompatibleLightningFeatures, derive_payment_secret_from_payment_preimage,
45                     LN_MAX_FUNDING_SAT, calc_fees_for_commitment_tx,
46                     UpfrontShutdownScriptViolation)
47from .lnutil import FeeUpdate, channel_id_from_funding_tx
48from .lntransport import LNTransport, LNTransportBase
49from .lnmsg import encode_msg, decode_msg, UnknownOptionalMsgType
50from .interface import GracefulDisconnect
51from .lnrouter import fee_for_edge_msat
52from .lnutil import ln_dummy_address
53from .json_db import StoredDict
54from .invoices import PR_PAID
55
56if TYPE_CHECKING:
57    from .lnworker import LNGossip, LNWallet
58    from .lnrouter import LNPaymentRoute
59    from .transaction import PartialTransaction
60
61
# Timeout, in seconds, passed to asyncio.wait_for() when waiting on the remote peer
# (handshake/init, ordered messages, channel range queries).
LN_P2P_NETWORK_TIMEOUT = 20
63
64
65class Peer(Logger):
    LOGGING_SHORTCUT = 'P'

    # Message types that process_message() does not dispatch to an on_* handler:
    # they are put on a per-channel queue and consumed through wait_for_message().
    # NOTE(review): 'accept_channel' is listed twice; harmless for membership tests.
    ORDERED_MESSAGES = (
        'accept_channel', 'funding_signed', 'funding_created', 'accept_channel', 'channel_reestablish', 'closing_signed')
    # Message types that are left out of the "Sending ..."/"Received ..." debug logs.
    SPAMMY_MESSAGES = (
        'ping', 'pong', 'channel_announcement', 'node_announcement', 'channel_update',)
72
    def __init__(
            self,
            lnworker: Union['LNGossip', 'LNWallet'],
            pubkey: bytes,
            transport: LNTransportBase,
            *, is_channel_backup: bool = False):
        """Create the in-memory state for one connection to a remote node.

        lnworker: the worker this peer belongs to; its network and features are reused here.
        pubkey: public key of the remote node.
        transport: used to send/receive raw messages; also supplies our local private key.
        is_channel_backup: when True, process_message() ignores everything except 'init'.
        """

        self.is_channel_backup = is_channel_backup
        self._sent_init = False  # type: bool
        self._received_init = False  # type: bool
        # resolved with True by maybe_set_initialized() once 'init' was both sent and received
        self.initialized = asyncio.Future()
        self.got_disconnected = asyncio.Event()
        # set by on_reply_short_channel_ids_end(), awaited in get_short_channel_ids()
        self.querying = asyncio.Event()
        self.transport = transport
        self.pubkey = pubkey  # remote pubkey
        self.lnworker = lnworker
        self.privkey = self.transport.privkey  # local privkey
        # starts as our own feature set; on_init() replaces it with the negotiated one
        self.features = self.lnworker.features  # type: LnFeatures
        self.their_features = LnFeatures(0)  # type: LnFeatures
        # [remote node id, local node id]; connecting to ourselves is not allowed
        self.node_ids = [self.pubkey, privkey_to_pubkey(self.privkey)]
        assert self.node_ids[0] != self.node_ids[1]
        self.network = lnworker.network
        self.ping_time = 0
        # filled by on_reply_channel_range(), drained by get_channel_range()
        self.reply_channel_range = asyncio.Queue()
        # gossip uses a single queue to preserve message order
        self.gossip_queue = asyncio.Queue()
        self.ordered_message_queues = defaultdict(asyncio.Queue) # for messages that are ordered
        self.temp_id_to_id = {}   # to forward error messages
        self.funding_created_sent = set() # for channels in PREOPENING
        self.funding_signed_sent = set()  # for channels in PREOPENING
        self.shutdown_received = {} # chan_id -> asyncio.Future()
        self.announcement_signatures = defaultdict(asyncio.Queue)
        self.orphan_channel_updates = OrderedDict()  # type: OrderedDict[ShortChannelID, dict]
        # note: diagnostic_name() reads self.lnworker and self.transport, which are set above
        Logger.__init__(self)
        self.taskgroup = SilentTaskGroup()
        # HTLCs offered by REMOTE, that we started removing but are still active:
        self.received_htlcs_pending_removal = set()  # type: Set[Tuple[Channel, int]]
        self.received_htlc_removed_event = asyncio.Event()
        self._htlc_switch_iterstart_event = asyncio.Event()
        self._htlc_switch_iterdone_event = asyncio.Event()
113
114    def send_message(self, message_name: str, **kwargs):
115        assert type(message_name) is str
116        if message_name not in self.SPAMMY_MESSAGES:
117            self.logger.debug(f"Sending {message_name.upper()}")
118        if message_name.upper() != "INIT" and not self.is_initialized():
119            raise Exception("tried to send message before we are initialized")
120        raw_msg = encode_msg(message_name, **kwargs)
121        self._store_raw_msg_if_local_update(raw_msg, message_name=message_name, channel_id=kwargs.get("channel_id"))
122        self.transport.send_bytes(raw_msg)
123
124    def _store_raw_msg_if_local_update(self, raw_msg: bytes, *, message_name: str, channel_id: Optional[bytes]):
125        is_commitment_signed = message_name == "commitment_signed"
126        if not (message_name.startswith("update_") or is_commitment_signed):
127            return
128        assert channel_id
129        chan = self.get_channel_by_id(channel_id)
130        if not chan:
131            raise Exception(f"channel {channel_id.hex()} not found for peer {self.pubkey.hex()}")
132        chan.hm.store_local_update_raw_msg(raw_msg, is_commitment_signed=is_commitment_signed)
133        if is_commitment_signed:
134            # saving now, to ensure replaying updates works (in case of channel reestablishment)
135            self.lnworker.save_channel(chan)
136
137    def maybe_set_initialized(self):
138        if self.initialized.done():
139            return
140        if self._sent_init and self._received_init:
141            self.initialized.set_result(True)
142
143    def is_initialized(self) -> bool:
144        return (self.initialized.done()
145                and not self.initialized.cancelled()
146                and self.initialized.exception() is None
147                and self.initialized.result() is True)
148
149    async def initialize(self):
150        if isinstance(self.transport, LNTransport):
151            await self.transport.handshake()
152        features = self.features.for_init_message()
153        b = int.bit_length(features)
154        flen = b // 8 + int(bool(b % 8))
155        self.send_message(
156            "init", gflen=0, flen=flen,
157            features=features,
158            init_tlvs={
159                'networks':
160                {'chains': constants.net.rev_genesis_bytes()}
161            })
162        self._sent_init = True
163        self.maybe_set_initialized()
164
165    @property
166    def channels(self) -> Dict[bytes, Channel]:
167        return self.lnworker.channels_for_peer(self.pubkey)
168
169    def get_channel_by_id(self, channel_id: bytes) -> Optional[Channel]:
170        # note: this is faster than self.channels.get(channel_id)
171        chan = self.lnworker.get_channel_by_id(channel_id)
172        if not chan:
173            return None
174        if chan.node_id != self.pubkey:
175            return None
176        return chan
177
178    def diagnostic_name(self):
179        return self.lnworker.__class__.__name__ + ', ' + self.transport.name()
180
181    def ping_if_required(self):
182        if time.time() - self.ping_time > 120:
183            self.send_message('ping', num_pong_bytes=4, byteslen=4)
184            self.ping_time = time.time()
185
    def process_message(self, message):
        """Decode one raw message received from the peer and dispatch it.

        Messages whose type is in ORDERED_MESSAGES are put on the queue in
        ``ordered_message_queues`` keyed by their (temporary) channel id.
        Any other message is passed to the ``on_<message_type>`` method if this
        object has one; if there is no such method the message is silently dropped.

        Raises Exception if a non-'error' message carries a 'channel_id' for which
        get_channel_by_id() finds no channel with this peer.
        """
        try:
            message_type, payload = decode_msg(message)
        except UnknownOptionalMsgType as e:
            self.logger.info(f"received unknown message from peer. ignoring: {e!r}")
            return
        if message_type not in self.SPAMMY_MESSAGES:
            self.logger.debug(f"Received {message_type.upper()}")
        # only process INIT if we are a backup
        if self.is_channel_backup is True and message_type != 'init':
            return
        if message_type in self.ORDERED_MESSAGES:
            # key the queue by channel_id, falling back to temporary_channel_id
            chan_id = payload.get('channel_id') or payload["temporary_channel_id"]
            self.ordered_message_queues[chan_id].put_nowait((message_type, payload))
        else:
            # handlers of channel-specific messages receive (chan, payload);
            # 'error' and messages without channel_id receive only (payload,)
            if message_type != 'error' and 'channel_id' in payload:
                chan = self.get_channel_by_id(payload['channel_id'])
                if chan is None:
                    raise Exception('Got unknown '+ message_type)
                args = (chan, payload)
            else:
                args = (payload,)
            try:
                f = getattr(self, 'on_' + message_type)
            except AttributeError:
                # no handler defined for this message type: ignore the message
                #self.logger.info("Received '%s'" % message_type.upper(), payload)
                return
            # raw message is needed to check signature
            if message_type in ['node_announcement', 'channel_announcement', 'channel_update']:
                payload['raw'] = message
            execution_result = f(*args)
            # for a coroutine function, f(*args) only created the coroutine object;
            # hand it to the peer's taskgroup so that it actually runs
            if asyncio.iscoroutinefunction(f):
                asyncio.ensure_future(self.taskgroup.spawn(execution_result))
219
220    def on_error(self, payload):
221        self.logger.info(f"remote peer sent error [DO NOT TRUST THIS MESSAGE]: {payload['data'].decode('ascii')}")
222        chan_id = payload.get("channel_id")
223        if chan_id in self.temp_id_to_id:
224            chan_id = self.temp_id_to_id[chan_id]
225        self.ordered_message_queues[chan_id].put_nowait((None, {'error':payload['data']}))
226
227    def on_ping(self, payload):
228        l = payload['num_pong_bytes']
229        self.send_message('pong', byteslen=l)
230
231    def on_pong(self, payload):
232        pass
233
234    async def wait_for_message(self, expected_name, channel_id):
235        q = self.ordered_message_queues[channel_id]
236        name, payload = await asyncio.wait_for(q.get(), LN_P2P_NETWORK_TIMEOUT)
237        if payload.get('error'):
238            raise Exception('Remote peer reported error [DO NOT TRUST THIS MESSAGE]: ' + repr(payload.get('error')))
239        if name != expected_name:
240            raise Exception(f"Received unexpected '{name}'")
241        return payload
242
243    def on_init(self, payload):
244        if self._received_init:
245            self.logger.info("ALREADY INITIALIZED BUT RECEIVED INIT")
246            return
247        self.their_features = LnFeatures(int.from_bytes(payload['features'], byteorder="big"))
248        their_globalfeatures = int.from_bytes(payload['globalfeatures'], byteorder="big")
249        self.their_features |= their_globalfeatures
250        # check transitive dependencies for received features
251        if not self.their_features.validate_transitive_dependencies():
252            raise GracefulDisconnect("remote did not set all dependencies for the features they sent")
253        # check if features are compatible, and set self.features to what we negotiated
254        try:
255            self.features = ln_compare_features(self.features, self.their_features)
256        except IncompatibleLightningFeatures as e:
257            self.initialized.set_exception(e)
258            raise GracefulDisconnect(f"{str(e)}")
259        # check that they are on the same chain as us, if provided
260        their_networks = payload["init_tlvs"].get("networks")
261        if their_networks:
262            their_chains = list(chunks(their_networks["chains"], 32))
263            if constants.net.rev_genesis_bytes() not in their_chains:
264                raise GracefulDisconnect(f"no common chain found with remote. (they sent: {their_chains})")
265        # all checks passed
266        self.lnworker.on_peer_successfully_established(self)
267        self._received_init = True
268        self.maybe_set_initialized()
269
270    def on_node_announcement(self, payload):
271        if self.lnworker.channel_db:
272            self.gossip_queue.put_nowait(('node_announcement', payload))
273
274    def on_channel_announcement(self, payload):
275        if self.lnworker.channel_db:
276            self.gossip_queue.put_nowait(('channel_announcement', payload))
277
278    def on_channel_update(self, payload):
279        self.maybe_save_remote_update(payload)
280        if self.lnworker.channel_db:
281            self.gossip_queue.put_nowait(('channel_update', payload))
282
283    def maybe_save_remote_update(self, payload):
284        if not self.channels:
285            return
286        for chan in self.channels.values():
287            if chan.short_channel_id == payload['short_channel_id']:
288                chan.set_remote_update(payload)
289                self.logger.info("saved remote_update")
290                break
291        else:
292            # Save (some bounded number of) orphan channel updates for later
293            # as it might be for our own direct channel with this peer
294            # (and we might not yet know the short channel id for that)
295            # Background: this code is here to deal with a bug in LND,
296            # see https://github.com/lightningnetwork/lnd/issues/3651
297            # and https://github.com/lightningnetwork/lightning-rfc/pull/657
298            # This code assumes gossip_queries is set. BOLT7: "if the
299            # gossip_queries feature is negotiated, [a node] MUST NOT
300            # send gossip it did not generate itself"
301            short_channel_id = ShortChannelID(payload['short_channel_id'])
302            self.logger.info(f'received orphan channel update {short_channel_id}')
303            self.orphan_channel_updates[short_channel_id] = payload
304            while len(self.orphan_channel_updates) > 25:
305                self.orphan_channel_updates.popitem(last=False)
306
307    def on_announcement_signatures(self, chan: Channel, payload):
308        if chan.config[LOCAL].was_announced:
309            h, local_node_sig, local_bitcoin_sig = self.send_announcement_signatures(chan)
310        else:
311            self.announcement_signatures[chan.channel_id].put_nowait(payload)
312
    def handle_disconnect(func):
        """Decorator for coroutine methods of Peer: log disconnects and always clean up.

        GracefulDisconnect, LightningPeerConnectionClosed,
        IncompatibleLightningFeatures and SOCKS errors raised by `func` are logged
        and swallowed (the wrapper then returns None); any other exception
        propagates. In every case close_and_cleanup() is called afterwards.
        """
        @functools.wraps(func)
        async def wrapper_func(self, *args, **kwargs):
            try:
                return await func(self, *args, **kwargs)
            except GracefulDisconnect as e:
                # the exception itself decides at which level it gets logged
                self.logger.log(e.log_level, f"Disconnecting: {repr(e)}")
            except (LightningPeerConnectionClosed, IncompatibleLightningFeatures,
                    aiorpcx.socks.SOCKSError) as e:
                self.logger.info(f"Disconnecting: {repr(e)}")
            finally:
                # runs on normal return, on the handled exceptions above, and on any other exception
                self.close_and_cleanup()
        return wrapper_func
326
    @ignore_exceptions  # do not kill outer taskgroup
    @log_exceptions
    @handle_disconnect
    async def main_loop(self):
        """Run the tasks of this peer connection inside self.taskgroup.

        Spawns the message loop, the HTLC switch, the gossip query task and the
        gossip processing task. Because of @handle_disconnect,
        close_and_cleanup() is called when this coroutine finishes or fails.
        """
        async with self.taskgroup as group:
            # _message_loop() performs the handshake/init and then reads incoming messages
            await group.spawn(self._message_loop())
            await group.spawn(self.htlc_switch())
            await group.spawn(self.query_gossip())
            await group.spawn(self.process_gossip())
336
337    async def process_gossip(self):
338        while True:
339            await asyncio.sleep(5)
340            if not self.network.lngossip:
341                continue
342            chan_anns = []
343            chan_upds = []
344            node_anns = []
345            while True:
346                name, payload = await self.gossip_queue.get()
347                if name == 'channel_announcement':
348                    chan_anns.append(payload)
349                elif name == 'channel_update':
350                    chan_upds.append(payload)
351                elif name == 'node_announcement':
352                    node_anns.append(payload)
353                else:
354                    raise Exception('unknown message')
355                if self.gossip_queue.empty():
356                    break
357            if self.network.lngossip:
358                await self.network.lngossip.process_gossip(chan_anns, node_anns, chan_upds)
359
360    async def query_gossip(self):
361        try:
362            await asyncio.wait_for(self.initialized, LN_P2P_NETWORK_TIMEOUT)
363        except Exception as e:
364            raise GracefulDisconnect(f"Failed to initialize: {e!r}") from e
365        if self.lnworker == self.lnworker.network.lngossip:
366            try:
367                ids, complete = await asyncio.wait_for(self.get_channel_range(), LN_P2P_NETWORK_TIMEOUT)
368            except asyncio.TimeoutError as e:
369                raise GracefulDisconnect("query_channel_range timed out") from e
370            self.logger.info('Received {} channel ids. (complete: {})'.format(len(ids), complete))
371            await self.lnworker.add_new_ids(ids)
372            while True:
373                todo = self.lnworker.get_ids_to_query()
374                if not todo:
375                    await asyncio.sleep(1)
376                    continue
377                await self.get_short_channel_ids(todo)
378
    async def get_channel_range(self):
        """Query the peer for all short channel ids since the first lightning block.

        Sends one query_channel_range and then collects the replies that
        on_reply_channel_range() puts on self.reply_channel_range, until the block
        intervals of the replies merge into one interval covering the whole
        requested range and the latest reply has its 'complete' flag set.

        Returns (ids, complete): the set of all short channel ids received, and
        the 'complete' flag of the last reply processed.
        Raises Exception if the reply intervals are inconsistent.
        """
        first_block = constants.net.BLOCK_HEIGHT_FIRST_LIGHTNING_CHANNELS
        num_blocks = self.lnworker.network.get_local_height() - first_block
        self.query_channel_range(first_block, num_blocks)
        intervals = []  # list of (start_block, end_block) pairs, kept sorted and merged
        ids = set()
        # note: implementations behave differently...
        # "sane implementation that follows BOLT-07" example:
        #   query_channel_range. <<< first_block 497000, num_blocks 79038
        #   on_reply_channel_range. >>> first_block 497000, num_blocks 39516, num_ids 4648, complete True
        #   on_reply_channel_range. >>> first_block 536516, num_blocks 19758, num_ids 5734, complete True
        #   on_reply_channel_range. >>> first_block 556274, num_blocks 9879, num_ids 13712, complete True
        #   on_reply_channel_range. >>> first_block 566153, num_blocks 9885, num_ids 18114, complete True
        # lnd example:
        #   query_channel_range. <<< first_block 497000, num_blocks 79038
        #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
        #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
        #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
        #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
        #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 5344, complete True
        while True:
            index, num, complete, _ids = await self.reply_channel_range.get()
            ids.update(_ids)
            intervals.append((index, index+num))
            intervals.sort()
            # merge the two lowest intervals for as long as they touch or overlap
            while len(intervals) > 1:
                a,b = intervals[0]
                c,d = intervals[1]
                if not (a <= c and a <= b and c <= d):
                    raise Exception(f"insane reply_channel_range intervals {(a,b,c,d)}")
                if b >= c:
                    intervals = [(a,d)] + intervals[2:]
                else:
                    # there is a gap: more replies are needed before merging can continue
                    break
            # done once a single interval covers the requested range and the reply was 'complete'
            if len(intervals) == 1 and complete:
                a, b = intervals[0]
                if a <= first_block and b >= first_block + num_blocks:
                    break
        return ids, complete
418
419    def request_gossip(self, timestamp=0):
420        if timestamp == 0:
421            self.logger.info('requesting whole channel graph')
422        else:
423            self.logger.info(f'requesting channel graph since {datetime.fromtimestamp(timestamp).ctime()}')
424        self.send_message(
425            'gossip_timestamp_filter',
426            chain_hash=constants.net.rev_genesis_bytes(),
427            first_timestamp=timestamp,
428            timestamp_range=b'\xff'*4)
429
430    def query_channel_range(self, first_block, num_blocks):
431        self.logger.info(f'query channel range {first_block} {num_blocks}')
432        self.send_message(
433            'query_channel_range',
434            chain_hash=constants.net.rev_genesis_bytes(),
435            first_blocknum=first_block,
436            number_of_blocks=num_blocks)
437
438    def decode_short_ids(self, encoded):
439        if encoded[0] == 0:
440            decoded = encoded[1:]
441        elif encoded[0] == 1:
442            decoded = zlib.decompress(encoded[1:])
443        else:
444            raise Exception(f'decode_short_ids: unexpected first byte: {encoded[0]}')
445        ids = [decoded[i:i+8] for i in range(0, len(decoded), 8)]
446        return ids
447
448    def on_reply_channel_range(self, payload):
449        first = payload['first_blocknum']
450        num = payload['number_of_blocks']
451        complete = bool(int.from_bytes(payload['complete'], 'big'))
452        encoded = payload['encoded_short_ids']
453        ids = self.decode_short_ids(encoded)
454        #self.logger.info(f"on_reply_channel_range. >>> first_block {first}, num_blocks {num}, num_ids {len(ids)}, complete {repr(payload['complete'])}")
455        self.reply_channel_range.put_nowait((first, num, complete, ids))
456
457    async def get_short_channel_ids(self, ids):
458        self.logger.info(f'Querying {len(ids)} short_channel_ids')
459        assert not self.querying.is_set()
460        self.query_short_channel_ids(ids)
461        await self.querying.wait()
462        self.querying.clear()
463
464    def query_short_channel_ids(self, ids, compressed=True):
465        ids = sorted(ids)
466        s = b''.join(ids)
467        encoded = zlib.compress(s) if compressed else s
468        prefix = b'\x01' if compressed else b'\x00'
469        self.send_message(
470            'query_short_channel_ids',
471            chain_hash=constants.net.rev_genesis_bytes(),
472            len=1+len(encoded),
473            encoded_short_ids=prefix+encoded)
474
475    async def _message_loop(self):
476        try:
477            await asyncio.wait_for(self.initialize(), LN_P2P_NETWORK_TIMEOUT)
478        except (OSError, asyncio.TimeoutError, HandshakeFailed) as e:
479            raise GracefulDisconnect(f'initialize failed: {repr(e)}') from e
480        async for msg in self.transport.read_messages():
481            self.process_message(msg)
482            await asyncio.sleep(.01)
483
484    def on_reply_short_channel_ids_end(self, payload):
485        self.querying.set()
486
487    def close_and_cleanup(self):
488        # note: This method might get called multiple times!
489        #       E.g. if you call close_and_cleanup() to cause a disconnection from the peer,
490        #       it will get called a second time in handle_disconnect().
491        try:
492            if self.transport:
493                self.transport.close()
494        except:
495            pass
496        self.lnworker.peer_closed(self)
497        self.got_disconnected.set()
498
499    def is_static_remotekey(self):
500        return self.features.supports(LnFeatures.OPTION_STATIC_REMOTEKEY_OPT)
501
502    def is_upfront_shutdown_script(self):
503        return self.features.supports(LnFeatures.OPTION_UPFRONT_SHUTDOWN_SCRIPT_OPT)
504
505    def upfront_shutdown_script_from_payload(self, payload, msg_identifier: str) -> Optional[bytes]:
506        if msg_identifier not in ['accept', 'open']:
507            raise ValueError("msg_identifier must be either 'accept' or 'open'")
508
509        uss_tlv = payload[msg_identifier + '_channel_tlvs'].get(
510            'upfront_shutdown_script')
511
512        if uss_tlv and self.is_upfront_shutdown_script():
513            upfront_shutdown_script = uss_tlv['shutdown_scriptpubkey']
514        else:
515            upfront_shutdown_script = b''
516        self.logger.info(f"upfront shutdown script received: {upfront_shutdown_script}")
517        return upfront_shutdown_script
518
519    def make_local_config(self, funding_sat: int, push_msat: int, initiator: HTLCOwner) -> LocalConfig:
520        channel_seed = os.urandom(32)
521        initial_msat = funding_sat * 1000 - push_msat if initiator == LOCAL else push_msat
522
523        static_remotekey = None
524        # sending empty bytes as the upfront_shutdown_script will give us the
525        # flexibility to decide an address at closing time
526        upfront_shutdown_script = b''
527
528        if self.is_static_remotekey():
529            wallet = self.lnworker.wallet
530            assert wallet.txin_type == 'p2wpkh'
531            addr = wallet.get_new_sweep_address_for_channel()
532            static_remotekey = bfh(wallet.get_public_key(addr))
533        else:
534            static_remotekey = None
535        dust_limit_sat = bitcoin.DUST_LIMIT_DEFAULT_SAT_LEGACY
536        reserve_sat = max(funding_sat // 100, dust_limit_sat)
537        # for comparison of defaults, see
538        # https://github.com/ACINQ/eclair/blob/afa378fbb73c265da44856b4ad0f2128a88ae6c6/eclair-core/src/main/resources/reference.conf#L66
539        # https://github.com/ElementsProject/lightning/blob/0056dd75572a8857cff36fcbdb1a2295a1ac9253/lightningd/options.c#L657
540        # https://github.com/lightningnetwork/lnd/blob/56b61078c5b2be007d318673a5f3b40c6346883a/config.go#L81
541        local_config = LocalConfig.from_seed(
542            channel_seed=channel_seed,
543            static_remotekey=static_remotekey,
544            upfront_shutdown_script=upfront_shutdown_script,
545            to_self_delay=self.network.config.get('lightning_to_self_delay', 7 * 144),
546            dust_limit_sat=dust_limit_sat,
547            max_htlc_value_in_flight_msat=funding_sat * 1000,
548            max_accepted_htlcs=30,
549            initial_msat=initial_msat,
550            reserve_sat=reserve_sat,
551            funding_locked_received=False,
552            was_announced=False,
553            current_commitment_signature=None,
554            current_htlc_signatures=b'',
555            htlc_minimum_msat=1,
556        )
557        local_config.validate_params(funding_sat=funding_sat)
558        return local_config
559
    def temporarily_reserve_funding_tx_change_address(func):
        # During the channel open flow, if we initiated, we might have used a change address
        # of ours in the funding tx. The funding tx is not part of the wallet history
        # at that point yet, but we should already consider this change address as 'used'.
        #
        # Decorator for coroutine methods of Peer. The wrapped method must be called with
        # `funding_tx` as a keyword argument (it is read from kwargs, KeyError otherwise).
        @functools.wraps(func)
        async def wrapper(self: 'Peer', *args, **kwargs):
            funding_tx = kwargs['funding_tx']  # type: PartialTransaction
            wallet = self.lnworker.wallet
            # outputs of the funding tx that pay to change addresses of our wallet
            change_addresses = [txout.address for txout in funding_tx.outputs()
                                if wallet.is_change(txout.address)]
            for addr in change_addresses:
                wallet.set_reserved_state_of_address(addr, reserved=True)
            try:
                return await func(self, *args, **kwargs)
            finally:
                # the reservation is lifted again whether func succeeded or raised
                for addr in change_addresses:
                    self.lnworker.wallet.set_reserved_state_of_address(addr, reserved=False)
        return wrapper
578
579    @log_exceptions
580    @temporarily_reserve_funding_tx_change_address
581    async def channel_establishment_flow(
582            self, *,
583            funding_tx: 'PartialTransaction',
584            funding_sat: int,
585            push_msat: int,
586            temp_channel_id: bytes
587    ) -> Tuple[Channel, 'PartialTransaction']:
588        """Implements the channel opening flow.
589
590        -> open_channel message
591        <- accept_channel message
592        -> funding_created message
593        <- funding_signed message
594
595        Channel configurations are initialized in this method.
596        """
597        # will raise if init fails
598        await asyncio.wait_for(self.initialized, LN_P2P_NETWORK_TIMEOUT)
599        # trampoline is not yet in features
600        if not self.lnworker.channel_db and not self.lnworker.is_trampoline_peer(self.pubkey):
601            raise Exception('Not a trampoline node: ' + str(self.their_features))
602
603        if funding_sat > LN_MAX_FUNDING_SAT:
604            raise Exception(
605                f"MUST set funding_satoshis to less than 2^24 satoshi. "
606                f"{funding_sat} sat > {LN_MAX_FUNDING_SAT}")
607        if push_msat > 1000 * funding_sat:
608            raise Exception(
609                f"MUST set push_msat to equal or less than 1000 * funding_satoshis: "
610                f"{push_msat} msat > {1000 * funding_sat} msat")
611        if funding_sat < lnutil.MIN_FUNDING_SAT:
612            raise Exception(f"funding_sat too low: {funding_sat} < {lnutil.MIN_FUNDING_SAT}")
613
614        feerate = self.lnworker.current_feerate_per_kw()
615        local_config = self.make_local_config(funding_sat, push_msat, LOCAL)
616
617        # for the first commitment transaction
618        per_commitment_secret_first = get_per_commitment_secret_from_seed(
619            local_config.per_commitment_secret_seed,
620            RevocationStore.START_INDEX
621        )
622        per_commitment_point_first = secret_to_pubkey(
623            int.from_bytes(per_commitment_secret_first, 'big'))
624        self.send_message(
625            "open_channel",
626            temporary_channel_id=temp_channel_id,
627            chain_hash=constants.net.rev_genesis_bytes(),
628            funding_satoshis=funding_sat,
629            push_msat=push_msat,
630            dust_limit_satoshis=local_config.dust_limit_sat,
631            feerate_per_kw=feerate,
632            max_accepted_htlcs=local_config.max_accepted_htlcs,
633            funding_pubkey=local_config.multisig_key.pubkey,
634            revocation_basepoint=local_config.revocation_basepoint.pubkey,
635            htlc_basepoint=local_config.htlc_basepoint.pubkey,
636            payment_basepoint=local_config.payment_basepoint.pubkey,
637            delayed_payment_basepoint=local_config.delayed_basepoint.pubkey,
638            first_per_commitment_point=per_commitment_point_first,
639            to_self_delay=local_config.to_self_delay,
640            max_htlc_value_in_flight_msat=local_config.max_htlc_value_in_flight_msat,
641            channel_flags=0x00,  # not willing to announce channel
642            channel_reserve_satoshis=local_config.reserve_sat,
643            htlc_minimum_msat=local_config.htlc_minimum_msat,
644            open_channel_tlvs={
645                'upfront_shutdown_script':
646                    {'shutdown_scriptpubkey': local_config.upfront_shutdown_script}
647            }
648        )
649
650        # <- accept_channel
651        payload = await self.wait_for_message('accept_channel', temp_channel_id)
652        remote_per_commitment_point = payload['first_per_commitment_point']
653        funding_txn_minimum_depth = payload['minimum_depth']
654        if funding_txn_minimum_depth <= 0:
655            raise Exception(f"minimum depth too low, {funding_txn_minimum_depth}")
656        if funding_txn_minimum_depth > 30:
657            raise Exception(f"minimum depth too high, {funding_txn_minimum_depth}")
658
659        upfront_shutdown_script = self.upfront_shutdown_script_from_payload(
660            payload, 'accept')
661
662        remote_config = RemoteConfig(
663            payment_basepoint=OnlyPubkeyKeypair(payload['payment_basepoint']),
664            multisig_key=OnlyPubkeyKeypair(payload["funding_pubkey"]),
665            htlc_basepoint=OnlyPubkeyKeypair(payload['htlc_basepoint']),
666            delayed_basepoint=OnlyPubkeyKeypair(payload['delayed_payment_basepoint']),
667            revocation_basepoint=OnlyPubkeyKeypair(payload['revocation_basepoint']),
668            to_self_delay=payload['to_self_delay'],
669            dust_limit_sat=payload['dust_limit_satoshis'],
670            max_htlc_value_in_flight_msat=payload['max_htlc_value_in_flight_msat'],
671            max_accepted_htlcs=payload["max_accepted_htlcs"],
672            initial_msat=push_msat,
673            reserve_sat=payload["channel_reserve_satoshis"],
674            htlc_minimum_msat=payload['htlc_minimum_msat'],
675            next_per_commitment_point=remote_per_commitment_point,
676            current_per_commitment_point=None,
677            upfront_shutdown_script=upfront_shutdown_script
678        )
679        remote_config.validate_params(funding_sat=funding_sat)
680        # if channel_reserve_satoshis is less than dust_limit_satoshis within the open_channel message:
681        #     MUST reject the channel.
682        if remote_config.reserve_sat < local_config.dust_limit_sat:
683            raise Exception("violated constraint: remote_config.reserve_sat < local_config.dust_limit_sat")
684        # if channel_reserve_satoshis from the open_channel message is less than dust_limit_satoshis:
685        #     MUST reject the channel.
686        if local_config.reserve_sat < remote_config.dust_limit_sat:
687            raise Exception("violated constraint: local_config.reserve_sat < remote_config.dust_limit_sat")
688
689        # -> funding created
690        # replace dummy output in funding tx
691        redeem_script = funding_output_script(local_config, remote_config)
692        funding_address = bitcoin.redeem_script_to_address('p2wsh', redeem_script)
693        funding_output = PartialTxOutput.from_address_and_value(funding_address, funding_sat)
694        dummy_output = PartialTxOutput.from_address_and_value(ln_dummy_address(), funding_sat)
695        if dummy_output not in funding_tx.outputs(): raise Exception("LN dummy output (err 1)")
696        funding_tx._outputs.remove(dummy_output)
697        if dummy_output in funding_tx.outputs(): raise Exception("LN dummy output (err 2)")
698        funding_tx.add_outputs([funding_output])
699        # find and encrypt op_return data associated to funding_address
700        has_onchain_backup = self.lnworker and self.lnworker.has_recoverable_channels()
701        if has_onchain_backup:
702            backup_data = self.lnworker.cb_data(self.pubkey)
703            dummy_scriptpubkey = make_op_return(backup_data)
704            for o in funding_tx.outputs():
705                if o.scriptpubkey == dummy_scriptpubkey:
706                    encrypted_data = self.lnworker.encrypt_cb_data(backup_data, funding_address)
707                    assert len(encrypted_data) == len(backup_data)
708                    o.scriptpubkey = make_op_return(encrypted_data)
709                    break
710            else:
711                raise Exception('op_return output not found in funding tx')
712        # must not be malleable
713        funding_tx.set_rbf(False)
714        if not funding_tx.is_segwit():
715            raise Exception('Funding transaction is not segwit')
716        funding_txid = funding_tx.txid()
717        assert funding_txid
718        funding_index = funding_tx.outputs().index(funding_output)
719        # build remote commitment transaction
720        channel_id, funding_txid_bytes = channel_id_from_funding_tx(funding_txid, funding_index)
721        outpoint = Outpoint(funding_txid, funding_index)
722        constraints = ChannelConstraints(
723            capacity=funding_sat,
724            is_initiator=True,
725            funding_txn_minimum_depth=funding_txn_minimum_depth
726        )
727        storage = self.create_channel_storage(
728            channel_id, outpoint, local_config, remote_config, constraints)
729        chan = Channel(
730            storage,
731            sweep_address=self.lnworker.sweep_address,
732            lnworker=self.lnworker,
733            initial_feerate=feerate
734        )
735        chan.storage['funding_inputs'] = [txin.prevout.to_json() for txin in funding_tx.inputs()]
736        chan.storage['has_onchain_backup'] = has_onchain_backup
737        if isinstance(self.transport, LNTransport):
738            chan.add_or_update_peer_addr(self.transport.peer_addr)
739        sig_64, _ = chan.sign_next_commitment()
740        self.temp_id_to_id[temp_channel_id] = channel_id
741
742        self.send_message("funding_created",
743            temporary_channel_id=temp_channel_id,
744            funding_txid=funding_txid_bytes,
745            funding_output_index=funding_index,
746            signature=sig_64)
747        self.funding_created_sent.add(channel_id)
748
749        # <- funding signed
750        payload = await self.wait_for_message('funding_signed', channel_id)
751        self.logger.info('received funding_signed')
752        remote_sig = payload['signature']
753        chan.receive_new_commitment(remote_sig, [])
754        chan.open_with_first_pcp(remote_per_commitment_point, remote_sig)
755        chan.set_state(ChannelState.OPENING)
756        self.lnworker.add_new_channel(chan)
757        return chan, funding_tx
758
759    def create_channel_storage(self, channel_id, outpoint, local_config, remote_config, constraints):
760        chan_dict = {
761            "node_id": self.pubkey.hex(),
762            "channel_id": channel_id.hex(),
763            "short_channel_id": None,
764            "funding_outpoint": outpoint,
765            "remote_config": remote_config,
766            "local_config": local_config,
767            "constraints": constraints,
768            "remote_update": None,
769            "state": ChannelState.PREOPENING.name,
770            'onion_keys': {},
771            'data_loss_protect_remote_pcp': {},
772            "log": {},
773            "revocation_store": {},
774            "static_remotekey_enabled": self.is_static_remotekey(), # stored because it cannot be "downgraded", per BOLT2
775        }
776        return StoredDict(chan_dict, self.lnworker.db if self.lnworker else None, [])
777
    async def on_open_channel(self, payload):
        """Implements the channel acceptance flow.

        <- open_channel message
        -> accept_channel message
        <- funding_created message
        -> funding_signed message

        Channel configurations are initialized in this method.

        `payload` is the decoded open_channel message received from the peer.
        Every violated requirement is reported by raising a plain Exception.
        On success the new channel is moved to ChannelState.OPENING and handed
        to the lnworker via add_new_channel(); nothing is returned.
        """
        # incoming channels are refused while has_recoverable_channels() is true
        if self.lnworker.has_recoverable_channels():
            # FIXME: we might want to keep the connection open
            raise Exception('not accepting channels')
        # <- open_channel
        if payload['chain_hash'] != constants.net.rev_genesis_bytes():
            raise Exception('wrong chain_hash')
        funding_sat = payload['funding_satoshis']
        push_msat = payload['push_msat']
        feerate = payload['feerate_per_kw']  # note: we are not validating this
        temp_chan_id = payload['temporary_channel_id']
        # NOTE(review): the third argument presumably names the channel initiator
        # (here the remote) -- confirm against make_local_config.
        # Note that the local config is built before funding_sat/push_msat are validated below.
        local_config = self.make_local_config(funding_sat, push_msat, REMOTE)
        if funding_sat > LN_MAX_FUNDING_SAT:
            raise Exception(
                f"MUST set funding_satoshis to less than 2^24 satoshi. "
                f"{funding_sat} sat > {LN_MAX_FUNDING_SAT}")
        if push_msat > 1000 * funding_sat:
            raise Exception(
                f"MUST set push_msat to equal or less than 1000 * funding_satoshis: "
                f"{push_msat} msat > {1000 * funding_sat} msat")
        if funding_sat < lnutil.MIN_FUNDING_SAT:
            raise Exception(f"funding_sat too low: {funding_sat} < {lnutil.MIN_FUNDING_SAT}")

        upfront_shutdown_script = self.upfront_shutdown_script_from_payload(
            payload, 'open')

        # everything the remote told us about itself in open_channel
        remote_config = RemoteConfig(
            payment_basepoint=OnlyPubkeyKeypair(payload['payment_basepoint']),
            multisig_key=OnlyPubkeyKeypair(payload['funding_pubkey']),
            htlc_basepoint=OnlyPubkeyKeypair(payload['htlc_basepoint']),
            delayed_basepoint=OnlyPubkeyKeypair(payload['delayed_payment_basepoint']),
            revocation_basepoint=OnlyPubkeyKeypair(payload['revocation_basepoint']),
            to_self_delay=payload['to_self_delay'],
            dust_limit_sat=payload['dust_limit_satoshis'],
            max_htlc_value_in_flight_msat=payload['max_htlc_value_in_flight_msat'],
            max_accepted_htlcs=payload['max_accepted_htlcs'],
            # the remote starts with the whole funding amount minus what it pushes to us
            initial_msat=funding_sat * 1000 - push_msat,
            reserve_sat=payload['channel_reserve_satoshis'],
            htlc_minimum_msat=payload['htlc_minimum_msat'],
            next_per_commitment_point=payload['first_per_commitment_point'],
            current_per_commitment_point=None,
            upfront_shutdown_script=upfront_shutdown_script,
        )

        remote_config.validate_params(funding_sat=funding_sat)
        # The receiving node MUST fail the channel if:
        #     the funder's amount for the initial commitment transaction is not
        #     sufficient for full fee payment.
        if remote_config.initial_msat < calc_fees_for_commitment_tx(
                num_htlcs=0,
                feerate=feerate,
                is_local_initiator=False)[REMOTE]:
            raise Exception(
                "the funder's amount for the initial commitment transaction "
                "is not sufficient for full fee payment")
        # The receiving node MUST fail the channel if:
        #     both to_local and to_remote amounts for the initial commitment transaction are
        #     less than or equal to channel_reserve_satoshis (see BOLT 3).
        if (local_config.initial_msat <= 1000 * payload['channel_reserve_satoshis']
                and remote_config.initial_msat <= 1000 * payload['channel_reserve_satoshis']):
            raise Exception(
                "both to_local and to_remote amounts for the initial commitment "
                "transaction are less than or equal to channel_reserve_satoshis")
        # note: we ignore payload['channel_flags'],  which e.g. contains 'announce_channel'.
        #       Notably if the remote sets 'announce_channel' to True, we will ignore that too,
        #       but we will not play along with actually announcing the channel (so we keep it private).

        # -> accept channel
        # for the first commitment transaction
        per_commitment_secret_first = get_per_commitment_secret_from_seed(
            local_config.per_commitment_secret_seed,
            RevocationStore.START_INDEX
        )
        per_commitment_point_first = secret_to_pubkey(
            int.from_bytes(per_commitment_secret_first, 'big'))
        # sent to the remote as minimum_depth and also stored in the channel constraints below
        min_depth = 3
        self.send_message(
            'accept_channel',
            temporary_channel_id=temp_chan_id,
            dust_limit_satoshis=local_config.dust_limit_sat,
            max_htlc_value_in_flight_msat=local_config.max_htlc_value_in_flight_msat,
            channel_reserve_satoshis=local_config.reserve_sat,
            htlc_minimum_msat=local_config.htlc_minimum_msat,
            minimum_depth=min_depth,
            to_self_delay=local_config.to_self_delay,
            max_accepted_htlcs=local_config.max_accepted_htlcs,
            funding_pubkey=local_config.multisig_key.pubkey,
            revocation_basepoint=local_config.revocation_basepoint.pubkey,
            payment_basepoint=local_config.payment_basepoint.pubkey,
            delayed_payment_basepoint=local_config.delayed_basepoint.pubkey,
            htlc_basepoint=local_config.htlc_basepoint.pubkey,
            first_per_commitment_point=per_commitment_point_first,
            accept_channel_tlvs={
                'upfront_shutdown_script':
                    {'shutdown_scriptpubkey': local_config.upfront_shutdown_script}
            }
        )

        # <- funding created
        funding_created = await self.wait_for_message('funding_created', temp_chan_id)

        # -> funding signed
        funding_idx = funding_created['funding_output_index']
        # the txid bytes from the message are reversed before being hex-encoded
        funding_txid = bh2u(funding_created['funding_txid'][::-1])
        channel_id, funding_txid_bytes = channel_id_from_funding_tx(funding_txid, funding_idx)
        constraints = ChannelConstraints(
            capacity=funding_sat,
            is_initiator=False,
            funding_txn_minimum_depth=min_depth
        )
        outpoint = Outpoint(funding_txid, funding_idx)
        chan_dict = self.create_channel_storage(
            channel_id, outpoint, local_config, remote_config, constraints)
        chan = Channel(
            chan_dict,
            sweep_address=self.lnworker.sweep_address,
            lnworker=self.lnworker,
            initial_feerate=feerate
        )
        chan.storage['init_timestamp'] = int(time.time())
        if isinstance(self.transport, LNTransport):
            chan.add_or_update_peer_addr(self.transport.peer_addr)
        remote_sig = funding_created['signature']
        # process the remote's signature first, then produce ours for funding_signed
        # (the empty list is presumably "no HTLC signatures" -- confirm in Channel)
        chan.receive_new_commitment(remote_sig, [])
        sig_64, _ = chan.sign_next_commitment()
        self.send_message('funding_signed',
            channel_id=channel_id,
            signature=sig_64,
        )
        self.funding_signed_sent.add(chan.channel_id)
        chan.open_with_first_pcp(payload['first_per_commitment_point'], remote_sig)
        chan.set_state(ChannelState.OPENING)
        self.lnworker.add_new_channel(chan)
920
921    async def trigger_force_close(self, channel_id: bytes):
922        await self.initialized
923        latest_point = secret_to_pubkey(42) # we need a valid point (BOLT2)
924        self.send_message(
925            "channel_reestablish",
926            channel_id=channel_id,
927            next_commitment_number=0,
928            next_revocation_number=0,
929            your_last_per_commitment_secret=0,
930            my_current_per_commitment_point=latest_point)
931
    async def reestablish_channel(self, chan: Channel):
        """Run the channel_reestablish exchange for `chan` after a (re)connect.

        Steps, in order:
          - if chan.should_request_force_close is set, only call
            trigger_force_close() and return;
          - send our channel_reestablish and wait for the peer's;
          - replay our un-acked local updates;
          - compare both sides' commitment numbers to decide whether one side
            is ahead, re-sending revoke_and_ack if the peer missed exactly one;
          - validate the data-loss-protect fields;
          - on success set peer_state to GOOD and possibly re-send
            funding_locked / shutdown.

        Raises RemoteMisbehaving if the peer sends negative counters or invalid
        data-loss-protect fields.
        """
        await self.initialized
        chan_id = chan.channel_id
        if chan.should_request_force_close:
            await self.trigger_force_close(chan_id)
            chan.should_request_force_close = False
            return
        assert ChannelState.PREOPENING < chan.get_state() < ChannelState.FORCE_CLOSING
        # only start from DISCONNECTED; any other peer_state means nothing to do here
        if chan.peer_state != PeerState.DISCONNECTED:
            self.logger.info(f'reestablish_channel was called but channel {chan.get_id_for_log()} '
                             f'already in peer_state {chan.peer_state!r}')
            return
        chan.peer_state = PeerState.REESTABLISHING
        util.trigger_callback('channel', self.lnworker.wallet, chan)
        # BOLT-02: "A node [...] upon disconnection [...] MUST reverse any uncommitted updates sent by the other side"
        chan.hm.discard_unsigned_remote_updates()
        # ctns
        oldest_unrevoked_local_ctn = chan.get_oldest_unrevoked_ctn(LOCAL)
        latest_local_ctn = chan.get_latest_ctn(LOCAL)
        next_local_ctn = chan.get_next_ctn(LOCAL)
        oldest_unrevoked_remote_ctn = chan.get_oldest_unrevoked_ctn(REMOTE)
        latest_remote_ctn = chan.get_latest_ctn(REMOTE)
        next_remote_ctn = chan.get_next_ctn(REMOTE)
        assert self.features.supports(LnFeatures.OPTION_DATA_LOSS_PROTECT_OPT)
        # send message
        # note: only latest_point is used below; latest_secret is not read again in this method
        if chan.is_static_remotekey_enabled():
            latest_secret, latest_point = chan.get_secret_and_point(LOCAL, 0)
        else:
            latest_secret, latest_point = chan.get_secret_and_point(LOCAL, latest_local_ctn)
        # the last secret the remote revealed to us; 0 if it has not revoked anything yet
        if oldest_unrevoked_remote_ctn == 0:
            last_rev_secret = 0
        else:
            last_rev_index = oldest_unrevoked_remote_ctn - 1
            last_rev_secret = chan.revocation_store.retrieve_secret(RevocationStore.START_INDEX - last_rev_index)
        self.send_message(
            "channel_reestablish",
            channel_id=chan_id,
            next_commitment_number=next_local_ctn,
            next_revocation_number=oldest_unrevoked_remote_ctn,
            your_last_per_commitment_secret=last_rev_secret,
            my_current_per_commitment_point=latest_point)
        self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): sent channel_reestablish with '
                         f'(next_local_ctn={next_local_ctn}, '
                         f'oldest_unrevoked_remote_ctn={oldest_unrevoked_remote_ctn})')
        # keep waiting across timeouts; any other error aborts the reestablish for this channel only
        while True:
            try:
                msg = await self.wait_for_message('channel_reestablish', chan_id)
                break
            except asyncio.TimeoutError:
                self.logger.info('waiting to receive channel_reestablish...')
                continue
            except Exception as e:
                # do not kill connection, because we might have other channels with that peer
                # NOTE(review): peer_state is still REESTABLISHING on this return path
                self.logger.info(f'channel_reestablish failed, {e}')
                return
        their_next_local_ctn = msg["next_commitment_number"]
        their_oldest_unrevoked_remote_ctn = msg["next_revocation_number"]
        # the two data-loss-protect fields are read with .get(), i.e. they may be absent (None)
        their_local_pcp = msg.get("my_current_per_commitment_point")
        their_claim_of_our_last_per_commitment_secret = msg.get("your_last_per_commitment_secret")
        self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): received channel_reestablish with '
                         f'(their_next_local_ctn={their_next_local_ctn}, '
                         f'their_oldest_unrevoked_remote_ctn={their_oldest_unrevoked_remote_ctn})')
        # sanity checks of received values
        if their_next_local_ctn < 0:
            raise RemoteMisbehaving(f"channel reestablish: their_next_local_ctn < 0")
        if their_oldest_unrevoked_remote_ctn < 0:
            raise RemoteMisbehaving(f"channel reestablish: their_oldest_unrevoked_remote_ctn < 0")
        # Replay un-acked local updates (including commitment_signed) byte-for-byte.
        # If we have sent them a commitment signature that they "lost" (due to disconnect),
        # we need to make sure we replay the same local updates, as otherwise they could
        # end up with two (or more) signed valid commitment transactions at the same ctn.
        # Multiple valid ctxs at the same ctn is a major headache for pre-signing spending txns,
        # e.g. for watchtowers, hence we must ensure these ctxs coincide.
        # We replay the local updates even if they were not yet committed.
        unacked = chan.hm.get_unacked_local_updates()
        n_replayed_msgs = 0
        for ctn, messages in unacked.items():
            if ctn < their_next_local_ctn:
                # They claim to have received these messages and the corresponding
                # commitment_signed, hence we must not replay them.
                continue
            for raw_upd_msg in messages:
                # raw bytes are sent as-is through the transport, bypassing send_message
                self.transport.send_bytes(raw_upd_msg)
                n_replayed_msgs += 1
        self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): replayed {n_replayed_msgs} unacked messages')

        # set by the two comparisons below; acted upon after the DLP validation
        we_are_ahead = False
        they_are_ahead = False
        # compare remote ctns
        if next_remote_ctn != their_next_local_ctn:
            if their_next_local_ctn == latest_remote_ctn and chan.hm.is_revack_pending(REMOTE):
                # We replayed the local updates (see above), which should have contained a commitment_signed
                # (due to is_revack_pending being true), and this should have remedied this situation.
                pass
            else:
                self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): "
                                    f"expected remote ctn {next_remote_ctn}, got {their_next_local_ctn}")
                if their_next_local_ctn < next_remote_ctn:
                    we_are_ahead = True
                else:
                    they_are_ahead = True
        # compare local ctns
        if oldest_unrevoked_local_ctn != their_oldest_unrevoked_remote_ctn:
            if oldest_unrevoked_local_ctn - 1 == their_oldest_unrevoked_remote_ctn:
                # A node:
                #    if next_revocation_number is equal to the commitment number of the last revoke_and_ack
                #    the receiving node sent, AND the receiving node hasn't already received a closing_signed:
                #        MUST re-send the revoke_and_ack.
                # note: of these four values only last_secret and next_point are sent
                last_secret, last_point = chan.get_secret_and_point(LOCAL, oldest_unrevoked_local_ctn - 1)
                next_secret, next_point = chan.get_secret_and_point(LOCAL, oldest_unrevoked_local_ctn + 1)
                self.send_message(
                    "revoke_and_ack",
                    channel_id=chan.channel_id,
                    per_commitment_secret=last_secret,
                    next_per_commitment_point=next_point)
            else:
                self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): "
                                    f"expected local ctn {oldest_unrevoked_local_ctn}, got {their_oldest_unrevoked_remote_ctn}")
                if their_oldest_unrevoked_remote_ctn < oldest_unrevoked_local_ctn:
                    we_are_ahead = True
                else:
                    they_are_ahead = True
        # option_data_loss_protect
        def are_datalossprotect_fields_valid() -> bool:
            """Check the peer's DLP fields against our own records.

            Returns False if either field is missing, if the secret they claim
            we last revealed differs from ours, or (without static_remotekey)
            if their current per-commitment point differs from the one we hold.
            """
            if their_local_pcp is None or their_claim_of_our_last_per_commitment_secret is None:
                return False
            if their_oldest_unrevoked_remote_ctn > 0:
                our_pcs, __ = chan.get_secret_and_point(LOCAL, their_oldest_unrevoked_remote_ctn - 1)
            else:
                # negative values were rejected above, so only 0 remains
                assert their_oldest_unrevoked_remote_ctn == 0
                our_pcs = bytes(32)
            if our_pcs != their_claim_of_our_last_per_commitment_secret:
                self.logger.error(f"channel_reestablish ({chan.get_id_for_log()}): "
                                  f"(DLP) local PCS mismatch: {bh2u(our_pcs)} != {bh2u(their_claim_of_our_last_per_commitment_secret)}")
                return False
            # with static_remotekey the remote PCP is not compared
            if chan.is_static_remotekey_enabled():
                return True
            try:
                __, our_remote_pcp = chan.get_secret_and_point(REMOTE, their_next_local_ctn - 1)
            except RemoteCtnTooFarInFuture:
                # we cannot look up a point for that ctn, so there is nothing to compare
                pass
            else:
                if our_remote_pcp != their_local_pcp:
                    self.logger.error(f"channel_reestablish ({chan.get_id_for_log()}): "
                                      f"(DLP) remote PCP mismatch: {bh2u(our_remote_pcp)} != {bh2u(their_local_pcp)}")
                    return False
            return True

        if not are_datalossprotect_fields_valid():
            raise RemoteMisbehaving("channel_reestablish: data loss protect fields invalid")

        if they_are_ahead:
            self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): "
                                f"remote is ahead of us! They should force-close. Remote PCP: {bh2u(their_local_pcp)}")
            # data_loss_protect_remote_pcp is used in lnsweep
            chan.set_data_loss_protect_remote_pcp(their_next_local_ctn - 1, their_local_pcp)
            self.lnworker.save_channel(chan)
            chan.peer_state = PeerState.BAD
            return
        elif we_are_ahead:
            self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): we are ahead of remote! trying to force-close.")
            await self.lnworker.try_force_closing(chan_id)
            return

        chan.peer_state = PeerState.GOOD
        # both sides are still at ctn 1: (re)send funding_locked
        if chan.is_funded() and their_next_local_ctn == next_local_ctn == 1:
            self.send_funding_locked(chan)
        # checks done
        if chan.is_funded() and chan.config[LOCAL].funding_locked_received:
            self.mark_open(chan)
        util.trigger_callback('channel', self.lnworker.wallet, chan)
        # if we have sent a previous shutdown, it must be retransmitted (Bolt2)
        if chan.get_state() == ChannelState.SHUTDOWN:
            await self.send_shutdown(chan)
1106
1107    def send_funding_locked(self, chan: Channel):
1108        channel_id = chan.channel_id
1109        per_commitment_secret_index = RevocationStore.START_INDEX - 1
1110        per_commitment_point_second = secret_to_pubkey(int.from_bytes(
1111            get_per_commitment_secret_from_seed(chan.config[LOCAL].per_commitment_secret_seed, per_commitment_secret_index), 'big'))
1112        # note: if funding_locked was not yet received, we might send it multiple times
1113        self.send_message("funding_locked", channel_id=channel_id, next_per_commitment_point=per_commitment_point_second)
1114        if chan.is_funded() and chan.config[LOCAL].funding_locked_received:
1115            self.mark_open(chan)
1116
1117    def on_funding_locked(self, chan: Channel, payload):
1118        self.logger.info(f"on_funding_locked. channel: {bh2u(chan.channel_id)}")
1119        if not chan.config[LOCAL].funding_locked_received:
1120            their_next_point = payload["next_per_commitment_point"]
1121            chan.config[REMOTE].next_per_commitment_point = their_next_point
1122            chan.config[LOCAL].funding_locked_received = True
1123            self.lnworker.save_channel(chan)
1124        if chan.is_funded():
1125            self.mark_open(chan)
1126
    def on_network_update(self, chan: Channel, funding_tx_depth: int):
        """
        Only called when the channel is OPEN.

        Runs on the Network thread.

        Currently a no-op: channel announcement is disabled by the
        unconditional `return` inside the branch below, so the statements
        after it never run.
        """
        if not chan.config[LOCAL].was_announced and funding_tx_depth >= 6:
            # don't announce our channels
            # FIXME should this be a field in chan.local_state maybe?
            return
            # --- unreachable: kept as the disabled announcement path ---
            # It would mark the channel as announced, persist it, and schedule
            # handle_announcements() on the network's asyncio loop from this thread.
            chan.config[LOCAL].was_announced = True
            self.lnworker.save_channel(chan)
            coro = self.handle_announcements(chan)
            asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop)
1141
1142    @log_exceptions
1143    async def handle_announcements(self, chan: Channel):
1144        h, local_node_sig, local_bitcoin_sig = self.send_announcement_signatures(chan)
1145        announcement_signatures_msg = await self.announcement_signatures[chan.channel_id].get()
1146        remote_node_sig = announcement_signatures_msg["node_signature"]
1147        remote_bitcoin_sig = announcement_signatures_msg["bitcoin_signature"]
1148        if not ecc.verify_signature(chan.config[REMOTE].multisig_key.pubkey, remote_bitcoin_sig, h):
1149            raise Exception("bitcoin_sig invalid in announcement_signatures")
1150        if not ecc.verify_signature(self.pubkey, remote_node_sig, h):
1151            raise Exception("node_sig invalid in announcement_signatures")
1152
1153        node_sigs = [remote_node_sig, local_node_sig]
1154        bitcoin_sigs = [remote_bitcoin_sig, local_bitcoin_sig]
1155        bitcoin_keys = [chan.config[REMOTE].multisig_key.pubkey, chan.config[LOCAL].multisig_key.pubkey]
1156
1157        if self.node_ids[0] > self.node_ids[1]:
1158            node_sigs.reverse()
1159            bitcoin_sigs.reverse()
1160            node_ids = list(reversed(self.node_ids))
1161            bitcoin_keys.reverse()
1162        else:
1163            node_ids = self.node_ids
1164
1165        self.send_message("channel_announcement",
1166            node_signatures_1=node_sigs[0],
1167            node_signatures_2=node_sigs[1],
1168            bitcoin_signature_1=bitcoin_sigs[0],
1169            bitcoin_signature_2=bitcoin_sigs[1],
1170            len=0,
1171            #features not set (defaults to zeros)
1172            chain_hash=constants.net.rev_genesis_bytes(),
1173            short_channel_id=chan.short_channel_id,
1174            node_id_1=node_ids[0],
1175            node_id_2=node_ids[1],
1176            bitcoin_key_1=bitcoin_keys[0],
1177            bitcoin_key_2=bitcoin_keys[1]
1178        )
1179
1180    def mark_open(self, chan: Channel):
1181        assert chan.is_funded()
1182        # only allow state transition from "FUNDED" to "OPEN"
1183        old_state = chan.get_state()
1184        if old_state == ChannelState.OPEN:
1185            return
1186        if old_state != ChannelState.FUNDED:
1187            self.logger.info(f"cannot mark open ({chan.get_id_for_log()}), current state: {repr(old_state)}")
1188            return
1189        assert chan.config[LOCAL].funding_locked_received
1190        chan.set_state(ChannelState.OPEN)
1191        util.trigger_callback('channel', self.lnworker.wallet, chan)
1192        # peer may have sent us a channel update for the incoming direction previously
1193        pending_channel_update = self.orphan_channel_updates.get(chan.short_channel_id)
1194        if pending_channel_update:
1195            chan.set_remote_update(pending_channel_update)
1196        self.logger.info(f"CHANNEL OPENING COMPLETED ({chan.get_id_for_log()})")
1197        forwarding_enabled = self.network.config.get('lightning_forward_payments', False)
1198        if forwarding_enabled:
1199            # send channel_update of outgoing edge to peer,
1200            # so that channel can be used to to receive payments
1201            self.logger.info(f"sending channel update for outgoing edge ({chan.get_id_for_log()})")
1202            chan_upd = chan.get_outgoing_gossip_channel_update()
1203            self.transport.send_bytes(chan_upd)
1204
1205    def send_announcement_signatures(self, chan: Channel):
1206        chan_ann = chan.construct_channel_announcement_without_sigs()
1207        preimage = chan_ann[256+2:]
1208        msg_hash = sha256d(preimage)
1209        bitcoin_signature = ecc.ECPrivkey(chan.config[LOCAL].multisig_key.privkey).sign(msg_hash, sig_string_from_r_and_s)
1210        node_signature = ecc.ECPrivkey(self.privkey).sign(msg_hash, sig_string_from_r_and_s)
1211        self.send_message("announcement_signatures",
1212            channel_id=chan.channel_id,
1213            short_channel_id=chan.short_channel_id,
1214            node_signature=node_signature,
1215            bitcoin_signature=bitcoin_signature
1216        )
1217        return msg_hash, node_signature, bitcoin_signature
1218
1219    def on_update_fail_htlc(self, chan: Channel, payload):
1220        htlc_id = payload["id"]
1221        reason = payload["reason"]
1222        self.logger.info(f"on_update_fail_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}")
1223        chan.receive_fail_htlc(htlc_id, error_bytes=reason)  # TODO handle exc and maybe fail channel (e.g. bad htlc_id)
1224        self.maybe_send_commitment(chan)
1225
1226    def maybe_send_commitment(self, chan: Channel):
1227        # REMOTE should revoke first before we can sign a new ctx
1228        if chan.hm.is_revack_pending(REMOTE):
1229            return
1230        # if there are no changes, we will not (and must not) send a new commitment
1231        if not chan.has_pending_changes(REMOTE):
1232            return
1233        self.logger.info(f'send_commitment. chan {chan.short_channel_id}. ctn: {chan.get_next_ctn(REMOTE)}.')
1234        sig_64, htlc_sigs = chan.sign_next_commitment()
1235        self.send_message("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=len(htlc_sigs), htlc_signature=b"".join(htlc_sigs))
1236
1237    def pay(self, *,
1238            route: 'LNPaymentRoute',
1239            chan: Channel,
1240            amount_msat: int,
1241            total_msat: int,
1242            payment_hash: bytes,
1243            min_final_cltv_expiry: int,
1244            payment_secret: bytes = None,
1245            trampoline_onion=None) -> UpdateAddHtlc:
1246
1247        assert amount_msat > 0, "amount_msat is not greater zero"
1248        assert len(route) > 0
1249        if not chan.can_send_update_add_htlc():
1250            raise PaymentFailure("Channel cannot send update_add_htlc")
1251        # add features learned during "init" for direct neighbour:
1252        route[0].node_features |= self.features
1253        local_height = self.network.get_local_height()
1254        final_cltv = local_height + min_final_cltv_expiry
1255        hops_data, amount_msat, cltv = calc_hops_data_for_payment(
1256            route,
1257            amount_msat,
1258            final_cltv,
1259            total_msat=total_msat,
1260            payment_secret=payment_secret)
1261        num_hops = len(hops_data)
1262        self.logger.info(f"lnpeer.pay len(route)={len(route)}")
1263        for i in range(len(route)):
1264            self.logger.info(f"  {i}: edge={route[i].short_channel_id} hop_data={hops_data[i]!r}")
1265        assert final_cltv <= cltv, (final_cltv, cltv)
1266        session_key = os.urandom(32) # session_key
1267        # if we are forwarding a trampoline payment, add trampoline onion
1268        if trampoline_onion:
1269            self.logger.info(f'adding trampoline onion to final payload')
1270            trampoline_payload = hops_data[num_hops-2].payload
1271            trampoline_payload["trampoline_onion_packet"] = {
1272                "version": trampoline_onion.version,
1273                "public_key": trampoline_onion.public_key,
1274                "hops_data": trampoline_onion.hops_data,
1275                "hmac": trampoline_onion.hmac
1276            }
1277        # create onion packet
1278        payment_path_pubkeys = [x.node_id for x in route]
1279        onion = new_onion_packet(payment_path_pubkeys, session_key, hops_data, associated_data=payment_hash) # must use another sessionkey
1280        self.logger.info(f"starting payment. len(route)={len(hops_data)}.")
1281        # create htlc
1282        if cltv > local_height + lnutil.NBLOCK_CLTV_EXPIRY_TOO_FAR_INTO_FUTURE:
1283            raise PaymentFailure(f"htlc expiry too far into future. (in {cltv-local_height} blocks)")
1284        htlc = UpdateAddHtlc(amount_msat=amount_msat, payment_hash=payment_hash, cltv_expiry=cltv, timestamp=int(time.time()))
1285        htlc = chan.add_htlc(htlc)
1286        chan.set_onion_key(htlc.htlc_id, session_key) # should it be the outer onion secret?
1287        self.logger.info(f"starting payment. htlc: {htlc}")
1288        self.send_message(
1289            "update_add_htlc",
1290            channel_id=chan.channel_id,
1291            id=htlc.htlc_id,
1292            cltv_expiry=htlc.cltv_expiry,
1293            amount_msat=htlc.amount_msat,
1294            payment_hash=htlc.payment_hash,
1295            onion_routing_packet=onion.to_bytes())
1296        self.maybe_send_commitment(chan)
1297        return htlc
1298
1299    def send_revoke_and_ack(self, chan: Channel):
1300        self.logger.info(f'send_revoke_and_ack. chan {chan.short_channel_id}. ctn: {chan.get_oldest_unrevoked_ctn(LOCAL)}')
1301        rev = chan.revoke_current_commitment()
1302        self.lnworker.save_channel(chan)
1303        self.send_message("revoke_and_ack",
1304            channel_id=chan.channel_id,
1305            per_commitment_secret=rev.per_commitment_secret,
1306            next_per_commitment_point=rev.next_per_commitment_point)
1307        self.maybe_send_commitment(chan)
1308
1309    def on_commitment_signed(self, chan: Channel, payload):
1310        if chan.peer_state == PeerState.BAD:
1311            return
1312        self.logger.info(f'on_commitment_signed. chan {chan.short_channel_id}. ctn: {chan.get_next_ctn(LOCAL)}.')
1313        # make sure there were changes to the ctx, otherwise the remote peer is misbehaving
1314        if not chan.has_pending_changes(LOCAL):
1315            # TODO if feerate changed A->B->A; so there were updates but the value is identical,
1316            #      then it might be legal to send a commitment_signature
1317            #      see https://github.com/lightningnetwork/lightning-rfc/pull/618
1318            raise RemoteMisbehaving('received commitment_signed without pending changes')
1319        # REMOTE should wait until we have revoked
1320        if chan.hm.is_revack_pending(LOCAL):
1321            raise RemoteMisbehaving('received commitment_signed before we revoked previous ctx')
1322        data = payload["htlc_signature"]
1323        htlc_sigs = list(chunks(data, 64))
1324        chan.receive_new_commitment(payload["signature"], htlc_sigs)
1325        self.send_revoke_and_ack(chan)
1326
1327    def on_update_fulfill_htlc(self, chan: Channel, payload):
1328        preimage = payload["payment_preimage"]
1329        payment_hash = sha256(preimage)
1330        htlc_id = payload["id"]
1331        self.logger.info(f"on_update_fulfill_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}")
1332        chan.receive_htlc_settle(preimage, htlc_id)  # TODO handle exc and maybe fail channel (e.g. bad htlc_id)
1333        self.lnworker.save_preimage(payment_hash, preimage)
1334        self.maybe_send_commitment(chan)
1335
1336    def on_update_fail_malformed_htlc(self, chan: Channel, payload):
1337        htlc_id = payload["id"]
1338        failure_code = payload["failure_code"]
1339        self.logger.info(f"on_update_fail_malformed_htlc. chan {chan.get_id_for_log()}. "
1340                         f"htlc_id {htlc_id}. failure_code={failure_code}")
1341        if failure_code & OnionFailureCodeMetaFlag.BADONION == 0:
1342            asyncio.ensure_future(self.lnworker.try_force_closing(chan.channel_id))
1343            raise RemoteMisbehaving(f"received update_fail_malformed_htlc with unexpected failure code: {failure_code}")
1344        reason = OnionRoutingFailure(code=failure_code, data=payload["sha256_of_onion"])
1345        chan.receive_fail_htlc(htlc_id, error_bytes=None, reason=reason)
1346        self.maybe_send_commitment(chan)
1347
1348    def on_update_add_htlc(self, chan: Channel, payload):
1349        payment_hash = payload["payment_hash"]
1350        htlc_id = payload["id"]
1351        cltv_expiry = payload["cltv_expiry"]
1352        amount_msat_htlc = payload["amount_msat"]
1353        onion_packet = payload["onion_routing_packet"]
1354        htlc = UpdateAddHtlc(
1355            amount_msat=amount_msat_htlc,
1356            payment_hash=payment_hash,
1357            cltv_expiry=cltv_expiry,
1358            timestamp=int(time.time()),
1359            htlc_id=htlc_id)
1360        self.logger.info(f"on_update_add_htlc. chan {chan.short_channel_id}. htlc={str(htlc)}")
1361        if chan.get_state() != ChannelState.OPEN:
1362            raise RemoteMisbehaving(f"received update_add_htlc while chan.get_state() != OPEN. state was {chan.get_state()!r}")
1363        if cltv_expiry > bitcoin.NLOCKTIME_BLOCKHEIGHT_MAX:
1364            asyncio.ensure_future(self.lnworker.try_force_closing(chan.channel_id))
1365            raise RemoteMisbehaving(f"received update_add_htlc with cltv_expiry > BLOCKHEIGHT_MAX. value was {cltv_expiry}")
1366        # add htlc
1367        chan.receive_htlc(htlc, onion_packet)
1368        util.trigger_callback('htlc_added', chan, htlc, RECEIVED)
1369
1370    def maybe_forward_htlc(
1371            self, *,
1372            htlc: UpdateAddHtlc,
1373            processed_onion: ProcessedOnionPacket) -> Tuple[bytes, int]:
1374
1375        # Forward HTLC
1376        # FIXME: there are critical safety checks MISSING here
1377        #        - for example; atm we forward first and then persist "forwarding_info",
1378        #          so if we segfault in-between and restart, we might forward an HTLC twice...
1379        #          (same for trampoline forwarding)
1380        forwarding_enabled = self.network.config.get('lightning_forward_payments', False)
1381        if not forwarding_enabled:
1382            self.logger.info(f"forwarding is disabled. failing htlc.")
1383            raise OnionRoutingFailure(code=OnionFailureCode.PERMANENT_CHANNEL_FAILURE, data=b'')
1384        chain = self.network.blockchain()
1385        if chain.is_tip_stale():
1386            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
1387        try:
1388            next_chan_scid = processed_onion.hop_data.payload["short_channel_id"]["short_channel_id"]
1389        except:
1390            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
1391        next_chan = self.lnworker.get_channel_by_short_id(next_chan_scid)
1392        local_height = chain.height()
1393        if next_chan is None:
1394            self.logger.info(f"cannot forward htlc. cannot find next_chan {next_chan_scid}")
1395            raise OnionRoutingFailure(code=OnionFailureCode.UNKNOWN_NEXT_PEER, data=b'')
1396        outgoing_chan_upd = next_chan.get_outgoing_gossip_channel_update()[2:]
1397        outgoing_chan_upd_len = len(outgoing_chan_upd).to_bytes(2, byteorder="big")
1398        outgoing_chan_upd_message = outgoing_chan_upd_len + outgoing_chan_upd
1399        if not next_chan.can_send_update_add_htlc():
1400            self.logger.info(f"cannot forward htlc. next_chan {next_chan_scid} cannot send ctx updates. "
1401                             f"chan state {next_chan.get_state()!r}, peer state: {next_chan.peer_state!r}")
1402            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_CHANNEL_FAILURE, data=outgoing_chan_upd_message)
1403        try:
1404            next_amount_msat_htlc = processed_onion.hop_data.payload["amt_to_forward"]["amt_to_forward"]
1405        except:
1406            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
1407        if not next_chan.can_pay(next_amount_msat_htlc):
1408            self.logger.info(f"cannot forward htlc due to transient errors (likely due to insufficient funds)")
1409            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_CHANNEL_FAILURE, data=outgoing_chan_upd_message)
1410        try:
1411            next_cltv_expiry = processed_onion.hop_data.payload["outgoing_cltv_value"]["outgoing_cltv_value"]
1412        except:
1413            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
1414        if htlc.cltv_expiry - next_cltv_expiry < next_chan.forwarding_cltv_expiry_delta:
1415            data = htlc.cltv_expiry.to_bytes(4, byteorder="big") + outgoing_chan_upd_message
1416            raise OnionRoutingFailure(code=OnionFailureCode.INCORRECT_CLTV_EXPIRY, data=data)
1417        if htlc.cltv_expiry - lnutil.MIN_FINAL_CLTV_EXPIRY_ACCEPTED <= local_height \
1418                or next_cltv_expiry <= local_height:
1419            raise OnionRoutingFailure(code=OnionFailureCode.EXPIRY_TOO_SOON, data=outgoing_chan_upd_message)
1420        if max(htlc.cltv_expiry, next_cltv_expiry) > local_height + lnutil.NBLOCK_CLTV_EXPIRY_TOO_FAR_INTO_FUTURE:
1421            raise OnionRoutingFailure(code=OnionFailureCode.EXPIRY_TOO_FAR, data=b'')
1422        forwarding_fees = fee_for_edge_msat(
1423            forwarded_amount_msat=next_amount_msat_htlc,
1424            fee_base_msat=next_chan.forwarding_fee_base_msat,
1425            fee_proportional_millionths=next_chan.forwarding_fee_proportional_millionths)
1426        if htlc.amount_msat - next_amount_msat_htlc < forwarding_fees:
1427            data = next_amount_msat_htlc.to_bytes(8, byteorder="big") + outgoing_chan_upd_message
1428            raise OnionRoutingFailure(code=OnionFailureCode.FEE_INSUFFICIENT, data=data)
1429        self.logger.info(f'forwarding htlc to {next_chan.node_id}')
1430        next_htlc = UpdateAddHtlc(
1431            amount_msat=next_amount_msat_htlc,
1432            payment_hash=htlc.payment_hash,
1433            cltv_expiry=next_cltv_expiry,
1434            timestamp=int(time.time()))
1435        next_htlc = next_chan.add_htlc(next_htlc)
1436        next_peer = self.lnworker.peers[next_chan.node_id]
1437        try:
1438            next_peer.send_message(
1439                "update_add_htlc",
1440                channel_id=next_chan.channel_id,
1441                id=next_htlc.htlc_id,
1442                cltv_expiry=next_cltv_expiry,
1443                amount_msat=next_amount_msat_htlc,
1444                payment_hash=next_htlc.payment_hash,
1445                onion_routing_packet=processed_onion.next_packet.to_bytes()
1446            )
1447        except BaseException as e:
1448            self.logger.info(f"failed to forward htlc: error sending message. {e}")
1449            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_CHANNEL_FAILURE, data=outgoing_chan_upd_message)
1450        return next_chan_scid, next_htlc.htlc_id
1451
1452    def maybe_forward_trampoline(
1453            self, *,
1454            chan: Channel,
1455            htlc: UpdateAddHtlc,
1456            trampoline_onion: ProcessedOnionPacket):
1457
1458        forwarding_enabled = self.network.config.get('lightning_forward_payments', False)
1459        forwarding_trampoline_enabled = self.network.config.get('lightning_forward_trampoline_payments', False)
1460        if not (forwarding_enabled and forwarding_trampoline_enabled):
1461            self.logger.info(f"trampoline forwarding is disabled. failing htlc.")
1462            raise OnionRoutingFailure(code=OnionFailureCode.PERMANENT_CHANNEL_FAILURE, data=b'')
1463
1464        payload = trampoline_onion.hop_data.payload
1465        payment_hash = htlc.payment_hash
1466        payment_secret = os.urandom(32)
1467        try:
1468            outgoing_node_id = payload["outgoing_node_id"]["outgoing_node_id"]
1469            amt_to_forward = payload["amt_to_forward"]["amt_to_forward"]
1470            cltv_from_onion = payload["outgoing_cltv_value"]["outgoing_cltv_value"]
1471            if "invoice_features" in payload:
1472                self.logger.info('forward_trampoline: legacy')
1473                next_trampoline_onion = None
1474                invoice_features = payload["invoice_features"]["invoice_features"]
1475                invoice_routing_info = payload["invoice_routing_info"]["invoice_routing_info"]
1476                # TODO use invoice_routing_info
1477            else:
1478                self.logger.info('forward_trampoline: end-to-end')
1479                invoice_features = LnFeatures.BASIC_MPP_OPT
1480                next_trampoline_onion = trampoline_onion.next_packet
1481        except Exception as e:
1482            self.logger.exception('')
1483            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
1484
1485        # these are the fee/cltv paid by the sender
1486        # pay_to_node will raise if they are not sufficient
1487        trampoline_cltv_delta = htlc.cltv_expiry - cltv_from_onion
1488        trampoline_fee = htlc.amount_msat - amt_to_forward
1489
1490        @log_exceptions
1491        async def forward_trampoline_payment():
1492            try:
1493                await self.lnworker.pay_to_node(
1494                    node_pubkey=outgoing_node_id,
1495                    payment_hash=payment_hash,
1496                    payment_secret=payment_secret,
1497                    amount_to_pay=amt_to_forward,
1498                    min_cltv_expiry=cltv_from_onion,
1499                    r_tags=[],
1500                    invoice_features=invoice_features,
1501                    fwd_trampoline_onion=next_trampoline_onion,
1502                    fwd_trampoline_fee=trampoline_fee,
1503                    fwd_trampoline_cltv_delta=trampoline_cltv_delta,
1504                    attempts=1)
1505            except OnionRoutingFailure as e:
1506                # FIXME: cannot use payment_hash as key
1507                self.lnworker.trampoline_forwarding_failures[payment_hash] = e
1508            except PaymentFailure as e:
1509                # FIXME: adapt the error code
1510                error_reason = OnionRoutingFailure(code=OnionFailureCode.UNKNOWN_NEXT_PEER, data=b'')
1511                self.lnworker.trampoline_forwarding_failures[payment_hash] = error_reason
1512
1513        asyncio.ensure_future(forward_trampoline_payment())
1514
1515    def maybe_fulfill_htlc(
1516            self, *,
1517            chan: Channel,
1518            htlc: UpdateAddHtlc,
1519            processed_onion: ProcessedOnionPacket,
1520            is_trampoline: bool = False) -> Tuple[Optional[bytes], Optional[OnionPacket]]:
1521
1522        """As a final recipient of an HTLC, decide if we should fulfill it.
1523        Return (preimage, trampoline_onion_packet) with at most a single element not None
1524        """
1525        def log_fail_reason(reason: str):
1526            self.logger.info(f"maybe_fulfill_htlc. will FAIL HTLC: chan {chan.short_channel_id}. "
1527                             f"{reason}. htlc={str(htlc)}. onion_payload={processed_onion.hop_data.payload}")
1528
1529        try:
1530            amt_to_forward = processed_onion.hop_data.payload["amt_to_forward"]["amt_to_forward"]
1531        except:
1532            log_fail_reason(f"'amt_to_forward' missing from onion")
1533            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
1534
1535        # Check that our blockchain tip is sufficiently recent so that we have an approx idea of the height.
1536        # We should not release the preimage for an HTLC that its sender could already time out as
1537        # then they might try to force-close and it becomes a race.
1538        chain = self.network.blockchain()
1539        if chain.is_tip_stale():
1540            log_fail_reason(f"our chain tip is stale")
1541            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
1542        local_height = chain.height()
1543        exc_incorrect_or_unknown_pd = OnionRoutingFailure(
1544            code=OnionFailureCode.INCORRECT_OR_UNKNOWN_PAYMENT_DETAILS,
1545            data=amt_to_forward.to_bytes(8, byteorder="big") + local_height.to_bytes(4, byteorder="big"))
1546        if local_height + MIN_FINAL_CLTV_EXPIRY_ACCEPTED > htlc.cltv_expiry:
1547            log_fail_reason(f"htlc.cltv_expiry is unreasonably close")
1548            raise exc_incorrect_or_unknown_pd
1549        try:
1550            cltv_from_onion = processed_onion.hop_data.payload["outgoing_cltv_value"]["outgoing_cltv_value"]
1551        except:
1552            log_fail_reason(f"'outgoing_cltv_value' missing from onion")
1553            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
1554
1555        if not is_trampoline:
1556            if cltv_from_onion != htlc.cltv_expiry:
1557                log_fail_reason(f"cltv_from_onion != htlc.cltv_expiry")
1558                raise OnionRoutingFailure(
1559                    code=OnionFailureCode.FINAL_INCORRECT_CLTV_EXPIRY,
1560                    data=htlc.cltv_expiry.to_bytes(4, byteorder="big"))
1561        try:
1562            total_msat = processed_onion.hop_data.payload["payment_data"]["total_msat"]
1563        except:
1564            total_msat = amt_to_forward # fall back to "amt_to_forward"
1565
1566        if not is_trampoline and amt_to_forward != htlc.amount_msat:
1567            log_fail_reason(f"amt_to_forward != htlc.amount_msat")
1568            raise OnionRoutingFailure(
1569                code=OnionFailureCode.FINAL_INCORRECT_HTLC_AMOUNT,
1570                data=htlc.amount_msat.to_bytes(8, byteorder="big"))
1571
1572        try:
1573            payment_secret_from_onion = processed_onion.hop_data.payload["payment_data"]["payment_secret"]
1574        except:
1575            if total_msat > amt_to_forward:
1576                # payment_secret is required for MPP
1577                log_fail_reason(f"'payment_secret' missing from onion")
1578                raise exc_incorrect_or_unknown_pd
1579            # TODO fail here if invoice has set PAYMENT_SECRET_REQ
1580            payment_secret_from_onion = None
1581
1582        if total_msat > amt_to_forward:
1583            mpp_status = self.lnworker.check_received_mpp_htlc(payment_secret_from_onion, chan.short_channel_id, htlc, total_msat)
1584            if mpp_status is None:
1585                return None, None
1586            if mpp_status is False:
1587                log_fail_reason(f"MPP_TIMEOUT")
1588                raise OnionRoutingFailure(code=OnionFailureCode.MPP_TIMEOUT, data=b'')
1589            assert mpp_status is True
1590
1591        # if there is a trampoline_onion, maybe_fulfill_htlc will be called again
1592        if processed_onion.trampoline_onion_packet:
1593            # TODO: we should check that all trampoline_onions are the same
1594            return None, processed_onion.trampoline_onion_packet
1595
1596        # TODO don't accept payments twice for same invoice
1597        # TODO check invoice expiry
1598        info = self.lnworker.get_payment_info(htlc.payment_hash)
1599        if info is None:
1600            log_fail_reason(f"no payment_info found for RHASH {htlc.payment_hash.hex()}")
1601            raise exc_incorrect_or_unknown_pd
1602        preimage = self.lnworker.get_preimage(htlc.payment_hash)
1603        if payment_secret_from_onion:
1604            if payment_secret_from_onion != derive_payment_secret_from_payment_preimage(preimage):
1605                log_fail_reason(f'incorrect payment secret {payment_secret_from_onion.hex()} != {derive_payment_secret_from_payment_preimage(preimage).hex()}')
1606                raise exc_incorrect_or_unknown_pd
1607        invoice_msat = info.amount_msat
1608        if not (invoice_msat is None or invoice_msat <= total_msat <= 2 * invoice_msat):
1609            log_fail_reason(f"total_msat={total_msat} too different from invoice_msat={invoice_msat}")
1610            raise exc_incorrect_or_unknown_pd
1611        self.logger.info(f"maybe_fulfill_htlc. will FULFILL HTLC: chan {chan.short_channel_id}. htlc={str(htlc)}")
1612        self.lnworker.set_request_status(htlc.payment_hash, PR_PAID)
1613        return preimage, None
1614
1615    def fulfill_htlc(self, chan: Channel, htlc_id: int, preimage: bytes):
1616        self.logger.info(f"_fulfill_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}")
1617        assert chan.can_send_ctx_updates(), f"cannot send updates: {chan.short_channel_id}"
1618        assert chan.hm.is_htlc_irrevocably_added_yet(htlc_proposer=REMOTE, htlc_id=htlc_id)
1619        self.received_htlcs_pending_removal.add((chan, htlc_id))
1620        chan.settle_htlc(preimage, htlc_id)
1621        self.send_message(
1622            "update_fulfill_htlc",
1623            channel_id=chan.channel_id,
1624            id=htlc_id,
1625            payment_preimage=preimage)
1626
1627    def fail_htlc(self, *, chan: Channel, htlc_id: int, error_bytes: bytes):
1628        self.logger.info(f"fail_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}.")
1629        assert chan.can_send_ctx_updates(), f"cannot send updates: {chan.short_channel_id}"
1630        self.received_htlcs_pending_removal.add((chan, htlc_id))
1631        chan.fail_htlc(htlc_id)
1632        self.send_message(
1633            "update_fail_htlc",
1634            channel_id=chan.channel_id,
1635            id=htlc_id,
1636            len=len(error_bytes),
1637            reason=error_bytes)
1638
1639    def fail_malformed_htlc(self, *, chan: Channel, htlc_id: int, reason: OnionRoutingFailure):
1640        self.logger.info(f"fail_malformed_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}.")
1641        assert chan.can_send_ctx_updates(), f"cannot send updates: {chan.short_channel_id}"
1642        if not (reason.code & OnionFailureCodeMetaFlag.BADONION and len(reason.data) == 32):
1643            raise Exception(f"unexpected reason when sending 'update_fail_malformed_htlc': {reason!r}")
1644        self.received_htlcs_pending_removal.add((chan, htlc_id))
1645        chan.fail_htlc(htlc_id)
1646        self.send_message(
1647            "update_fail_malformed_htlc",
1648            channel_id=chan.channel_id,
1649            id=htlc_id,
1650            sha256_of_onion=reason.data,
1651            failure_code=reason.code)
1652
1653    def on_revoke_and_ack(self, chan: Channel, payload):
1654        if chan.peer_state == PeerState.BAD:
1655            return
1656        self.logger.info(f'on_revoke_and_ack. chan {chan.short_channel_id}. ctn: {chan.get_oldest_unrevoked_ctn(REMOTE)}')
1657        rev = RevokeAndAck(payload["per_commitment_secret"], payload["next_per_commitment_point"])
1658        chan.receive_revocation(rev)
1659        self.lnworker.save_channel(chan)
1660        self.maybe_send_commitment(chan)
1661
1662    def on_update_fee(self, chan: Channel, payload):
1663        feerate = payload["feerate_per_kw"]
1664        chan.update_fee(feerate, False)
1665
1666    async def maybe_update_fee(self, chan: Channel):
1667        """
1668        called when our fee estimates change
1669        """
1670        if not chan.can_send_ctx_updates():
1671            return
1672        feerate_per_kw = self.lnworker.current_feerate_per_kw()
1673        if not chan.constraints.is_initiator:
1674            if constants.net is not constants.BitcoinRegtest:
1675                chan_feerate = chan.get_latest_feerate(LOCAL)
1676                ratio = chan_feerate / feerate_per_kw
1677                if ratio < 0.5:
1678                    # Note that we trust the Electrum server about fee rates
1679                    # Thus, automated force-closing might not be a good idea
1680                    # Maybe we should display something in the GUI instead
1681                    self.logger.warning(
1682                        f"({chan.get_id_for_log()}) feerate is {chan_feerate} sat/kw, "
1683                        f"current recommended feerate is {feerate_per_kw} sat/kw, consider force closing!")
1684            return
1685        chan_fee = chan.get_next_feerate(REMOTE)
1686        if feerate_per_kw < chan_fee / 2:
1687            self.logger.info("FEES HAVE FALLEN")
1688        elif feerate_per_kw > chan_fee * 2:
1689            self.logger.info("FEES HAVE RISEN")
1690        elif chan.get_oldest_unrevoked_ctn(REMOTE) == 0:
1691            # workaround eclair issue https://github.com/ACINQ/eclair/issues/1730
1692            self.logger.info("updating fee to bump remote ctn")
1693            if feerate_per_kw == chan_fee:
1694                feerate_per_kw += 1
1695        else:
1696            return
1697        self.logger.info(f"(chan: {chan.get_id_for_log()}) current pending feerate {chan_fee}. "
1698                         f"new feerate {feerate_per_kw}")
1699        chan.update_fee(feerate_per_kw, True)
1700        self.send_message(
1701            "update_fee",
1702            channel_id=chan.channel_id,
1703            feerate_per_kw=feerate_per_kw)
1704        self.maybe_send_commitment(chan)
1705
1706    @log_exceptions
1707    async def close_channel(self, chan_id: bytes):
1708        chan = self.channels[chan_id]
1709        self.shutdown_received[chan_id] = asyncio.Future()
1710        await self.send_shutdown(chan)
1711        payload = await self.shutdown_received[chan_id]
1712        try:
1713            txid = await self._shutdown(chan, payload, is_local=True)
1714            self.logger.info(f'({chan.get_id_for_log()}) Channel closed {txid}')
1715        except asyncio.TimeoutError:
1716            txid = chan.unconfirmed_closing_txid
1717            self.logger.info(f'({chan.get_id_for_log()}) did not send closing_signed, {txid}')
1718            if txid is None:
1719                raise Exception('The remote peer did not send their final signature. The channel may not have been be closed')
1720        return txid
1721
1722    async def on_shutdown(self, chan: Channel, payload):
1723        their_scriptpubkey = payload['scriptpubkey']
1724        their_upfront_scriptpubkey = chan.config[REMOTE].upfront_shutdown_script
1725        # BOLT-02 check if they use the upfront shutdown script they advertized
1726        if their_upfront_scriptpubkey:
1727            if not (their_scriptpubkey == their_upfront_scriptpubkey):
1728                raise UpfrontShutdownScriptViolation("remote didn't use upfront shutdown script it commited to in channel opening")
1729        # BOLT-02 restrict the scriptpubkey to some templates:
1730        if not (match_script_against_template(their_scriptpubkey, transaction.SCRIPTPUBKEY_TEMPLATE_WITNESS_V0)
1731                or match_script_against_template(their_scriptpubkey, transaction.SCRIPTPUBKEY_TEMPLATE_P2SH)
1732                or match_script_against_template(their_scriptpubkey, transaction.SCRIPTPUBKEY_TEMPLATE_P2PKH)):
1733            raise Exception(f'scriptpubkey in received shutdown message does not conform to any template: {their_scriptpubkey.hex()}')
1734        chan_id = chan.channel_id
1735        if chan_id in self.shutdown_received:
1736            self.shutdown_received[chan_id].set_result(payload)
1737        else:
1738            chan = self.channels[chan_id]
1739            await self.send_shutdown(chan)
1740            txid = await self._shutdown(chan, payload, is_local=False)
1741            self.logger.info(f'({chan.get_id_for_log()}) Channel closed by remote peer {txid}')
1742
1743    def can_send_shutdown(self, chan: Channel):
1744        if chan.get_state() >= ChannelState.OPENING:
1745            return True
1746        if chan.constraints.is_initiator and chan.channel_id in self.funding_created_sent:
1747            return True
1748        if not chan.constraints.is_initiator and chan.channel_id in self.funding_signed_sent:
1749            return True
1750        return False
1751
1752    async def send_shutdown(self, chan: Channel):
1753        if not self.can_send_shutdown(chan):
1754            raise Exception('cannot send shutdown')
1755        if chan.config[LOCAL].upfront_shutdown_script:
1756            scriptpubkey = chan.config[LOCAL].upfront_shutdown_script
1757        else:
1758            scriptpubkey = bfh(bitcoin.address_to_script(chan.sweep_address))
1759        assert scriptpubkey
1760        # wait until no more pending updates (bolt2)
1761        chan.set_can_send_ctx_updates(False)
1762        while chan.has_pending_changes(REMOTE):
1763            await asyncio.sleep(0.1)
1764        self.send_message('shutdown', channel_id=chan.channel_id, len=len(scriptpubkey), scriptpubkey=scriptpubkey)
1765        chan.set_state(ChannelState.SHUTDOWN)
1766        # can fullfill or fail htlcs. cannot add htlcs, because state != OPEN
1767        chan.set_can_send_ctx_updates(True)
1768
    @log_exceptions
    async def _shutdown(self, chan: Channel, payload, *, is_local: bool):
        """Negotiate the closing fee with the remote peer, then sign, store and
        broadcast the closing transaction of `chan`.

        payload: mapping from which only 'scriptpubkey' (the remote's closing
            script) is read here.
        is_local: selects whose latest commitment fee bounds the closing fee
            (LOCAL if True, REMOTE otherwise).

        Returns the txid of the closing transaction.
        Raises Exception if the remote proposes a fee above that bound, or if
        their signature matches neither variant of the closing transaction
        (with or without `drop_remote`).
        """
        # wait until no HTLCs remain in either commitment transaction
        while len(chan.hm.htlcs(LOCAL)) + len(chan.hm.htlcs(REMOTE)) > 0:
            self.logger.info(f'(chan: {chan.short_channel_id}) waiting for htlcs to settle...')
            await asyncio.sleep(1)
        # if no HTLCs remain, we must not send updates
        chan.set_can_send_ctx_updates(False)
        their_scriptpubkey = payload['scriptpubkey']
        # our closing script: the upfront shutdown script if configured,
        # otherwise the script of the channel's sweep address
        if chan.config[LOCAL].upfront_shutdown_script:
            our_scriptpubkey = chan.config[LOCAL].upfront_shutdown_script
        else:
            our_scriptpubkey = bfh(bitcoin.address_to_script(chan.sweep_address))
        assert our_scriptpubkey
        # estimate fee of closing tx
        # (a zero-fee tx is built only to measure its size)
        our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=0)
        fee_rate = self.network.config.fee_per_kb()
        our_fee = fee_rate * closing_tx.estimated_size() // 1000
        # BOLT2: The sending node MUST set fee less than or equal to the base fee of the final ctx
        max_fee = chan.get_latest_fee(LOCAL if is_local else REMOTE)
        our_fee = min(our_fee, max_fee)
        drop_remote = False
        # NOTE: the two helpers below are closures; send_closing_signed reads the
        # *current* values of our_fee and drop_remote each time it is called.
        def send_closing_signed():
            # our_sig/closing_tx assigned here are local to this helper and do not
            # affect the outer variables of the same name
            our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=our_fee, drop_remote=drop_remote)
            self.send_message('closing_signed', channel_id=chan.channel_id, fee_satoshis=our_fee, signature=our_sig)
        def verify_signature(tx, sig):
            # check sig against the remote multisig key over the double-sha256 of
            # the preimage of input 0 of tx
            their_pubkey = chan.config[REMOTE].multisig_key.pubkey
            preimage_hex = tx.serialize_preimage(0)
            pre_hash = sha256d(bfh(preimage_hex))
            return ecc.verify_signature(their_pubkey, sig, pre_hash)
        # the funder sends the first 'closing_signed' message
        if chan.constraints.is_initiator:
            send_closing_signed()
        # negotiate fee
        while True:
            # FIXME: the remote SHOULD send closing_signed, but some don't.
            cs_payload = await self.wait_for_message('closing_signed', chan.channel_id)
            their_fee = cs_payload['fee_satoshis']
            if their_fee > max_fee:
                raise Exception(f'the proposed fee exceeds the base fee of the latest commitment transaction {is_local, their_fee, max_fee}')
            their_sig = cs_payload['signature']
            # verify their sig: they might have dropped their output
            # -> try the tx built with drop_remote=False first, then with drop_remote=True
            our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=their_fee, drop_remote=False)
            if verify_signature(closing_tx, their_sig):
                drop_remote = False
            else:
                our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=their_fee, drop_remote=True)
                if verify_signature(closing_tx, their_sig):
                    drop_remote = True
                else:
                    raise Exception('failed to verify their signature')
            # Agree if difference is lower or equal to one (see below)
            if abs(our_fee - their_fee) < 2:
                our_fee = their_fee
                break
            # this will be "strictly between" (as in BOLT2) previous values because of the above
            our_fee = (our_fee + their_fee) // 2
            # another round
            send_closing_signed()
        # the non-funder replies
        # (our_fee == their_fee at this point, so this echoes the agreed fee)
        if not chan.constraints.is_initiator:
            send_closing_signed()
        # add signatures
        # (closing_tx/our_sig are those built in the last loop iteration, at their_fee;
        #  b'\x01' is appended to each DER signature -- presumably the SIGHASH_ALL flag, TODO confirm)
        closing_tx.add_signature_to_txin(
            txin_idx=0,
            signing_pubkey=chan.config[LOCAL].multisig_key.pubkey.hex(),
            sig=bh2u(der_sig_from_sig_string(our_sig) + b'\x01'))
        closing_tx.add_signature_to_txin(
            txin_idx=0,
            signing_pubkey=chan.config[REMOTE].multisig_key.pubkey.hex(),
            sig=bh2u(der_sig_from_sig_string(their_sig) + b'\x01'))
        # save local transaction and set state
        try:
            self.lnworker.wallet.add_transaction(closing_tx)
        except UnrelatedTransactionException:
            pass  # this can happen if (~all the balance goes to REMOTE)
        chan.set_state(ChannelState.CLOSING)
        # broadcast
        await self.network.try_broadcasting(closing_tx, 'closing')
        return closing_tx.txid()
1849
    async def htlc_switch(self):
        """Endless loop that processes the received HTLCs recorded under
        'unfulfilled_htlcs' in each channel's HTLC log.

        Starts once `self.initialized` completes. On every iteration, each
        irrevocably added HTLC is handed to `process_unfulfilled_htlc` and then
        either has its forwarding info recorded, or is fulfilled / failed and
        dropped from the log.
        """
        await self.initialized
        while True:
            # pulse "iteration done": set() followed by clear()
            # (see wait_one_htlc_switch_iteration, which waits on these events)
            self._htlc_switch_iterdone_event.set()
            self._htlc_switch_iterdone_event.clear()
            await asyncio.sleep(0.1)  # TODO maybe make this partly event-driven
            # pulse "iteration start"
            self._htlc_switch_iterstart_event.set()
            self._htlc_switch_iterstart_event.clear()
            self.ping_if_required()
            self._maybe_cleanup_received_htlcs_pending_removal()
            for chan_id, chan in self.channels.items():
                if not chan.can_send_ctx_updates():
                    continue
                self.maybe_send_commitment(chan)
                # ids of htlcs resolved in this pass; removed from the log after the loop
                done = set()
                unfulfilled = chan.hm.log.get('unfulfilled_htlcs', {})
                for htlc_id, (local_ctn, remote_ctn, onion_packet_hex, forwarding_info) in unfulfilled.items():
                    if not chan.hm.is_htlc_irrevocably_added_yet(htlc_proposer=REMOTE, htlc_id=htlc_id):
                        continue
                    htlc = chan.hm.get_htlc_by_id(REMOTE, htlc_id)
                    # error_reason: set when the onion itself cannot be parsed (-> fail_malformed_htlc)
                    # error_bytes: failure message passed to fail_htlc
                    error_reason = None  # type: Optional[OnionRoutingFailure]
                    error_bytes = None  # type: Optional[bytes]
                    preimage = None
                    fw_info = None
                    onion_packet_bytes = bytes.fromhex(onion_packet_hex)
                    onion_packet = None
                    try:
                        onion_packet = OnionPacket.from_bytes(onion_packet_bytes)
                    except OnionRoutingFailure as e:
                        error_reason = e
                    else:
                        try:
                            preimage, fw_info, error_bytes = self.process_unfulfilled_htlc(
                                chan=chan,
                                htlc=htlc,
                                forwarding_info=forwarding_info,
                                onion_packet_bytes=onion_packet_bytes,
                                onion_packet=onion_packet)
                        except OnionRoutingFailure as e:
                            # the onion was parsed, so the failure can be built into an error for fail_htlc
                            error_bytes = construct_onion_error(e, onion_packet, our_onion_private_key=self.privkey)
                    if fw_info:
                        # remember the forwarding info in the log entry
                        # (overwrites an existing key while iterating; the dict size does not change)
                        unfulfilled[htlc_id] = local_ctn, remote_ctn, onion_packet_hex, fw_info
                    elif preimage or error_reason or error_bytes:
                        if preimage:
                            if not self.lnworker.enable_htlc_settle:
                                # settling disabled: leave the htlc in the log (not marked done)
                                continue
                            self.fulfill_htlc(chan, htlc.htlc_id, preimage)
                        elif error_bytes:
                            self.fail_htlc(
                                chan=chan,
                                htlc_id=htlc.htlc_id,
                                error_bytes=error_bytes)
                        else:
                            self.fail_malformed_htlc(
                                chan=chan,
                                htlc_id=htlc.htlc_id,
                                reason=error_reason)
                        done.add(htlc_id)
                # cleanup
                for htlc_id in done:
                    unfulfilled.pop(htlc_id)
1911
1912    def _maybe_cleanup_received_htlcs_pending_removal(self) -> None:
1913        done = set()
1914        for chan, htlc_id in self.received_htlcs_pending_removal:
1915            if chan.hm.is_htlc_irrevocably_removed_yet(htlc_proposer=REMOTE, htlc_id=htlc_id):
1916                done.add((chan, htlc_id))
1917        if done:
1918            for key in done:
1919                self.received_htlcs_pending_removal.remove(key)
1920            self.received_htlc_removed_event.set()
1921            self.received_htlc_removed_event.clear()
1922
1923    async def wait_one_htlc_switch_iteration(self) -> None:
1924        """Waits until the HTLC switch does a full iteration or the peer disconnects,
1925        whichever happens first.
1926        """
1927        async def htlc_switch_iteration():
1928            await self._htlc_switch_iterstart_event.wait()
1929            await self._htlc_switch_iterdone_event.wait()
1930
1931        async with TaskGroup(wait=any) as group:
1932            await group.spawn(htlc_switch_iteration())
1933            await group.spawn(self.got_disconnected.wait())
1934
1935    def process_unfulfilled_htlc(
1936            self, *,
1937            chan: Channel,
1938            htlc: UpdateAddHtlc,
1939            forwarding_info: Tuple[str, int],
1940            onion_packet_bytes: bytes,
1941            onion_packet: OnionPacket) -> Tuple[Optional[bytes], Union[bool, None, Tuple[str, int]], Optional[bytes]]:
1942        """
1943        return (preimage, fw_info, error_bytes) with at most a single element that is not None
1944        raise an OnionRoutingFailure if we need to fail the htlc
1945        """
1946        payment_hash = htlc.payment_hash
1947        processed_onion = self.process_onion_packet(
1948            onion_packet,
1949            payment_hash=payment_hash,
1950            onion_packet_bytes=onion_packet_bytes)
1951        if processed_onion.are_we_final:
1952            # either we are final recipient; or if trampoline, see cases below
1953            preimage, trampoline_onion_packet = self.maybe_fulfill_htlc(
1954                chan=chan,
1955                htlc=htlc,
1956                processed_onion=processed_onion)
1957            if trampoline_onion_packet:
1958                # trampoline- recipient or forwarding
1959                if not forwarding_info:
1960                    trampoline_onion = self.process_onion_packet(
1961                        trampoline_onion_packet,
1962                        payment_hash=htlc.payment_hash,
1963                        onion_packet_bytes=onion_packet_bytes,
1964                        is_trampoline=True)
1965                    if trampoline_onion.are_we_final:
1966                        # trampoline- we are final recipient of HTLC
1967                        preimage, _ = self.maybe_fulfill_htlc(
1968                            chan=chan,
1969                            htlc=htlc,
1970                            processed_onion=trampoline_onion,
1971                            is_trampoline=True)
1972                    else:
1973                        # trampoline- HTLC we are supposed to forward, but haven't forwarded yet
1974                        if not self.lnworker.enable_htlc_forwarding:
1975                            return None, None, None
1976                        self.maybe_forward_trampoline(
1977                            chan=chan,
1978                            htlc=htlc,
1979                            trampoline_onion=trampoline_onion)
1980                        # return True so that this code gets executed only once
1981                        return None, True, None
1982                else:
1983                    # trampoline- HTLC we are supposed to forward, and have already forwarded
1984                    preimage = self.lnworker.get_preimage(payment_hash)
1985                    error_reason = self.lnworker.trampoline_forwarding_failures.pop(payment_hash, None)
1986                    if error_reason:
1987                        self.logger.info(f'trampoline forwarding failure: {error_reason.code_name()}')
1988                        raise error_reason
1989
1990        elif not forwarding_info:
1991            # HTLC we are supposed to forward, but haven't forwarded yet
1992            if not self.lnworker.enable_htlc_forwarding:
1993                return None, None, None
1994            next_chan_id, next_htlc_id = self.maybe_forward_htlc(
1995                htlc=htlc,
1996                processed_onion=processed_onion)
1997            fw_info = (next_chan_id.hex(), next_htlc_id)
1998            return None, fw_info, None
1999        else:
2000            # HTLC we are supposed to forward, and have already forwarded
2001            preimage = self.lnworker.get_preimage(payment_hash)
2002            next_chan_id_hex, htlc_id = forwarding_info
2003            next_chan = self.lnworker.get_channel_by_short_id(bytes.fromhex(next_chan_id_hex))
2004            if next_chan:
2005                error_bytes, error_reason = next_chan.pop_fail_htlc_reason(htlc_id)
2006                if error_bytes:
2007                    return None, None, error_bytes
2008                if error_reason:
2009                    raise error_reason
2010        if preimage:
2011            return preimage, None, None
2012        return None, None, None
2013
2014    def process_onion_packet(
2015            self,
2016            onion_packet: OnionPacket, *,
2017            payment_hash: bytes,
2018            onion_packet_bytes: bytes,
2019            is_trampoline: bool = False) -> ProcessedOnionPacket:
2020
2021        failure_data = sha256(onion_packet_bytes)
2022        try:
2023            processed_onion = process_onion_packet(
2024                onion_packet,
2025                associated_data=payment_hash,
2026                our_onion_private_key=self.privkey,
2027                is_trampoline=is_trampoline)
2028        except UnsupportedOnionPacketVersion:
2029            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_VERSION, data=failure_data)
2030        except InvalidOnionPubkey:
2031            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_KEY, data=failure_data)
2032        except InvalidOnionMac:
2033            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_HMAC, data=failure_data)
2034        except Exception as e:
2035            self.logger.info(f"error processing onion packet: {e!r}")
2036            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_VERSION, data=failure_data)
2037        if self.network.config.get('test_fail_malformed_htlc'):
2038            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_VERSION, data=failure_data)
2039        if self.network.config.get('test_fail_htlcs_with_temp_node_failure'):
2040            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
2041        return processed_onion
2042