1from bitcoin.core import COIN  # type: ignore
2from bitcoin.rpc import RawProxy as BitcoinProxy  # type: ignore
3from bitcoin.rpc import JSONRPCError
4from contextlib import contextmanager
5from pathlib import Path
6from pyln.client import RpcError
7from pyln.testing.btcproxy import BitcoinRpcProxy
8from collections import OrderedDict
9from decimal import Decimal
10from ephemeral_port_reserve import reserve  # type: ignore
11from pyln.client import LightningRpc
12from pyln.client import Millisatoshi
13
14import json
15import logging
16import lzma
17import math
18import os
19import psutil  # type: ignore
20import random
21import re
22import shutil
23import sqlite3
24import string
25import struct
26import subprocess
27import sys
28import threading
29import time
30import warnings
31
32BITCOIND_CONFIG = {
33    "regtest": 1,
34    "rpcuser": "rpcuser",
35    "rpcpassword": "rpcpass",
36    "fallbackfee": Decimal(1000) / COIN,
37}
38
39
40LIGHTNINGD_CONFIG = OrderedDict({
41    "log-level": "debug",
42    "cltv-delta": 6,
43    "cltv-final": 5,
44    "watchtime-blocks": 5,
45    "rescan": 1,
46    'disable-dns': None,
47})
48
49FUNDAMOUNT = 10**6
50
51
52def env(name, default=None):
53    """Access to environment variables
54
55    Allows access to environment variables, falling back to config.vars (part
56    of c-lightning's `./configure` output), and finally falling back to a
57    default value.
58
59    """
60    fname = 'config.vars'
61    if os.path.exists(fname):
62        lines = open(fname, 'r').readlines()
63        config = dict([(line.rstrip().split('=', 1)) for line in lines])
64    else:
65        config = {}
66
67    if name in os.environ:
68        return os.environ[name]
69    elif name in config:
70        return config[name]
71    else:
72        return default
73
74
75VALGRIND = env("VALGRIND") == "1"
76TEST_NETWORK = env("TEST_NETWORK", 'regtest')
77DEVELOPER = env("DEVELOPER", "0") == "1"
78TEST_DEBUG = env("TEST_DEBUG", "0") == "1"
79SLOW_MACHINE = env("SLOW_MACHINE", "0") == "1"
80DEPRECATED_APIS = env("DEPRECATED_APIS", "0") == "1"
81TIMEOUT = int(env("TIMEOUT", 180 if SLOW_MACHINE else 60))
82EXPERIMENTAL_DUAL_FUND = env("EXPERIMENTAL_DUAL_FUND", "0") == "1"
83
84
85def wait_for(success, timeout=TIMEOUT):
86    start_time = time.time()
87    interval = 0.25
88    while not success():
89        time_left = start_time + timeout - time.time()
90        if time_left <= 0:
91            raise ValueError("Timeout while waiting for {}", success)
92        time.sleep(min(interval, time_left))
93        interval *= 2
94        if interval > 5:
95            interval = 5
96
97
98def write_config(filename, opts, regtest_opts=None, section_name='regtest'):
99    with open(filename, 'w') as f:
100        for k, v in opts.items():
101            f.write("{}={}\n".format(k, v))
102        if regtest_opts:
103            f.write("[{}]\n".format(section_name))
104            for k, v in regtest_opts.items():
105                f.write("{}={}\n".format(k, v))
106
107
108def only_one(arr):
109    """Many JSON RPC calls return an array; often we only expect a single entry
110    """
111    assert len(arr) == 1
112    return arr[0]
113
114
115def sync_blockheight(bitcoind, nodes):
116    height = bitcoind.rpc.getblockchaininfo()['blocks']
117    for n in nodes:
118        wait_for(lambda: n.rpc.getinfo()['blockheight'] == height)
119
120
121def wait_channel_quiescent(n1, n2):
122    wait_for(lambda: only_one(only_one(n1.rpc.listpeers(n2.info['id'])['peers'])['channels'])['htlcs'] == [])
123    wait_for(lambda: only_one(only_one(n2.rpc.listpeers(n1.info['id'])['peers'])['channels'])['htlcs'] == [])
124
125
126def get_tx_p2wsh_outnum(bitcoind, tx, amount):
127    """Get output number of this tx which is p2wsh of amount"""
128    decoded = bitcoind.rpc.decoderawtransaction(tx, True)
129
130    for out in decoded['vout']:
131        if out['scriptPubKey']['type'] == 'witness_v0_scripthash':
132            if out['value'] == Decimal(amount) / 10**8:
133                return out['n']
134
135    return None
136
137
138class TailableProc(object):
139    """A monitorable process that we can start, stop and tail.
140
141    This is the base class for the daemons. It allows us to directly
142    tail the processes and react to their output.
143    """
144
145    def __init__(self, outputDir=None, verbose=True):
146        self.logs = []
147        self.logs_cond = threading.Condition(threading.RLock())
148        self.env = os.environ.copy()
149        self.running = False
150        self.proc = None
151        self.outputDir = outputDir
152        self.logsearch_start = 0
153        self.err_logs = []
154        self.prefix = ""
155
156        # Should we be logging lines we read from stdout?
157        self.verbose = verbose
158
159        # A filter function that'll tell us whether to filter out the line (not
160        # pass it to the log matcher and not print it to stdout).
161        self.log_filter = lambda line: False
162
163    def start(self, stdin=None, stdout=None, stderr=None):
164        """Start the underlying process and start monitoring it.
165        """
166        logging.debug("Starting '%s'", " ".join(self.cmd_line))
167        self.proc = subprocess.Popen(self.cmd_line,
168                                     stdin=stdin,
169                                     stdout=stdout if stdout else subprocess.PIPE,
170                                     stderr=stderr,
171                                     env=self.env)
172        self.thread = threading.Thread(target=self.tail)
173        self.thread.daemon = True
174        self.thread.start()
175        self.running = True
176
177    def save_log(self):
178        if self.outputDir:
179            logpath = os.path.join(self.outputDir, 'log')
180            with open(logpath, 'w') as f:
181                for l in self.logs:
182                    f.write(l + '\n')
183
184    def stop(self, timeout=10):
185        self.save_log()
186        self.proc.terminate()
187
188        # Now give it some time to react to the signal
189        rc = self.proc.wait(timeout)
190
191        if rc is None:
192            self.proc.kill()
193
194        self.proc.wait()
195        self.thread.join()
196
197        return self.proc.returncode
198
199    def kill(self):
200        """Kill process without giving it warning."""
201        self.proc.kill()
202        self.proc.wait()
203        self.thread.join()
204
205    def tail(self):
206        """Tail the stdout of the process and remember it.
207
208        Stores the lines of output produced by the process in
209        self.logs and signals that a new line was read so that it can
210        be picked up by consumers.
211        """
212        for line in iter(self.proc.stdout.readline, ''):
213            if len(line) == 0:
214                break
215
216            line = line.decode('UTF-8', 'replace').rstrip()
217
218            if self.log_filter(line):
219                continue
220
221            if self.verbose:
222                sys.stdout.write("{}: {}\n".format(self.prefix, line))
223
224            with self.logs_cond:
225                self.logs.append(line)
226                self.logs_cond.notifyAll()
227
228        self.running = False
229        self.proc.stdout.close()
230
231        if self.proc.stderr:
232            for line in iter(self.proc.stderr.readline, ''):
233
234                if line is None or len(line) == 0:
235                    break
236
237                line = line.rstrip().decode('UTF-8', 'replace')
238                self.err_logs.append(line)
239
240            self.proc.stderr.close()
241
242    def is_in_log(self, regex, start=0):
243        """Look for `regex` in the logs."""
244
245        ex = re.compile(regex)
246        for l in self.logs[start:]:
247            if ex.search(l):
248                logging.debug("Found '%s' in logs", regex)
249                return l
250
251        logging.debug("Did not find '%s' in logs", regex)
252        return None
253
254    def is_in_stderr(self, regex):
255        """Look for `regex` in stderr."""
256
257        ex = re.compile(regex)
258        for l in self.err_logs:
259            if ex.search(l):
260                logging.debug("Found '%s' in stderr", regex)
261                return l
262
263        logging.debug("Did not find '%s' in stderr", regex)
264        return None
265
266    def wait_for_logs(self, regexs, timeout=TIMEOUT):
267        """Look for `regexs` in the logs.
268
269        We tail the stdout of the process and look for each regex in `regexs`,
270        starting from last of the previous waited-for log entries (if any).  We
271        fail if the timeout is exceeded or if the underlying process
272        exits before all the `regexs` were found.
273
274        If timeout is None, no time-out is applied.
275        """
276        logging.debug("Waiting for {} in the logs".format(regexs))
277        exs = [re.compile(r) for r in regexs]
278        start_time = time.time()
279        pos = self.logsearch_start
280        while True:
281            if timeout is not None and time.time() > start_time + timeout:
282                print("Time-out: can't find {} in logs".format(exs))
283                for r in exs:
284                    if self.is_in_log(r):
285                        print("({} was previously in logs!)".format(r))
286                raise TimeoutError('Unable to find "{}" in logs.'.format(exs))
287
288            with self.logs_cond:
289                if pos >= len(self.logs):
290                    if not self.running:
291                        raise ValueError('Process died while waiting for logs')
292                    self.logs_cond.wait(1)
293                    continue
294
295                for r in exs.copy():
296                    self.logsearch_start = pos + 1
297                    if r.search(self.logs[pos]):
298                        logging.debug("Found '%s' in logs", r)
299                        exs.remove(r)
300                        break
301                if len(exs) == 0:
302                    return self.logs[pos]
303                pos += 1
304
305    def wait_for_log(self, regex, timeout=TIMEOUT):
306        """Look for `regex` in the logs.
307
308        Convenience wrapper for the common case of only seeking a single entry.
309        """
310        return self.wait_for_logs([regex], timeout)
311
312
313class SimpleBitcoinProxy:
314    """Wrapper for BitcoinProxy to reconnect.
315
316    Long wait times between calls to the Bitcoin RPC could result in
317    `bitcoind` closing the connection, so here we just create
318    throwaway connections. This is easier than to reach into the RPC
319    library to close, reopen and reauth upon failure.
320    """
321    def __init__(self, btc_conf_file, *args, **kwargs):
322        self.__btc_conf_file__ = btc_conf_file
323
324    def __getattr__(self, name):
325        if name.startswith('__') and name.endswith('__'):
326            # Python internal stuff
327            raise AttributeError
328
329        # Create a callable to do the actual call
330        proxy = BitcoinProxy(btc_conf_file=self.__btc_conf_file__)
331
332        def f(*args):
333            logging.debug("Calling {name} with arguments {args}".format(
334                name=name,
335                args=args
336            ))
337            res = proxy._call(name, *args)
338            logging.debug("Result for {name} call: {res}".format(
339                name=name,
340                res=res,
341            ))
342            return res
343
344        # Make debuggers show <function bitcoin.rpc.name> rather than <function
345        # bitcoin.rpc.<lambda>>
346        f.__name__ = name
347        return f
348
349
350class BitcoinD(TailableProc):
351
352    def __init__(self, bitcoin_dir="/tmp/bitcoind-test", rpcport=None):
353        TailableProc.__init__(self, bitcoin_dir, verbose=False)
354
355        if rpcport is None:
356            rpcport = reserve()
357
358        self.bitcoin_dir = bitcoin_dir
359        self.rpcport = rpcport
360        self.prefix = 'bitcoind'
361
362        regtestdir = os.path.join(bitcoin_dir, 'regtest')
363        if not os.path.exists(regtestdir):
364            os.makedirs(regtestdir)
365
366        self.cmd_line = [
367            'bitcoind',
368            '-datadir={}'.format(bitcoin_dir),
369            '-printtoconsole',
370            '-server',
371            '-logtimestamps',
372            '-nolisten',
373            '-txindex',
374            '-nowallet',
375            '-addresstype=bech32'
376        ]
377        # For up to and including 0.16.1, this needs to be in main section.
378        BITCOIND_CONFIG['rpcport'] = rpcport
379        # For after 0.16.1 (eg. 3f398d7a17f136cd4a67998406ca41a124ae2966), this
380        # needs its own [regtest] section.
381        BITCOIND_REGTEST = {'rpcport': rpcport}
382        self.conf_file = os.path.join(bitcoin_dir, 'bitcoin.conf')
383        write_config(self.conf_file, BITCOIND_CONFIG, BITCOIND_REGTEST)
384        self.rpc = SimpleBitcoinProxy(btc_conf_file=self.conf_file)
385        self.proxies = []
386
387    def start(self):
388        TailableProc.start(self)
389        self.wait_for_log("Done loading", timeout=TIMEOUT)
390
391        logging.info("BitcoinD started")
392        try:
393            self.rpc.createwallet("lightningd-tests")
394        except JSONRPCError:
395            self.rpc.loadwallet("lightningd-tests")
396
397    def stop(self):
398        for p in self.proxies:
399            p.stop()
400        self.rpc.stop()
401        return TailableProc.stop(self)
402
403    def get_proxy(self):
404        proxy = BitcoinRpcProxy(self)
405        self.proxies.append(proxy)
406        proxy.start()
407        return proxy
408
409    # wait_for_mempool can be used to wait for the mempool before generating blocks:
410    # True := wait for at least 1 transation
411    # int > 0 := wait for at least N transactions
412    # 'tx_id' := wait for one transaction id given as a string
413    # ['tx_id1', 'tx_id2'] := wait until all of the specified transaction IDs
414    def generate_block(self, numblocks=1, wait_for_mempool=0):
415        if wait_for_mempool:
416            if isinstance(wait_for_mempool, str):
417                wait_for_mempool = [wait_for_mempool]
418            if isinstance(wait_for_mempool, list):
419                wait_for(lambda: all(txid in self.rpc.getrawmempool() for txid in wait_for_mempool))
420            else:
421                wait_for(lambda: len(self.rpc.getrawmempool()) >= wait_for_mempool)
422
423        mempool = self.rpc.getrawmempool()
424        logging.debug("Generating {numblocks}, confirming {lenmempool} transactions: {mempool}".format(
425            numblocks=numblocks,
426            mempool=mempool,
427            lenmempool=len(mempool),
428        ))
429
430        # As of 0.16, generate() is removed; use generatetoaddress.
431        return self.rpc.generatetoaddress(numblocks, self.rpc.getnewaddress())
432
433    def simple_reorg(self, height, shift=0):
434        """
435        Reorganize chain by creating a fork at height=[height] and re-mine all mempool
436        transactions into [height + shift], where shift >= 0. Returns hashes of generated
437        blocks.
438
439        Note that tx's that become invalid at [height] (because coin maturity, locktime
440        etc.) are removed from mempool. The length of the new chain will be original + 1
441        OR original + [shift], whichever is larger.
442
443        For example: to push tx's backward from height h1 to h2 < h1, use [height]=h2.
444
445        Or to change the txindex of tx's at height h1:
446        1. A block at height h2 < h1 should contain a non-coinbase tx that can be pulled
447           forward to h1.
448        2. Set [height]=h2 and [shift]= h1-h2
449        """
450        hashes = []
451        fee_delta = 1000000
452        orig_len = self.rpc.getblockcount()
453        old_hash = self.rpc.getblockhash(height)
454        final_len = height + shift if height + shift > orig_len else 1 + orig_len
455        # TODO: raise error for insane args?
456
457        self.rpc.invalidateblock(old_hash)
458        self.wait_for_log(r'InvalidChainFound: invalid block=.*  height={}'.format(height))
459        memp = self.rpc.getrawmempool()
460
461        if shift == 0:
462            hashes += self.generate_block(1 + final_len - height)
463        else:
464            for txid in memp:
465                # lower priority (to effective feerate=0) so they are not mined
466                self.rpc.prioritisetransaction(txid, None, -fee_delta)
467            hashes += self.generate_block(shift)
468
469            for txid in memp:
470                # restore priority so they are mined
471                self.rpc.prioritisetransaction(txid, None, fee_delta)
472            hashes += self.generate_block(1 + final_len - (height + shift))
473        self.wait_for_log(r'UpdateTip: new best=.* height={}'.format(final_len))
474        return hashes
475
476    def getnewaddress(self):
477        return self.rpc.getnewaddress()
478
479
480class ElementsD(BitcoinD):
481    def __init__(self, bitcoin_dir="/tmp/bitcoind-test", rpcport=None):
482        config = BITCOIND_CONFIG.copy()
483        if 'regtest' in config:
484            del config['regtest']
485
486        config['chain'] = 'liquid-regtest'
487        BitcoinD.__init__(self, bitcoin_dir, rpcport)
488
489        self.cmd_line = [
490            'elementsd',
491            '-datadir={}'.format(bitcoin_dir),
492            '-printtoconsole',
493            '-server',
494            '-logtimestamps',
495            '-nolisten',
496            '-nowallet',
497            '-validatepegin=0',
498            '-con_blocksubsidy=5000000000',
499        ]
500        conf_file = os.path.join(bitcoin_dir, 'elements.conf')
501        config['rpcport'] = self.rpcport
502        BITCOIND_REGTEST = {'rpcport': self.rpcport}
503        write_config(conf_file, config, BITCOIND_REGTEST, section_name='liquid-regtest')
504        self.conf_file = conf_file
505        self.rpc = SimpleBitcoinProxy(btc_conf_file=self.conf_file)
506        self.prefix = 'elementsd'
507
508    def getnewaddress(self):
509        """Need to get an address and then make it unconfidential
510        """
511        addr = self.rpc.getnewaddress()
512        info = self.rpc.getaddressinfo(addr)
513        return info['unconfidential']
514
515
516class LightningD(TailableProc):
517    def __init__(self, lightning_dir, bitcoindproxy, port=9735, random_hsm=False, node_id=0):
518        TailableProc.__init__(self, lightning_dir)
519        self.executable = 'lightningd'
520        self.lightning_dir = lightning_dir
521        self.port = port
522        self.cmd_prefix = []
523        self.disconnect_file = None
524
525        self.rpcproxy = bitcoindproxy
526
527        self.opts = LIGHTNINGD_CONFIG.copy()
528        opts = {
529            'lightning-dir': lightning_dir,
530            'addr': '127.0.0.1:{}'.format(port),
531            'allow-deprecated-apis': '{}'.format("true" if DEPRECATED_APIS
532                                                 else "false"),
533            'network': TEST_NETWORK,
534            'ignore-fee-limits': 'false',
535            'bitcoin-rpcuser': BITCOIND_CONFIG['rpcuser'],
536            'bitcoin-rpcpassword': BITCOIND_CONFIG['rpcpassword'],
537
538            # Make sure we don't touch any existing config files in the user's $HOME
539            'bitcoin-datadir': lightning_dir,
540        }
541
542        for k, v in opts.items():
543            self.opts[k] = v
544
545        if not os.path.exists(os.path.join(lightning_dir, TEST_NETWORK)):
546            os.makedirs(os.path.join(lightning_dir, TEST_NETWORK))
547
548        # Last 32-bytes of final part of dir -> seed.
549        seed = (bytes(re.search('([^/]+)/*$', lightning_dir).group(1), encoding='utf-8') + bytes(32))[:32]
550        if not random_hsm:
551            with open(os.path.join(lightning_dir, TEST_NETWORK, 'hsm_secret'), 'wb') as f:
552                f.write(seed)
553        if DEVELOPER:
554            self.opts['dev-fast-gossip'] = None
555            self.opts['dev-bitcoind-poll'] = 1
556        self.prefix = 'lightningd-%d' % (node_id)
557
558    def cleanup(self):
559        # To force blackhole to exit, disconnect file must be truncated!
560        if self.disconnect_file:
561            with open(self.disconnect_file, "w") as f:
562                f.truncate()
563
564    @property
565    def cmd_line(self):
566
567        opts = []
568        for k, v in self.opts.items():
569            if v is None:
570                opts.append("--{}".format(k))
571            elif isinstance(v, list):
572                for i in v:
573                    opts.append("--{}={}".format(k, i))
574            else:
575                opts.append("--{}={}".format(k, v))
576
577        return self.cmd_prefix + [self.executable] + opts
578
579    def start(self, stdin=None, stdout=None, stderr=None,
580              wait_for_initialized=True):
581        self.opts['bitcoin-rpcport'] = self.rpcproxy.rpcport
582        TailableProc.start(self, stdin, stdout, stderr)
583        if wait_for_initialized:
584            self.wait_for_log("Server started with public key")
585        logging.info("LightningD started")
586
587    def wait(self, timeout=10):
588        """Wait for the daemon to stop for up to timeout seconds
589
590        Returns the returncode of the process, None if the process did
591        not return before the timeout triggers.
592        """
593        self.proc.wait(timeout)
594        return self.proc.returncode
595
596
597class PrettyPrintingLightningRpc(LightningRpc):
598    """A version of the LightningRpc that pretty-prints calls and results.
599
600    Useful when debugging based on logs, and less painful to the
601    eyes. It has some overhead since we re-serialize the request and
602    result to json in order to pretty print it.
603
604    Also validates (optional) schemas for us.
605    """
606    def __init__(self, socket_path, executor=None, logger=logging,
607                 patch_json=True, jsonschemas={}):
608        super().__init__(
609            socket_path,
610            executor,
611            logger,
612            patch_json,
613        )
614        self.jsonschemas = jsonschemas
615
616    def call(self, method, payload=None):
617        id = self.next_id
618        self.logger.debug(json.dumps({
619            "id": id,
620            "method": method,
621            "params": payload
622        }, indent=2))
623        res = LightningRpc.call(self, method, payload)
624        self.logger.debug(json.dumps({
625            "id": id,
626            "result": res
627        }, indent=2))
628
629        if method in self.jsonschemas:
630            self.jsonschemas[method].validate(res)
631
632        return res
633
634
635class LightningNode(object):
636    def __init__(self, node_id, lightning_dir, bitcoind, executor, valgrind, may_fail=False,
637                 may_reconnect=False,
638                 allow_broken_log=False,
639                 allow_warning=False,
640                 allow_bad_gossip=False,
641                 db=None, port=None, disconnect=None, random_hsm=None, options=None,
642                 jsonschemas={},
643                 **kwargs):
644        self.bitcoin = bitcoind
645        self.executor = executor
646        self.may_fail = may_fail
647        self.may_reconnect = may_reconnect
648        self.allow_broken_log = allow_broken_log
649        self.allow_bad_gossip = allow_bad_gossip
650        self.allow_warning = allow_warning
651        self.db = db
652
653        # Assume successful exit
654        self.rc = 0
655
656        socket_path = os.path.join(lightning_dir, TEST_NETWORK, "lightning-rpc").format(node_id)
657        self.rpc = PrettyPrintingLightningRpc(socket_path, self.executor, jsonschemas=jsonschemas)
658
659        self.daemon = LightningD(
660            lightning_dir, bitcoindproxy=bitcoind.get_proxy(),
661            port=port, random_hsm=random_hsm, node_id=node_id
662        )
663        # If we have a disconnect string, dump it to a file for daemon.
664        if disconnect:
665            self.daemon.disconnect_file = os.path.join(lightning_dir, TEST_NETWORK, "dev_disconnect")
666            with open(self.daemon.disconnect_file, "w") as f:
667                f.write("\n".join(disconnect))
668            self.daemon.opts["dev-disconnect"] = "dev_disconnect"
669        if DEVELOPER:
670            self.daemon.opts["dev-fail-on-subdaemon-fail"] = None
671            # Don't run --version on every subdaemon if we're valgrinding and slow.
672            if SLOW_MACHINE and VALGRIND:
673                self.daemon.opts["dev-no-version-checks"] = None
674            if os.getenv("DEBUG_SUBD"):
675                self.daemon.opts["dev-debugger"] = os.getenv("DEBUG_SUBD")
676            if valgrind:
677                self.daemon.env["LIGHTNINGD_DEV_NO_BACKTRACE"] = "1"
678            else:
679                # Under valgrind, scanning can access uninitialized mem.
680                self.daemon.env["LIGHTNINGD_DEV_MEMLEAK"] = "1"
681            if not may_reconnect:
682                self.daemon.opts["dev-no-reconnect"] = None
683        if EXPERIMENTAL_DUAL_FUND:
684            self.daemon.opts["experimental-dual-fund"] = None
685
686        if options is not None:
687            self.daemon.opts.update(options)
688        dsn = db.get_dsn()
689        if dsn is not None:
690            self.daemon.opts['wallet'] = dsn
691        if valgrind:
692            self.daemon.cmd_prefix = [
693                'valgrind',
694                '-q',
695                '--trace-children=yes',
696                '--trace-children-skip=*python*,*bitcoin-cli*,*elements-cli*',
697                '--error-exitcode=7',
698                '--log-file={}/valgrind-errors.%p'.format(self.daemon.lightning_dir)
699            ]
700            # Reduce precision of errors, speeding startup and reducing memory greatly:
701            if SLOW_MACHINE:
702                self.daemon.cmd_prefix += ['--read-inline-info=no']
703
704    def connect(self, remote_node):
705        self.rpc.connect(remote_node.info['id'], '127.0.0.1', remote_node.daemon.port)
706
707    def is_connected(self, remote_node):
708        return remote_node.info['id'] in [p['id'] for p in self.rpc.listpeers()['peers']]
709
710    def openchannel(self, remote_node, capacity=FUNDAMOUNT, addrtype="p2sh-segwit", confirm=True, wait_for_announce=True, connect=True):
711        addr, wallettxid = self.fundwallet(10 * capacity, addrtype)
712
713        if connect and not self.is_connected(remote_node):
714            self.connect(remote_node)
715
716        fundingtx = self.rpc.fundchannel(remote_node.info['id'], capacity)
717
718        # Wait for the funding transaction to be in bitcoind's mempool
719        wait_for(lambda: fundingtx['txid'] in self.bitcoin.rpc.getrawmempool())
720
721        if confirm or wait_for_announce:
722            self.bitcoin.generate_block(1)
723
724        if wait_for_announce:
725            self.bitcoin.generate_block(5)
726
727        if confirm or wait_for_announce:
728            self.daemon.wait_for_log(
729                r'Funding tx {} depth'.format(fundingtx['txid']))
730        return {'address': addr, 'wallettxid': wallettxid, 'fundingtx': fundingtx}
731
732    def fundwallet(self, sats, addrtype="p2sh-segwit", mine_block=True):
733        addr = self.rpc.newaddr(addrtype)[addrtype]
734        txid = self.bitcoin.rpc.sendtoaddress(addr, sats / 10**8)
735        if mine_block:
736            self.bitcoin.generate_block(1)
737            self.daemon.wait_for_log('Owning output .* txid {} CONFIRMED'.format(txid))
738        return addr, txid
739
740    def fundbalancedchannel(self, remote_node, total_capacity, announce=True):
741        '''
742        Creates a perfectly-balanced channel, as all things should be.
743        '''
744        if isinstance(total_capacity, Millisatoshi):
745            total_capacity = int(total_capacity.to_satoshi())
746        else:
747            total_capacity = int(total_capacity)
748
749        self.fundwallet(total_capacity + 10000)
750
751        if remote_node.config('experimental-dual-fund'):
752            remote_node.fundwallet(total_capacity + 10000)
753            # We cut the total_capacity in half, since the peer's
754            # expected to contribute that same amount
755            chan_capacity = total_capacity // 2
756            total_capacity = chan_capacity * 2
757            # Tell the node to equally dual-fund the channel
758            remote_node.rpc.call('funderupdate', {'policy': 'match',
759                                                  'policy_mod': 100,
760                                                  'fuzz_percent': 0})
761        else:
762            chan_capacity = total_capacity
763
764        self.rpc.connect(remote_node.info['id'], 'localhost', remote_node.port)
765
766        # Make sure the fundchannel is confirmed.
767        num_tx = len(self.bitcoin.rpc.getrawmempool())
768        res = self.rpc.fundchannel(remote_node.info['id'], chan_capacity, feerate='slow', minconf=0, announce=announce, push_msat=Millisatoshi(chan_capacity * 500))
769        wait_for(lambda: len(self.bitcoin.rpc.getrawmempool()) == num_tx + 1)
770        blockid = self.bitcoin.generate_block(1)[0]
771
772        # Generate the scid.
773        outnum = get_tx_p2wsh_outnum(self.bitcoin, res['tx'], total_capacity)
774        if outnum is None:
775            raise ValueError("no outnum found. capacity {} tx {}".format(total_capacity, res['tx']))
776
777        for i, txid in enumerate(self.bitcoin.rpc.getblock(blockid)['tx']):
778            if txid == res['txid']:
779                txnum = i
780
781        return '{}x{}x{}'.format(self.bitcoin.rpc.getblockcount(), txnum, outnum)
782
783    def getactivechannels(self):
784        return [c for c in self.rpc.listchannels()['channels'] if c['active']]
785
786    def db_query(self, query):
787        return self.db.query(query)
788
789    # Assumes node is stopped!
790    def db_manip(self, query):
791        db = sqlite3.connect(os.path.join(self.daemon.lightning_dir, TEST_NETWORK, "lightningd.sqlite3"))
792        db.row_factory = sqlite3.Row
793        c = db.cursor()
794        c.execute(query)
795        db.commit()
796        c.close()
797        db.close()
798
799    def is_synced_with_bitcoin(self, info=None):
800        if info is None:
801            info = self.rpc.getinfo()
802        return 'warning_bitcoind_sync' not in info and 'warning_lightningd_sync' not in info
803
804    def start(self, wait_for_bitcoind_sync=True, stderr=None):
805        self.daemon.start(stderr=stderr)
806        # Cache `getinfo`, we'll be using it a lot
807        self.info = self.rpc.getinfo()
808        # This shortcut is sufficient for our simple tests.
809        self.port = self.info['binding'][0]['port']
810        if wait_for_bitcoind_sync and not self.is_synced_with_bitcoin(self.info):
811            wait_for(lambda: self.is_synced_with_bitcoin())
812
813    def stop(self, timeout=10):
814        """ Attempt to do a clean shutdown, but kill if it hangs
815        """
816
817        # Tell the daemon to stop
818        try:
819            # May fail if the process already died
820            self.rpc.stop()
821        except Exception:
822            pass
823
824        self.rc = self.daemon.wait(timeout)
825
826        # If it did not stop be more insistent
827        if self.rc is None:
828            self.rc = self.daemon.stop()
829
830        self.daemon.save_log()
831        self.daemon.cleanup()
832
833        if self.rc != 0 and not self.may_fail:
834            raise ValueError("Node did not exit cleanly, rc={}".format(self.rc))
835        else:
836            return self.rc
837
838    def restart(self, timeout=10, clean=True):
839        """Stop and restart the lightning node.
840
841        Keyword arguments:
842        timeout: number of seconds to wait for a shutdown
843        clean: whether to issue a `stop` RPC command before killing
844        """
845        if clean:
846            self.stop(timeout)
847        else:
848            self.daemon.stop()
849
850        self.start()
851
852    def fund_channel(self, l2, amount, wait_for_active=True, announce_channel=True):
853        warnings.warn("LightningNode.fund_channel is deprecated in favor of "
854                      "LightningNode.fundchannel", category=DeprecationWarning)
855        return self.fundchannel(l2, amount, wait_for_active, announce_channel)
856
857    def fundchannel(self, l2, amount=FUNDAMOUNT, wait_for_active=True,
858                    announce_channel=True, **kwargs):
859        # Give yourself some funds to work with
860        addr = self.rpc.newaddr()['bech32']
861
862        def has_funds_on_addr(addr):
863            """Check if the given address has funds in the internal wallet.
864            """
865            outs = self.rpc.listfunds()['outputs']
866            addrs = [o['address'] for o in outs]
867            return addr in addrs
868
869        # We should not have funds on that address yet, we just generated it.
870        assert(not has_funds_on_addr(addr))
871
872        self.bitcoin.rpc.sendtoaddress(addr, (amount + 1000000) / 10**8)
873        self.bitcoin.generate_block(1)
874
875        # Now we should.
876        wait_for(lambda: has_funds_on_addr(addr))
877
878        # Now go ahead and open a channel
879        res = self.rpc.fundchannel(l2.info['id'], amount,
880                                   announce=announce_channel,
881                                   **kwargs)
882        wait_for(lambda: res['txid'] in self.bitcoin.rpc.getrawmempool())
883        blockid = self.bitcoin.generate_block(1)[0]
884
885        for i, txid in enumerate(self.bitcoin.rpc.getblock(blockid)['tx']):
886            if txid == res['txid']:
887                txnum = i
888
889        scid = "{}x{}x{}".format(self.bitcoin.rpc.getblockcount(),
890                                 txnum, res['outnum'])
891
892        if wait_for_active:
893            self.wait_channel_active(scid)
894            l2.wait_channel_active(scid)
895
896        return scid, res
897
898    def subd_pid(self, subd, peerid=None):
899        """Get the process id of the given subdaemon, eg channeld or gossipd"""
900        if peerid:
901            ex = re.compile(r'{}-.*{}.*: pid ([0-9]*),'
902                            .format(peerid, subd))
903        else:
904            ex = re.compile('{}-.*: pid ([0-9]*),'.format(subd))
905        # Make sure we get latest one if it's restarted!
906        for l in reversed(self.daemon.logs):
907            group = ex.search(l)
908            if group:
909                return group.group(1)
910        raise ValueError("No daemon {} found".format(subd))
911
912    def channel_state(self, other):
913        """Return the state of the channel to the other node.
914
915        Returns None if there is no such peer, or a channel hasn't been funded
916        yet.
917
918        """
919        peers = self.rpc.listpeers(other.info['id'])['peers']
920        if not peers or 'channels' not in peers[0]:
921            return None
922        channel = peers[0]['channels'][0]
923        return channel['state']
924
925    def get_channel_scid(self, other):
926        """Get the short_channel_id for the channel to the other node.
927        """
928        peers = self.rpc.listpeers(other.info['id'])['peers']
929        if not peers or 'channels' not in peers[0]:
930            return None
931        channel = peers[0]['channels'][0]
932        return channel['short_channel_id']
933
934    def get_channel_id(self, other):
935        """Get the channel_id for the channel to the other node.
936        """
937        peers = self.rpc.listpeers(other.info['id'])['peers']
938        if not peers or 'channels' not in peers[0]:
939            return None
940        channel = peers[0]['channels'][0]
941        return channel['channel_id']
942
943    def is_channel_active(self, chanid):
944        channels = self.rpc.listchannels(chanid)['channels']
945        active = [(c['short_channel_id'], c['channel_flags']) for c in channels if c['active']]
946        return (chanid, 0) in active and (chanid, 1) in active
947
948    def wait_for_channel_onchain(self, peerid):
949        txid = only_one(only_one(self.rpc.listpeers(peerid)['peers'])['channels'])['scratch_txid']
950        wait_for(lambda: txid in self.bitcoin.rpc.getrawmempool())
951
952    def wait_channel_active(self, chanid):
953        wait_for(lambda: self.is_channel_active(chanid))
954
955    # This waits until gossipd sees channel_update in both directions
956    # (or for local channels, at least a local announcement)
957    def wait_for_channel_updates(self, scids):
958        # Could happen in any order...
959        self.daemon.wait_for_logs(['Received channel_update for channel {}/0'.format(c)
960                                   for c in scids]
961                                  + ['Received channel_update for channel {}/1'.format(c)
962                                     for c in scids])
963
964    def wait_for_route(self, destination, timeout=TIMEOUT):
965        """ Wait for a route to the destination to become available.
966        """
967        start_time = time.time()
968        while time.time() < start_time + timeout:
969            try:
970                self.rpc.getroute(destination.info['id'], 1, 1)
971                return True
972            except Exception:
973                time.sleep(1)
974        if time.time() > start_time + timeout:
975            raise ValueError("Error waiting for a route to destination {}".format(destination))
976
977    # This helper waits for all HTLCs to settle
978    # `scids` can be a list of strings. If unset wait on all channels.
979    def wait_for_htlcs(self, scids=None):
980        peers = self.rpc.listpeers()['peers']
981        for p, peer in enumerate(peers):
982            if 'channels' in peer:
983                for c, channel in enumerate(peer['channels']):
984                    if scids is not None and channel['short_channel_id'] not in scids:
985                        continue
986                    if 'htlcs' in channel:
987                        wait_for(lambda: len(self.rpc.listpeers()['peers'][p]['channels'][c]['htlcs']) == 0)
988
989    # This sends money to a directly connected peer
990    def pay(self, dst, amt, label=None):
991        if not label:
992            label = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(20))
993
994        # check we are connected
995        dst_id = dst.info['id']
996        assert len(self.rpc.listpeers(dst_id).get('peers')) == 1
997
998        # make an invoice
999        inv = dst.rpc.invoice(amt, label, label)
1000        # FIXME: pre 0.10.1 invoice calls didn't have payment_secret field
1001        psecret = dst.rpc.decodepay(inv['bolt11'])['payment_secret']
1002        rhash = inv['payment_hash']
1003        invoices = dst.rpc.listinvoices(label)['invoices']
1004        assert len(invoices) == 1 and invoices[0]['status'] == 'unpaid'
1005
1006        routestep = {
1007            'msatoshi': amt,
1008            'id': dst_id,
1009            'delay': 5,
1010            'channel': '1x1x1'  # note: can be bogus for 1-hop direct payments
1011        }
1012
1013        # sendpay is async now
1014        self.rpc.sendpay([routestep], rhash, payment_secret=psecret)
1015        # wait for sendpay to comply
1016        result = self.rpc.waitsendpay(rhash)
1017        assert(result.get('status') == 'complete')
1018
1019    # This helper sends all money to a peer until even 1 msat can't get through.
1020    def drain(self, peer):
1021        total = 0
1022        msat = 4294967295  # Max payment size in some configs
1023        while msat != 0:
1024            try:
1025                logging.debug("Drain step with size={}".format(msat))
1026                self.pay(peer, msat)
1027                total += msat
1028            except RpcError as e:
1029                logging.debug("Got an exception while draining channel: {}".format(e))
1030                msat //= 2
1031        logging.debug("Draining complete after sending a total of {}msats".format(total))
1032        return total
1033
1034    # Note: this feeds through the smoother in update_feerate, so changing
1035    # it on a running daemon may not give expected result!
1036    def set_feerates(self, feerates, wait_for_effect=True):
1037        # (bitcoind returns bitcoin per kb, so these are * 4)
1038
1039        def mock_estimatesmartfee(r):
1040            params = r['params']
1041            if params == [2, 'CONSERVATIVE']:
1042                feerate = feerates[0] * 4
1043            elif params == [6, 'ECONOMICAL']:
1044                feerate = feerates[1] * 4
1045            elif params == [12, 'ECONOMICAL']:
1046                feerate = feerates[2] * 4
1047            elif params == [100, 'ECONOMICAL']:
1048                feerate = feerates[3] * 4
1049            else:
1050                warnings.warn("Don't have a feerate set for {}/{}.".format(
1051                    params[0], params[1],
1052                ))
1053                feerate = 42
1054            return {
1055                'id': r['id'],
1056                'error': None,
1057                'result': {
1058                    'feerate': Decimal(feerate) / 10**8
1059                },
1060            }
1061        self.daemon.rpcproxy.mock_rpc('estimatesmartfee', mock_estimatesmartfee)
1062
1063        # Technically, this waits until it's called, not until it's processed.
1064        # We wait until all three levels have been called.
1065        if wait_for_effect:
1066            wait_for(lambda:
1067                     self.daemon.rpcproxy.mock_counts['estimatesmartfee'] >= 4)
1068
1069    # force new feerates by restarting and thus skipping slow smoothed process
1070    # Note: testnode must be created with: opts={'may_reconnect': True}
1071    def force_feerates(self, rate):
1072        assert(self.may_reconnect)
1073        self.set_feerates([rate] * 4, False)
1074        self.restart()
1075        self.daemon.wait_for_log('peer_out WIRE_UPDATE_FEE')
1076        assert(self.rpc.feerates('perkw')['perkw']['opening'] == rate)
1077
1078    def wait_for_onchaind_broadcast(self, name, resolve=None):
1079        """Wait for onchaind to drop tx name to resolve (if any)"""
1080        if resolve:
1081            r = self.daemon.wait_for_log('Broadcasting {} .* to resolve {}'
1082                                         .format(name, resolve))
1083        else:
1084            r = self.daemon.wait_for_log('Broadcasting {} .* to resolve '
1085                                         .format(name))
1086
1087        rawtx = re.search(r'.* \(([0-9a-fA-F]*)\) ', r).group(1)
1088        txid = self.bitcoin.rpc.decoderawtransaction(rawtx, True)['txid']
1089
1090        wait_for(lambda: txid in self.bitcoin.rpc.getrawmempool())
1091
1092    def query_gossip(self, querytype, *args, filters=[]):
1093        """Generate a gossip query, feed it into this node and get responses
1094        in hex"""
1095        query = subprocess.run(['devtools/mkquery',
1096                                querytype] + [str(a) for a in args],
1097                               check=True,
1098                               timeout=TIMEOUT,
1099                               stdout=subprocess.PIPE).stdout.strip()
1100        out = subprocess.run(['devtools/gossipwith',
1101                              '--timeout-after={}'.format(int(math.sqrt(TIMEOUT) + 1)),
1102                              '{}@localhost:{}'.format(self.info['id'],
1103                                                       self.port),
1104                              query],
1105                             check=True,
1106                             timeout=TIMEOUT, stdout=subprocess.PIPE).stdout
1107
1108        def passes_filters(hmsg, filters):
1109            for f in filters:
1110                if hmsg.startswith(f):
1111                    return False
1112            return True
1113
1114        msgs = []
1115        while len(out):
1116            length = struct.unpack('>H', out[0:2])[0]
1117            hmsg = out[2:2 + length].hex()
1118            if passes_filters(hmsg, filters):
1119                msgs.append(out[2:2 + length].hex())
1120            out = out[2 + length:]
1121        return msgs
1122
1123    def config(self, config_name):
1124        try:
1125            opt = self.rpc.listconfigs(config_name)
1126            return opt[config_name]
1127        except RpcError:
1128            return None
1129
1130
1131@contextmanager
1132def flock(directory: Path):
1133    """A fair filelock, based on atomic fs operations.
1134    """
1135    if not isinstance(directory, Path):
1136        directory = Path(directory)
1137    d = directory / Path(".locks")
1138    os.makedirs(str(d), exist_ok=True)
1139    fname = None
1140
1141    while True:
1142        # Try until we find a filename that doesn't exist yet.
1143        try:
1144            fname = d / Path("lock-{}".format(time.time()))
1145            fd = os.open(str(fname), flags=os.O_CREAT | os.O_EXCL)
1146            os.close(fd)
1147            break
1148        except FileExistsError:
1149            time.sleep(0.1)
1150
1151    # So now we have a position in the lock, let's check if we are the
1152    # next one to go:
1153    while True:
1154        files = sorted([f for f in d.iterdir() if f.is_file()])
1155        # We're queued, so it should at least have us.
1156        assert len(files) >= 1
1157        if files[0] == fname:
1158            break
1159        time.sleep(0.1)
1160
1161    # We can continue
1162    yield fname
1163
1164    # Remove our file, so the next one can go ahead.
1165    fname.unlink()
1166
1167
1168class Throttler(object):
1169    """Throttles the creation of system-processes to avoid overload.
1170
1171    There is no reason to overload the system with too many processes
1172    being spawned or run at the same time. It causes timeouts by
1173    aggressively preempting processes and swapping if the memory limit is
1174    reached. In order to reduce this loss of performance we provide a
1175    `wait()` method which will serialize the creation of processes, but
1176    also delay if the system load is too high.
1177
1178    Notice that technically we are throttling too late, i.e., we react
1179    to an overload, but chances are pretty good that some other
1180    already running process is about to terminate, and so the overload
1181    is short-lived. We throttle when the process object is first
1182    created, not when restarted, in order to avoid delaying running
1183    tests, which could cause more timeouts.
1184
1185    """
1186    def __init__(self, directory: str, target: float = 90):
1187        """If specified we try to stick to a load of target (in percent).
1188        """
1189        self.target = target
1190        self.current_load = self.target  # Start slow
1191        psutil.cpu_percent()  # Prime the internal load metric
1192        self.directory = directory
1193
1194    def wait(self):
1195        start_time = time.time()
1196        with flock(self.directory):
1197            # We just got the lock, assume someone else just released it
1198            self.current_load = 100
1199            while self.load() >= self.target:
1200                time.sleep(1)
1201
1202            self.current_load = 100  # Back off slightly to avoid triggering right away
1203        print("Throttler delayed startup for {} seconds".format(time.time() - start_time))
1204
1205    def load(self):
1206        """An exponential moving average of the load
1207        """
1208        decay = 0.5
1209        load = psutil.cpu_percent()
1210        self.current_load = decay * load + (1 - decay) * self.current_load
1211        return self.current_load
1212
1213
1214class NodeFactory(object):
1215    """A factory to setup and start `lightningd` daemons.
1216    """
1217    def __init__(self, request, testname, bitcoind, executor, directory,
1218                 db_provider, node_cls, throttler, jsonschemas):
1219        if request.node.get_closest_marker("slow_test") and SLOW_MACHINE:
1220            self.valgrind = False
1221        else:
1222            self.valgrind = VALGRIND
1223        self.testname = testname
1224        self.next_id = 1
1225        self.nodes = []
1226        self.executor = executor
1227        self.bitcoind = bitcoind
1228        self.directory = directory
1229        self.lock = threading.Lock()
1230        self.db_provider = db_provider
1231        self.node_cls = node_cls
1232        self.throttler = throttler
1233        self.jsonschemas = jsonschemas
1234
1235    def split_options(self, opts):
1236        """Split node options from cli options
1237
1238        Some options are used to instrument the node wrapper and some are passed
1239        to the daemon on the command line. Split them so we know where to use
1240        them.
1241        """
1242        node_opt_keys = [
1243            'disconnect',
1244            'may_fail',
1245            'allow_broken_log',
1246            'allow_warning',
1247            'may_reconnect',
1248            'random_hsm',
1249            'feerates',
1250            'wait_for_bitcoind_sync',
1251            'allow_bad_gossip',
1252            'start',
1253        ]
1254        node_opts = {k: v for k, v in opts.items() if k in node_opt_keys}
1255        cli_opts = {k: v for k, v in opts.items() if k not in node_opt_keys}
1256        return node_opts, cli_opts
1257
1258    def get_next_port(self):
1259        with self.lock:
1260            return reserve()
1261
1262    def get_node_id(self):
1263        """Generate a unique numeric ID for a lightning node
1264        """
1265        with self.lock:
1266            node_id = self.next_id
1267            self.next_id += 1
1268            return node_id
1269
1270    def get_nodes(self, num_nodes, opts=None):
1271        """Start a number of nodes in parallel, each with its own options
1272        """
1273        if opts is None:
1274            # No opts were passed in, give some dummy opts
1275            opts = [{} for _ in range(num_nodes)]
1276        elif isinstance(opts, dict):
1277            # A single dict was passed in, so we use these opts for all nodes
1278            opts = [opts] * num_nodes
1279
1280        assert len(opts) == num_nodes
1281
1282        jobs = []
1283        for i in range(num_nodes):
1284            node_opts, cli_opts = self.split_options(opts[i])
1285            jobs.append(self.executor.submit(
1286                self.get_node, options=cli_opts,
1287                node_id=self.get_node_id(), **node_opts
1288            ))
1289
1290        return [j.result() for j in jobs]
1291
1292    def get_node(self, node_id=None, options=None, dbfile=None,
1293                 feerates=(15000, 11000, 7500, 3750), start=True,
1294                 wait_for_bitcoind_sync=True, may_fail=False,
1295                 expect_fail=False, cleandir=True, **kwargs):
1296        self.throttler.wait()
1297        node_id = self.get_node_id() if not node_id else node_id
1298        port = self.get_next_port()
1299
1300        lightning_dir = os.path.join(
1301            self.directory, "lightning-{}/".format(node_id))
1302
1303        if cleandir and os.path.exists(lightning_dir):
1304            shutil.rmtree(lightning_dir)
1305
1306        # Get the DB backend DSN we should be using for this test and this
1307        # node.
1308        db = self.db_provider.get_db(os.path.join(lightning_dir, TEST_NETWORK), self.testname, node_id)
1309        node = self.node_cls(
1310            node_id, lightning_dir, self.bitcoind, self.executor, self.valgrind, db=db,
1311            port=port, options=options, may_fail=may_fail or expect_fail,
1312            jsonschemas=self.jsonschemas,
1313            **kwargs
1314        )
1315
1316        # Regtest estimatefee are unusable, so override.
1317        node.set_feerates(feerates, False)
1318
1319        self.nodes.append(node)
1320        if dbfile:
1321            out = open(os.path.join(node.daemon.lightning_dir, TEST_NETWORK,
1322                                    'lightningd.sqlite3'), 'xb')
1323            with lzma.open(os.path.join('tests/data', dbfile), 'rb') as f:
1324                out.write(f.read())
1325
1326        if start:
1327            try:
1328                # Capture stderr if we're failing
1329                if expect_fail:
1330                    stderr = subprocess.PIPE
1331                else:
1332                    stderr = None
1333                node.start(wait_for_bitcoind_sync, stderr=stderr)
1334            except Exception:
1335                if expect_fail:
1336                    return node
1337                node.daemon.stop()
1338                raise
1339        return node
1340
1341    def join_nodes(self, nodes, fundchannel=True, fundamount=FUNDAMOUNT, wait_for_announce=False, announce_channels=True) -> None:
1342        """Given nodes, connect them in a line, optionally funding a channel"""
1343        assert not (wait_for_announce and not announce_channels), "You've asked to wait for an announcement that's not coming. (wait_for_announce=True,announce_channels=False)"
1344        connections = [(nodes[i], nodes[i + 1]) for i in range(len(nodes) - 1)]
1345
1346        for src, dst in connections:
1347            src.rpc.connect(dst.info['id'], 'localhost', dst.port)
1348
1349        # If we're returning now, make sure dst all show connections in
1350        # getpeers.
1351        if not fundchannel:
1352            for src, dst in connections:
1353                dst.daemon.wait_for_log(r'{}-.*-chan#[0-9]*: Handed peer, entering loop'.format(src.info['id']))
1354            return
1355
1356        bitcoind = nodes[0].bitcoin
1357        # If we got here, we want to fund channels
1358        for src, dst in connections:
1359            addr = src.rpc.newaddr()['bech32']
1360            bitcoind.rpc.sendtoaddress(addr, (fundamount + 1000000) / 10**8)
1361
1362        bitcoind.generate_block(1)
1363        sync_blockheight(bitcoind, nodes)
1364        txids = []
1365        for src, dst in connections:
1366            txids.append(src.rpc.fundchannel(dst.info['id'], fundamount, announce=announce_channels)['txid'])
1367
1368        wait_for(lambda: set(txids).issubset(set(bitcoind.rpc.getrawmempool())))
1369
1370        # Confirm all channels and wait for them to become usable
1371        bitcoind.generate_block(1)
1372        scids = []
1373        for src, dst in connections:
1374            wait_for(lambda: src.channel_state(dst) == 'CHANNELD_NORMAL')
1375            scid = src.get_channel_scid(dst)
1376            scids.append(scid)
1377
1378        # Wait for all channels to be active (locally)
1379        for i, n in enumerate(scids):
1380            nodes[i].wait_channel_active(scids[i])
1381            nodes[i + 1].wait_channel_active(scids[i])
1382
1383        if not wait_for_announce:
1384            return
1385
1386        bitcoind.generate_block(5)
1387
1388        # Make sure everyone sees all channels: we can cheat and
1389        # simply check the ends (since it's a line).
1390        nodes[0].wait_channel_active(scids[-1])
1391        nodes[-1].wait_channel_active(scids[0])
1392
1393        # Make sure we have all node announcements, too (just check ends)
1394        for n in nodes:
1395            for end in (nodes[0], nodes[-1]):
1396                wait_for(lambda: 'alias' in only_one(end.rpc.listnodes(n.info['id'])['nodes']))
1397
1398    def line_graph(self, num_nodes, fundchannel=True, fundamount=FUNDAMOUNT, wait_for_announce=False, opts=None, announce_channels=True):
1399        """ Create nodes, connect them and optionally fund channels.
1400        """
1401        nodes = self.get_nodes(num_nodes, opts=opts)
1402
1403        self.join_nodes(nodes, fundchannel, fundamount, wait_for_announce, announce_channels)
1404        return nodes
1405
1406    def killall(self, expected_successes):
1407        """Returns true if every node we expected to succeed actually succeeded"""
1408        unexpected_fail = False
1409        err_msgs = []
1410        for i in range(len(self.nodes)):
1411            leaks = None
1412            # leak detection upsets VALGRIND by reading uninitialized mem.
1413            # If it's dead, we'll catch it below.
1414            if not self.valgrind and DEVELOPER:
1415                try:
1416                    # This also puts leaks in log.
1417                    leaks = self.nodes[i].rpc.dev_memleak()['leaks']
1418                except Exception:
1419                    pass
1420
1421            try:
1422                self.nodes[i].stop()
1423            except Exception:
1424                if expected_successes[i]:
1425                    unexpected_fail = True
1426
1427            if leaks is not None and len(leaks) != 0:
1428                unexpected_fail = True
1429                err_msgs.append("Node {} has memory leaks: {}".format(
1430                    self.nodes[i].daemon.lightning_dir,
1431                    json.dumps(leaks, sort_keys=True, indent=4)
1432                ))
1433
1434        return not unexpected_fail, err_msgs
1435