1from bitcoin.core import COIN # type: ignore 2from bitcoin.rpc import RawProxy as BitcoinProxy # type: ignore 3from bitcoin.rpc import JSONRPCError 4from contextlib import contextmanager 5from pathlib import Path 6from pyln.client import RpcError 7from pyln.testing.btcproxy import BitcoinRpcProxy 8from collections import OrderedDict 9from decimal import Decimal 10from ephemeral_port_reserve import reserve # type: ignore 11from pyln.client import LightningRpc 12from pyln.client import Millisatoshi 13 14import json 15import logging 16import lzma 17import math 18import os 19import psutil # type: ignore 20import random 21import re 22import shutil 23import sqlite3 24import string 25import struct 26import subprocess 27import sys 28import threading 29import time 30import warnings 31 32BITCOIND_CONFIG = { 33 "regtest": 1, 34 "rpcuser": "rpcuser", 35 "rpcpassword": "rpcpass", 36 "fallbackfee": Decimal(1000) / COIN, 37} 38 39 40LIGHTNINGD_CONFIG = OrderedDict({ 41 "log-level": "debug", 42 "cltv-delta": 6, 43 "cltv-final": 5, 44 "watchtime-blocks": 5, 45 "rescan": 1, 46 'disable-dns': None, 47}) 48 49FUNDAMOUNT = 10**6 50 51 52def env(name, default=None): 53 """Access to environment variables 54 55 Allows access to environment variables, falling back to config.vars (part 56 of c-lightning's `./configure` output), and finally falling back to a 57 default value. 58 59 """ 60 fname = 'config.vars' 61 if os.path.exists(fname): 62 lines = open(fname, 'r').readlines() 63 config = dict([(line.rstrip().split('=', 1)) for line in lines]) 64 else: 65 config = {} 66 67 if name in os.environ: 68 return os.environ[name] 69 elif name in config: 70 return config[name] 71 else: 72 return default 73 74 75VALGRIND = env("VALGRIND") == "1" 76TEST_NETWORK = env("TEST_NETWORK", 'regtest') 77DEVELOPER = env("DEVELOPER", "0") == "1" 78TEST_DEBUG = env("TEST_DEBUG", "0") == "1" 79SLOW_MACHINE = env("SLOW_MACHINE", "0") == "1" 80DEPRECATED_APIS = env("DEPRECATED_APIS", "0") == "1" 81TIMEOUT = int(env("TIMEOUT", 180 if SLOW_MACHINE else 60)) 82EXPERIMENTAL_DUAL_FUND = env("EXPERIMENTAL_DUAL_FUND", "0") == "1" 83 84 85def wait_for(success, timeout=TIMEOUT): 86 start_time = time.time() 87 interval = 0.25 88 while not success(): 89 time_left = start_time + timeout - time.time() 90 if time_left <= 0: 91 raise ValueError("Timeout while waiting for {}", success) 92 time.sleep(min(interval, time_left)) 93 interval *= 2 94 if interval > 5: 95 interval = 5 96 97 98def write_config(filename, opts, regtest_opts=None, section_name='regtest'): 99 with open(filename, 'w') as f: 100 for k, v in opts.items(): 101 f.write("{}={}\n".format(k, v)) 102 if regtest_opts: 103 f.write("[{}]\n".format(section_name)) 104 for k, v in regtest_opts.items(): 105 f.write("{}={}\n".format(k, v)) 106 107 108def only_one(arr): 109 """Many JSON RPC calls return an array; often we only expect a single entry 110 """ 111 assert len(arr) == 1 112 return arr[0] 113 114 115def sync_blockheight(bitcoind, nodes): 116 height = bitcoind.rpc.getblockchaininfo()['blocks'] 117 for n in nodes: 118 wait_for(lambda: n.rpc.getinfo()['blockheight'] == height) 119 120 121def wait_channel_quiescent(n1, n2): 122 wait_for(lambda: only_one(only_one(n1.rpc.listpeers(n2.info['id'])['peers'])['channels'])['htlcs'] == []) 123 wait_for(lambda: only_one(only_one(n2.rpc.listpeers(n1.info['id'])['peers'])['channels'])['htlcs'] == []) 124 125 126def get_tx_p2wsh_outnum(bitcoind, tx, amount): 127 """Get output number of this tx which is p2wsh of amount""" 128 decoded = bitcoind.rpc.decoderawtransaction(tx, True) 129 130 for out in decoded['vout']: 131 if out['scriptPubKey']['type'] == 'witness_v0_scripthash': 132 if out['value'] == Decimal(amount) / 10**8: 133 return out['n'] 134 135 return None 136 137 138class TailableProc(object): 139 """A monitorable process that we can start, stop and tail. 140 141 This is the base class for the daemons. It allows us to directly 142 tail the processes and react to their output. 143 """ 144 145 def __init__(self, outputDir=None, verbose=True): 146 self.logs = [] 147 self.logs_cond = threading.Condition(threading.RLock()) 148 self.env = os.environ.copy() 149 self.running = False 150 self.proc = None 151 self.outputDir = outputDir 152 self.logsearch_start = 0 153 self.err_logs = [] 154 self.prefix = "" 155 156 # Should we be logging lines we read from stdout? 157 self.verbose = verbose 158 159 # A filter function that'll tell us whether to filter out the line (not 160 # pass it to the log matcher and not print it to stdout). 161 self.log_filter = lambda line: False 162 163 def start(self, stdin=None, stdout=None, stderr=None): 164 """Start the underlying process and start monitoring it. 165 """ 166 logging.debug("Starting '%s'", " ".join(self.cmd_line)) 167 self.proc = subprocess.Popen(self.cmd_line, 168 stdin=stdin, 169 stdout=stdout if stdout else subprocess.PIPE, 170 stderr=stderr, 171 env=self.env) 172 self.thread = threading.Thread(target=self.tail) 173 self.thread.daemon = True 174 self.thread.start() 175 self.running = True 176 177 def save_log(self): 178 if self.outputDir: 179 logpath = os.path.join(self.outputDir, 'log') 180 with open(logpath, 'w') as f: 181 for l in self.logs: 182 f.write(l + '\n') 183 184 def stop(self, timeout=10): 185 self.save_log() 186 self.proc.terminate() 187 188 # Now give it some time to react to the signal 189 rc = self.proc.wait(timeout) 190 191 if rc is None: 192 self.proc.kill() 193 194 self.proc.wait() 195 self.thread.join() 196 197 return self.proc.returncode 198 199 def kill(self): 200 """Kill process without giving it warning.""" 201 self.proc.kill() 202 self.proc.wait() 203 self.thread.join() 204 205 def tail(self): 206 """Tail the stdout of the process and remember it. 207 208 Stores the lines of output produced by the process in 209 self.logs and signals that a new line was read so that it can 210 be picked up by consumers. 211 """ 212 for line in iter(self.proc.stdout.readline, ''): 213 if len(line) == 0: 214 break 215 216 line = line.decode('UTF-8', 'replace').rstrip() 217 218 if self.log_filter(line): 219 continue 220 221 if self.verbose: 222 sys.stdout.write("{}: {}\n".format(self.prefix, line)) 223 224 with self.logs_cond: 225 self.logs.append(line) 226 self.logs_cond.notifyAll() 227 228 self.running = False 229 self.proc.stdout.close() 230 231 if self.proc.stderr: 232 for line in iter(self.proc.stderr.readline, ''): 233 234 if line is None or len(line) == 0: 235 break 236 237 line = line.rstrip().decode('UTF-8', 'replace') 238 self.err_logs.append(line) 239 240 self.proc.stderr.close() 241 242 def is_in_log(self, regex, start=0): 243 """Look for `regex` in the logs.""" 244 245 ex = re.compile(regex) 246 for l in self.logs[start:]: 247 if ex.search(l): 248 logging.debug("Found '%s' in logs", regex) 249 return l 250 251 logging.debug("Did not find '%s' in logs", regex) 252 return None 253 254 def is_in_stderr(self, regex): 255 """Look for `regex` in stderr.""" 256 257 ex = re.compile(regex) 258 for l in self.err_logs: 259 if ex.search(l): 260 logging.debug("Found '%s' in stderr", regex) 261 return l 262 263 logging.debug("Did not find '%s' in stderr", regex) 264 return None 265 266 def wait_for_logs(self, regexs, timeout=TIMEOUT): 267 """Look for `regexs` in the logs. 268 269 We tail the stdout of the process and look for each regex in `regexs`, 270 starting from last of the previous waited-for log entries (if any). We 271 fail if the timeout is exceeded or if the underlying process 272 exits before all the `regexs` were found. 273 274 If timeout is None, no time-out is applied. 275 """ 276 logging.debug("Waiting for {} in the logs".format(regexs)) 277 exs = [re.compile(r) for r in regexs] 278 start_time = time.time() 279 pos = self.logsearch_start 280 while True: 281 if timeout is not None and time.time() > start_time + timeout: 282 print("Time-out: can't find {} in logs".format(exs)) 283 for r in exs: 284 if self.is_in_log(r): 285 print("({} was previously in logs!)".format(r)) 286 raise TimeoutError('Unable to find "{}" in logs.'.format(exs)) 287 288 with self.logs_cond: 289 if pos >= len(self.logs): 290 if not self.running: 291 raise ValueError('Process died while waiting for logs') 292 self.logs_cond.wait(1) 293 continue 294 295 for r in exs.copy(): 296 self.logsearch_start = pos + 1 297 if r.search(self.logs[pos]): 298 logging.debug("Found '%s' in logs", r) 299 exs.remove(r) 300 break 301 if len(exs) == 0: 302 return self.logs[pos] 303 pos += 1 304 305 def wait_for_log(self, regex, timeout=TIMEOUT): 306 """Look for `regex` in the logs. 307 308 Convenience wrapper for the common case of only seeking a single entry. 309 """ 310 return self.wait_for_logs([regex], timeout) 311 312 313class SimpleBitcoinProxy: 314 """Wrapper for BitcoinProxy to reconnect. 315 316 Long wait times between calls to the Bitcoin RPC could result in 317 `bitcoind` closing the connection, so here we just create 318 throwaway connections. This is easier than to reach into the RPC 319 library to close, reopen and reauth upon failure. 320 """ 321 def __init__(self, btc_conf_file, *args, **kwargs): 322 self.__btc_conf_file__ = btc_conf_file 323 324 def __getattr__(self, name): 325 if name.startswith('__') and name.endswith('__'): 326 # Python internal stuff 327 raise AttributeError 328 329 # Create a callable to do the actual call 330 proxy = BitcoinProxy(btc_conf_file=self.__btc_conf_file__) 331 332 def f(*args): 333 logging.debug("Calling {name} with arguments {args}".format( 334 name=name, 335 args=args 336 )) 337 res = proxy._call(name, *args) 338 logging.debug("Result for {name} call: {res}".format( 339 name=name, 340 res=res, 341 )) 342 return res 343 344 # Make debuggers show <function bitcoin.rpc.name> rather than <function 345 # bitcoin.rpc.<lambda>> 346 f.__name__ = name 347 return f 348 349 350class BitcoinD(TailableProc): 351 352 def __init__(self, bitcoin_dir="/tmp/bitcoind-test", rpcport=None): 353 TailableProc.__init__(self, bitcoin_dir, verbose=False) 354 355 if rpcport is None: 356 rpcport = reserve() 357 358 self.bitcoin_dir = bitcoin_dir 359 self.rpcport = rpcport 360 self.prefix = 'bitcoind' 361 362 regtestdir = os.path.join(bitcoin_dir, 'regtest') 363 if not os.path.exists(regtestdir): 364 os.makedirs(regtestdir) 365 366 self.cmd_line = [ 367 'bitcoind', 368 '-datadir={}'.format(bitcoin_dir), 369 '-printtoconsole', 370 '-server', 371 '-logtimestamps', 372 '-nolisten', 373 '-txindex', 374 '-nowallet', 375 '-addresstype=bech32' 376 ] 377 # For up to and including 0.16.1, this needs to be in main section. 378 BITCOIND_CONFIG['rpcport'] = rpcport 379 # For after 0.16.1 (eg. 3f398d7a17f136cd4a67998406ca41a124ae2966), this 380 # needs its own [regtest] section. 381 BITCOIND_REGTEST = {'rpcport': rpcport} 382 self.conf_file = os.path.join(bitcoin_dir, 'bitcoin.conf') 383 write_config(self.conf_file, BITCOIND_CONFIG, BITCOIND_REGTEST) 384 self.rpc = SimpleBitcoinProxy(btc_conf_file=self.conf_file) 385 self.proxies = [] 386 387 def start(self): 388 TailableProc.start(self) 389 self.wait_for_log("Done loading", timeout=TIMEOUT) 390 391 logging.info("BitcoinD started") 392 try: 393 self.rpc.createwallet("lightningd-tests") 394 except JSONRPCError: 395 self.rpc.loadwallet("lightningd-tests") 396 397 def stop(self): 398 for p in self.proxies: 399 p.stop() 400 self.rpc.stop() 401 return TailableProc.stop(self) 402 403 def get_proxy(self): 404 proxy = BitcoinRpcProxy(self) 405 self.proxies.append(proxy) 406 proxy.start() 407 return proxy 408 409 # wait_for_mempool can be used to wait for the mempool before generating blocks: 410 # True := wait for at least 1 transation 411 # int > 0 := wait for at least N transactions 412 # 'tx_id' := wait for one transaction id given as a string 413 # ['tx_id1', 'tx_id2'] := wait until all of the specified transaction IDs 414 def generate_block(self, numblocks=1, wait_for_mempool=0): 415 if wait_for_mempool: 416 if isinstance(wait_for_mempool, str): 417 wait_for_mempool = [wait_for_mempool] 418 if isinstance(wait_for_mempool, list): 419 wait_for(lambda: all(txid in self.rpc.getrawmempool() for txid in wait_for_mempool)) 420 else: 421 wait_for(lambda: len(self.rpc.getrawmempool()) >= wait_for_mempool) 422 423 mempool = self.rpc.getrawmempool() 424 logging.debug("Generating {numblocks}, confirming {lenmempool} transactions: {mempool}".format( 425 numblocks=numblocks, 426 mempool=mempool, 427 lenmempool=len(mempool), 428 )) 429 430 # As of 0.16, generate() is removed; use generatetoaddress. 431 return self.rpc.generatetoaddress(numblocks, self.rpc.getnewaddress()) 432 433 def simple_reorg(self, height, shift=0): 434 """ 435 Reorganize chain by creating a fork at height=[height] and re-mine all mempool 436 transactions into [height + shift], where shift >= 0. Returns hashes of generated 437 blocks. 438 439 Note that tx's that become invalid at [height] (because coin maturity, locktime 440 etc.) are removed from mempool. The length of the new chain will be original + 1 441 OR original + [shift], whichever is larger. 442 443 For example: to push tx's backward from height h1 to h2 < h1, use [height]=h2. 444 445 Or to change the txindex of tx's at height h1: 446 1. A block at height h2 < h1 should contain a non-coinbase tx that can be pulled 447 forward to h1. 448 2. Set [height]=h2 and [shift]= h1-h2 449 """ 450 hashes = [] 451 fee_delta = 1000000 452 orig_len = self.rpc.getblockcount() 453 old_hash = self.rpc.getblockhash(height) 454 final_len = height + shift if height + shift > orig_len else 1 + orig_len 455 # TODO: raise error for insane args? 456 457 self.rpc.invalidateblock(old_hash) 458 self.wait_for_log(r'InvalidChainFound: invalid block=.* height={}'.format(height)) 459 memp = self.rpc.getrawmempool() 460 461 if shift == 0: 462 hashes += self.generate_block(1 + final_len - height) 463 else: 464 for txid in memp: 465 # lower priority (to effective feerate=0) so they are not mined 466 self.rpc.prioritisetransaction(txid, None, -fee_delta) 467 hashes += self.generate_block(shift) 468 469 for txid in memp: 470 # restore priority so they are mined 471 self.rpc.prioritisetransaction(txid, None, fee_delta) 472 hashes += self.generate_block(1 + final_len - (height + shift)) 473 self.wait_for_log(r'UpdateTip: new best=.* height={}'.format(final_len)) 474 return hashes 475 476 def getnewaddress(self): 477 return self.rpc.getnewaddress() 478 479 480class ElementsD(BitcoinD): 481 def __init__(self, bitcoin_dir="/tmp/bitcoind-test", rpcport=None): 482 config = BITCOIND_CONFIG.copy() 483 if 'regtest' in config: 484 del config['regtest'] 485 486 config['chain'] = 'liquid-regtest' 487 BitcoinD.__init__(self, bitcoin_dir, rpcport) 488 489 self.cmd_line = [ 490 'elementsd', 491 '-datadir={}'.format(bitcoin_dir), 492 '-printtoconsole', 493 '-server', 494 '-logtimestamps', 495 '-nolisten', 496 '-nowallet', 497 '-validatepegin=0', 498 '-con_blocksubsidy=5000000000', 499 ] 500 conf_file = os.path.join(bitcoin_dir, 'elements.conf') 501 config['rpcport'] = self.rpcport 502 BITCOIND_REGTEST = {'rpcport': self.rpcport} 503 write_config(conf_file, config, BITCOIND_REGTEST, section_name='liquid-regtest') 504 self.conf_file = conf_file 505 self.rpc = SimpleBitcoinProxy(btc_conf_file=self.conf_file) 506 self.prefix = 'elementsd' 507 508 def getnewaddress(self): 509 """Need to get an address and then make it unconfidential 510 """ 511 addr = self.rpc.getnewaddress() 512 info = self.rpc.getaddressinfo(addr) 513 return info['unconfidential'] 514 515 516class LightningD(TailableProc): 517 def __init__(self, lightning_dir, bitcoindproxy, port=9735, random_hsm=False, node_id=0): 518 TailableProc.__init__(self, lightning_dir) 519 self.executable = 'lightningd' 520 self.lightning_dir = lightning_dir 521 self.port = port 522 self.cmd_prefix = [] 523 self.disconnect_file = None 524 525 self.rpcproxy = bitcoindproxy 526 527 self.opts = LIGHTNINGD_CONFIG.copy() 528 opts = { 529 'lightning-dir': lightning_dir, 530 'addr': '127.0.0.1:{}'.format(port), 531 'allow-deprecated-apis': '{}'.format("true" if DEPRECATED_APIS 532 else "false"), 533 'network': TEST_NETWORK, 534 'ignore-fee-limits': 'false', 535 'bitcoin-rpcuser': BITCOIND_CONFIG['rpcuser'], 536 'bitcoin-rpcpassword': BITCOIND_CONFIG['rpcpassword'], 537 538 # Make sure we don't touch any existing config files in the user's $HOME 539 'bitcoin-datadir': lightning_dir, 540 } 541 542 for k, v in opts.items(): 543 self.opts[k] = v 544 545 if not os.path.exists(os.path.join(lightning_dir, TEST_NETWORK)): 546 os.makedirs(os.path.join(lightning_dir, TEST_NETWORK)) 547 548 # Last 32-bytes of final part of dir -> seed. 549 seed = (bytes(re.search('([^/]+)/*$', lightning_dir).group(1), encoding='utf-8') + bytes(32))[:32] 550 if not random_hsm: 551 with open(os.path.join(lightning_dir, TEST_NETWORK, 'hsm_secret'), 'wb') as f: 552 f.write(seed) 553 if DEVELOPER: 554 self.opts['dev-fast-gossip'] = None 555 self.opts['dev-bitcoind-poll'] = 1 556 self.prefix = 'lightningd-%d' % (node_id) 557 558 def cleanup(self): 559 # To force blackhole to exit, disconnect file must be truncated! 560 if self.disconnect_file: 561 with open(self.disconnect_file, "w") as f: 562 f.truncate() 563 564 @property 565 def cmd_line(self): 566 567 opts = [] 568 for k, v in self.opts.items(): 569 if v is None: 570 opts.append("--{}".format(k)) 571 elif isinstance(v, list): 572 for i in v: 573 opts.append("--{}={}".format(k, i)) 574 else: 575 opts.append("--{}={}".format(k, v)) 576 577 return self.cmd_prefix + [self.executable] + opts 578 579 def start(self, stdin=None, stdout=None, stderr=None, 580 wait_for_initialized=True): 581 self.opts['bitcoin-rpcport'] = self.rpcproxy.rpcport 582 TailableProc.start(self, stdin, stdout, stderr) 583 if wait_for_initialized: 584 self.wait_for_log("Server started with public key") 585 logging.info("LightningD started") 586 587 def wait(self, timeout=10): 588 """Wait for the daemon to stop for up to timeout seconds 589 590 Returns the returncode of the process, None if the process did 591 not return before the timeout triggers. 592 """ 593 self.proc.wait(timeout) 594 return self.proc.returncode 595 596 597class PrettyPrintingLightningRpc(LightningRpc): 598 """A version of the LightningRpc that pretty-prints calls and results. 599 600 Useful when debugging based on logs, and less painful to the 601 eyes. It has some overhead since we re-serialize the request and 602 result to json in order to pretty print it. 603 604 Also validates (optional) schemas for us. 605 """ 606 def __init__(self, socket_path, executor=None, logger=logging, 607 patch_json=True, jsonschemas={}): 608 super().__init__( 609 socket_path, 610 executor, 611 logger, 612 patch_json, 613 ) 614 self.jsonschemas = jsonschemas 615 616 def call(self, method, payload=None): 617 id = self.next_id 618 self.logger.debug(json.dumps({ 619 "id": id, 620 "method": method, 621 "params": payload 622 }, indent=2)) 623 res = LightningRpc.call(self, method, payload) 624 self.logger.debug(json.dumps({ 625 "id": id, 626 "result": res 627 }, indent=2)) 628 629 if method in self.jsonschemas: 630 self.jsonschemas[method].validate(res) 631 632 return res 633 634 635class LightningNode(object): 636 def __init__(self, node_id, lightning_dir, bitcoind, executor, valgrind, may_fail=False, 637 may_reconnect=False, 638 allow_broken_log=False, 639 allow_warning=False, 640 allow_bad_gossip=False, 641 db=None, port=None, disconnect=None, random_hsm=None, options=None, 642 jsonschemas={}, 643 **kwargs): 644 self.bitcoin = bitcoind 645 self.executor = executor 646 self.may_fail = may_fail 647 self.may_reconnect = may_reconnect 648 self.allow_broken_log = allow_broken_log 649 self.allow_bad_gossip = allow_bad_gossip 650 self.allow_warning = allow_warning 651 self.db = db 652 653 # Assume successful exit 654 self.rc = 0 655 656 socket_path = os.path.join(lightning_dir, TEST_NETWORK, "lightning-rpc").format(node_id) 657 self.rpc = PrettyPrintingLightningRpc(socket_path, self.executor, jsonschemas=jsonschemas) 658 659 self.daemon = LightningD( 660 lightning_dir, bitcoindproxy=bitcoind.get_proxy(), 661 port=port, random_hsm=random_hsm, node_id=node_id 662 ) 663 # If we have a disconnect string, dump it to a file for daemon. 664 if disconnect: 665 self.daemon.disconnect_file = os.path.join(lightning_dir, TEST_NETWORK, "dev_disconnect") 666 with open(self.daemon.disconnect_file, "w") as f: 667 f.write("\n".join(disconnect)) 668 self.daemon.opts["dev-disconnect"] = "dev_disconnect" 669 if DEVELOPER: 670 self.daemon.opts["dev-fail-on-subdaemon-fail"] = None 671 # Don't run --version on every subdaemon if we're valgrinding and slow. 672 if SLOW_MACHINE and VALGRIND: 673 self.daemon.opts["dev-no-version-checks"] = None 674 if os.getenv("DEBUG_SUBD"): 675 self.daemon.opts["dev-debugger"] = os.getenv("DEBUG_SUBD") 676 if valgrind: 677 self.daemon.env["LIGHTNINGD_DEV_NO_BACKTRACE"] = "1" 678 else: 679 # Under valgrind, scanning can access uninitialized mem. 680 self.daemon.env["LIGHTNINGD_DEV_MEMLEAK"] = "1" 681 if not may_reconnect: 682 self.daemon.opts["dev-no-reconnect"] = None 683 if EXPERIMENTAL_DUAL_FUND: 684 self.daemon.opts["experimental-dual-fund"] = None 685 686 if options is not None: 687 self.daemon.opts.update(options) 688 dsn = db.get_dsn() 689 if dsn is not None: 690 self.daemon.opts['wallet'] = dsn 691 if valgrind: 692 self.daemon.cmd_prefix = [ 693 'valgrind', 694 '-q', 695 '--trace-children=yes', 696 '--trace-children-skip=*python*,*bitcoin-cli*,*elements-cli*', 697 '--error-exitcode=7', 698 '--log-file={}/valgrind-errors.%p'.format(self.daemon.lightning_dir) 699 ] 700 # Reduce precision of errors, speeding startup and reducing memory greatly: 701 if SLOW_MACHINE: 702 self.daemon.cmd_prefix += ['--read-inline-info=no'] 703 704 def connect(self, remote_node): 705 self.rpc.connect(remote_node.info['id'], '127.0.0.1', remote_node.daemon.port) 706 707 def is_connected(self, remote_node): 708 return remote_node.info['id'] in [p['id'] for p in self.rpc.listpeers()['peers']] 709 710 def openchannel(self, remote_node, capacity=FUNDAMOUNT, addrtype="p2sh-segwit", confirm=True, wait_for_announce=True, connect=True): 711 addr, wallettxid = self.fundwallet(10 * capacity, addrtype) 712 713 if connect and not self.is_connected(remote_node): 714 self.connect(remote_node) 715 716 fundingtx = self.rpc.fundchannel(remote_node.info['id'], capacity) 717 718 # Wait for the funding transaction to be in bitcoind's mempool 719 wait_for(lambda: fundingtx['txid'] in self.bitcoin.rpc.getrawmempool()) 720 721 if confirm or wait_for_announce: 722 self.bitcoin.generate_block(1) 723 724 if wait_for_announce: 725 self.bitcoin.generate_block(5) 726 727 if confirm or wait_for_announce: 728 self.daemon.wait_for_log( 729 r'Funding tx {} depth'.format(fundingtx['txid'])) 730 return {'address': addr, 'wallettxid': wallettxid, 'fundingtx': fundingtx} 731 732 def fundwallet(self, sats, addrtype="p2sh-segwit", mine_block=True): 733 addr = self.rpc.newaddr(addrtype)[addrtype] 734 txid = self.bitcoin.rpc.sendtoaddress(addr, sats / 10**8) 735 if mine_block: 736 self.bitcoin.generate_block(1) 737 self.daemon.wait_for_log('Owning output .* txid {} CONFIRMED'.format(txid)) 738 return addr, txid 739 740 def fundbalancedchannel(self, remote_node, total_capacity, announce=True): 741 ''' 742 Creates a perfectly-balanced channel, as all things should be. 743 ''' 744 if isinstance(total_capacity, Millisatoshi): 745 total_capacity = int(total_capacity.to_satoshi()) 746 else: 747 total_capacity = int(total_capacity) 748 749 self.fundwallet(total_capacity + 10000) 750 751 if remote_node.config('experimental-dual-fund'): 752 remote_node.fundwallet(total_capacity + 10000) 753 # We cut the total_capacity in half, since the peer's 754 # expected to contribute that same amount 755 chan_capacity = total_capacity // 2 756 total_capacity = chan_capacity * 2 757 # Tell the node to equally dual-fund the channel 758 remote_node.rpc.call('funderupdate', {'policy': 'match', 759 'policy_mod': 100, 760 'fuzz_percent': 0}) 761 else: 762 chan_capacity = total_capacity 763 764 self.rpc.connect(remote_node.info['id'], 'localhost', remote_node.port) 765 766 # Make sure the fundchannel is confirmed. 767 num_tx = len(self.bitcoin.rpc.getrawmempool()) 768 res = self.rpc.fundchannel(remote_node.info['id'], chan_capacity, feerate='slow', minconf=0, announce=announce, push_msat=Millisatoshi(chan_capacity * 500)) 769 wait_for(lambda: len(self.bitcoin.rpc.getrawmempool()) == num_tx + 1) 770 blockid = self.bitcoin.generate_block(1)[0] 771 772 # Generate the scid. 773 outnum = get_tx_p2wsh_outnum(self.bitcoin, res['tx'], total_capacity) 774 if outnum is None: 775 raise ValueError("no outnum found. capacity {} tx {}".format(total_capacity, res['tx'])) 776 777 for i, txid in enumerate(self.bitcoin.rpc.getblock(blockid)['tx']): 778 if txid == res['txid']: 779 txnum = i 780 781 return '{}x{}x{}'.format(self.bitcoin.rpc.getblockcount(), txnum, outnum) 782 783 def getactivechannels(self): 784 return [c for c in self.rpc.listchannels()['channels'] if c['active']] 785 786 def db_query(self, query): 787 return self.db.query(query) 788 789 # Assumes node is stopped! 790 def db_manip(self, query): 791 db = sqlite3.connect(os.path.join(self.daemon.lightning_dir, TEST_NETWORK, "lightningd.sqlite3")) 792 db.row_factory = sqlite3.Row 793 c = db.cursor() 794 c.execute(query) 795 db.commit() 796 c.close() 797 db.close() 798 799 def is_synced_with_bitcoin(self, info=None): 800 if info is None: 801 info = self.rpc.getinfo() 802 return 'warning_bitcoind_sync' not in info and 'warning_lightningd_sync' not in info 803 804 def start(self, wait_for_bitcoind_sync=True, stderr=None): 805 self.daemon.start(stderr=stderr) 806 # Cache `getinfo`, we'll be using it a lot 807 self.info = self.rpc.getinfo() 808 # This shortcut is sufficient for our simple tests. 809 self.port = self.info['binding'][0]['port'] 810 if wait_for_bitcoind_sync and not self.is_synced_with_bitcoin(self.info): 811 wait_for(lambda: self.is_synced_with_bitcoin()) 812 813 def stop(self, timeout=10): 814 """ Attempt to do a clean shutdown, but kill if it hangs 815 """ 816 817 # Tell the daemon to stop 818 try: 819 # May fail if the process already died 820 self.rpc.stop() 821 except Exception: 822 pass 823 824 self.rc = self.daemon.wait(timeout) 825 826 # If it did not stop be more insistent 827 if self.rc is None: 828 self.rc = self.daemon.stop() 829 830 self.daemon.save_log() 831 self.daemon.cleanup() 832 833 if self.rc != 0 and not self.may_fail: 834 raise ValueError("Node did not exit cleanly, rc={}".format(self.rc)) 835 else: 836 return self.rc 837 838 def restart(self, timeout=10, clean=True): 839 """Stop and restart the lightning node. 840 841 Keyword arguments: 842 timeout: number of seconds to wait for a shutdown 843 clean: whether to issue a `stop` RPC command before killing 844 """ 845 if clean: 846 self.stop(timeout) 847 else: 848 self.daemon.stop() 849 850 self.start() 851 852 def fund_channel(self, l2, amount, wait_for_active=True, announce_channel=True): 853 warnings.warn("LightningNode.fund_channel is deprecated in favor of " 854 "LightningNode.fundchannel", category=DeprecationWarning) 855 return self.fundchannel(l2, amount, wait_for_active, announce_channel) 856 857 def fundchannel(self, l2, amount=FUNDAMOUNT, wait_for_active=True, 858 announce_channel=True, **kwargs): 859 # Give yourself some funds to work with 860 addr = self.rpc.newaddr()['bech32'] 861 862 def has_funds_on_addr(addr): 863 """Check if the given address has funds in the internal wallet. 864 """ 865 outs = self.rpc.listfunds()['outputs'] 866 addrs = [o['address'] for o in outs] 867 return addr in addrs 868 869 # We should not have funds on that address yet, we just generated it. 870 assert(not has_funds_on_addr(addr)) 871 872 self.bitcoin.rpc.sendtoaddress(addr, (amount + 1000000) / 10**8) 873 self.bitcoin.generate_block(1) 874 875 # Now we should. 876 wait_for(lambda: has_funds_on_addr(addr)) 877 878 # Now go ahead and open a channel 879 res = self.rpc.fundchannel(l2.info['id'], amount, 880 announce=announce_channel, 881 **kwargs) 882 wait_for(lambda: res['txid'] in self.bitcoin.rpc.getrawmempool()) 883 blockid = self.bitcoin.generate_block(1)[0] 884 885 for i, txid in enumerate(self.bitcoin.rpc.getblock(blockid)['tx']): 886 if txid == res['txid']: 887 txnum = i 888 889 scid = "{}x{}x{}".format(self.bitcoin.rpc.getblockcount(), 890 txnum, res['outnum']) 891 892 if wait_for_active: 893 self.wait_channel_active(scid) 894 l2.wait_channel_active(scid) 895 896 return scid, res 897 898 def subd_pid(self, subd, peerid=None): 899 """Get the process id of the given subdaemon, eg channeld or gossipd""" 900 if peerid: 901 ex = re.compile(r'{}-.*{}.*: pid ([0-9]*),' 902 .format(peerid, subd)) 903 else: 904 ex = re.compile('{}-.*: pid ([0-9]*),'.format(subd)) 905 # Make sure we get latest one if it's restarted! 906 for l in reversed(self.daemon.logs): 907 group = ex.search(l) 908 if group: 909 return group.group(1) 910 raise ValueError("No daemon {} found".format(subd)) 911 912 def channel_state(self, other): 913 """Return the state of the channel to the other node. 914 915 Returns None if there is no such peer, or a channel hasn't been funded 916 yet. 917 918 """ 919 peers = self.rpc.listpeers(other.info['id'])['peers'] 920 if not peers or 'channels' not in peers[0]: 921 return None 922 channel = peers[0]['channels'][0] 923 return channel['state'] 924 925 def get_channel_scid(self, other): 926 """Get the short_channel_id for the channel to the other node. 927 """ 928 peers = self.rpc.listpeers(other.info['id'])['peers'] 929 if not peers or 'channels' not in peers[0]: 930 return None 931 channel = peers[0]['channels'][0] 932 return channel['short_channel_id'] 933 934 def get_channel_id(self, other): 935 """Get the channel_id for the channel to the other node. 936 """ 937 peers = self.rpc.listpeers(other.info['id'])['peers'] 938 if not peers or 'channels' not in peers[0]: 939 return None 940 channel = peers[0]['channels'][0] 941 return channel['channel_id'] 942 943 def is_channel_active(self, chanid): 944 channels = self.rpc.listchannels(chanid)['channels'] 945 active = [(c['short_channel_id'], c['channel_flags']) for c in channels if c['active']] 946 return (chanid, 0) in active and (chanid, 1) in active 947 948 def wait_for_channel_onchain(self, peerid): 949 txid = only_one(only_one(self.rpc.listpeers(peerid)['peers'])['channels'])['scratch_txid'] 950 wait_for(lambda: txid in self.bitcoin.rpc.getrawmempool()) 951 952 def wait_channel_active(self, chanid): 953 wait_for(lambda: self.is_channel_active(chanid)) 954 955 # This waits until gossipd sees channel_update in both directions 956 # (or for local channels, at least a local announcement) 957 def wait_for_channel_updates(self, scids): 958 # Could happen in any order... 959 self.daemon.wait_for_logs(['Received channel_update for channel {}/0'.format(c) 960 for c in scids] 961 + ['Received channel_update for channel {}/1'.format(c) 962 for c in scids]) 963 964 def wait_for_route(self, destination, timeout=TIMEOUT): 965 """ Wait for a route to the destination to become available. 966 """ 967 start_time = time.time() 968 while time.time() < start_time + timeout: 969 try: 970 self.rpc.getroute(destination.info['id'], 1, 1) 971 return True 972 except Exception: 973 time.sleep(1) 974 if time.time() > start_time + timeout: 975 raise ValueError("Error waiting for a route to destination {}".format(destination)) 976 977 # This helper waits for all HTLCs to settle 978 # `scids` can be a list of strings. If unset wait on all channels. 979 def wait_for_htlcs(self, scids=None): 980 peers = self.rpc.listpeers()['peers'] 981 for p, peer in enumerate(peers): 982 if 'channels' in peer: 983 for c, channel in enumerate(peer['channels']): 984 if scids is not None and channel['short_channel_id'] not in scids: 985 continue 986 if 'htlcs' in channel: 987 wait_for(lambda: len(self.rpc.listpeers()['peers'][p]['channels'][c]['htlcs']) == 0) 988 989 # This sends money to a directly connected peer 990 def pay(self, dst, amt, label=None): 991 if not label: 992 label = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(20)) 993 994 # check we are connected 995 dst_id = dst.info['id'] 996 assert len(self.rpc.listpeers(dst_id).get('peers')) == 1 997 998 # make an invoice 999 inv = dst.rpc.invoice(amt, label, label) 1000 # FIXME: pre 0.10.1 invoice calls didn't have payment_secret field 1001 psecret = dst.rpc.decodepay(inv['bolt11'])['payment_secret'] 1002 rhash = inv['payment_hash'] 1003 invoices = dst.rpc.listinvoices(label)['invoices'] 1004 assert len(invoices) == 1 and invoices[0]['status'] == 'unpaid' 1005 1006 routestep = { 1007 'msatoshi': amt, 1008 'id': dst_id, 1009 'delay': 5, 1010 'channel': '1x1x1' # note: can be bogus for 1-hop direct payments 1011 } 1012 1013 # sendpay is async now 1014 self.rpc.sendpay([routestep], rhash, payment_secret=psecret) 1015 # wait for sendpay to comply 1016 result = self.rpc.waitsendpay(rhash) 1017 assert(result.get('status') == 'complete') 1018 1019 # This helper sends all money to a peer until even 1 msat can't get through. 1020 def drain(self, peer): 1021 total = 0 1022 msat = 4294967295 # Max payment size in some configs 1023 while msat != 0: 1024 try: 1025 logging.debug("Drain step with size={}".format(msat)) 1026 self.pay(peer, msat) 1027 total += msat 1028 except RpcError as e: 1029 logging.debug("Got an exception while draining channel: {}".format(e)) 1030 msat //= 2 1031 logging.debug("Draining complete after sending a total of {}msats".format(total)) 1032 return total 1033 1034 # Note: this feeds through the smoother in update_feerate, so changing 1035 # it on a running daemon may not give expected result! 1036 def set_feerates(self, feerates, wait_for_effect=True): 1037 # (bitcoind returns bitcoin per kb, so these are * 4) 1038 1039 def mock_estimatesmartfee(r): 1040 params = r['params'] 1041 if params == [2, 'CONSERVATIVE']: 1042 feerate = feerates[0] * 4 1043 elif params == [6, 'ECONOMICAL']: 1044 feerate = feerates[1] * 4 1045 elif params == [12, 'ECONOMICAL']: 1046 feerate = feerates[2] * 4 1047 elif params == [100, 'ECONOMICAL']: 1048 feerate = feerates[3] * 4 1049 else: 1050 warnings.warn("Don't have a feerate set for {}/{}.".format( 1051 params[0], params[1], 1052 )) 1053 feerate = 42 1054 return { 1055 'id': r['id'], 1056 'error': None, 1057 'result': { 1058 'feerate': Decimal(feerate) / 10**8 1059 }, 1060 } 1061 self.daemon.rpcproxy.mock_rpc('estimatesmartfee', mock_estimatesmartfee) 1062 1063 # Technically, this waits until it's called, not until it's processed. 1064 # We wait until all three levels have been called. 1065 if wait_for_effect: 1066 wait_for(lambda: 1067 self.daemon.rpcproxy.mock_counts['estimatesmartfee'] >= 4) 1068 1069 # force new feerates by restarting and thus skipping slow smoothed process 1070 # Note: testnode must be created with: opts={'may_reconnect': True} 1071 def force_feerates(self, rate): 1072 assert(self.may_reconnect) 1073 self.set_feerates([rate] * 4, False) 1074 self.restart() 1075 self.daemon.wait_for_log('peer_out WIRE_UPDATE_FEE') 1076 assert(self.rpc.feerates('perkw')['perkw']['opening'] == rate) 1077 1078 def wait_for_onchaind_broadcast(self, name, resolve=None): 1079 """Wait for onchaind to drop tx name to resolve (if any)""" 1080 if resolve: 1081 r = self.daemon.wait_for_log('Broadcasting {} .* to resolve {}' 1082 .format(name, resolve)) 1083 else: 1084 r = self.daemon.wait_for_log('Broadcasting {} .* to resolve ' 1085 .format(name)) 1086 1087 rawtx = re.search(r'.* \(([0-9a-fA-F]*)\) ', r).group(1) 1088 txid = self.bitcoin.rpc.decoderawtransaction(rawtx, True)['txid'] 1089 1090 wait_for(lambda: txid in self.bitcoin.rpc.getrawmempool()) 1091 1092 def query_gossip(self, querytype, *args, filters=[]): 1093 """Generate a gossip query, feed it into this node and get responses 1094 in hex""" 1095 query = subprocess.run(['devtools/mkquery', 1096 querytype] + [str(a) for a in args], 1097 check=True, 1098 timeout=TIMEOUT, 1099 stdout=subprocess.PIPE).stdout.strip() 1100 out = subprocess.run(['devtools/gossipwith', 1101 '--timeout-after={}'.format(int(math.sqrt(TIMEOUT) + 1)), 1102 '{}@localhost:{}'.format(self.info['id'], 1103 self.port), 1104 query], 1105 check=True, 1106 timeout=TIMEOUT, stdout=subprocess.PIPE).stdout 1107 1108 def passes_filters(hmsg, filters): 1109 for f in filters: 1110 if hmsg.startswith(f): 1111 return False 1112 return True 1113 1114 msgs = [] 1115 while len(out): 1116 length = struct.unpack('>H', out[0:2])[0] 1117 hmsg = out[2:2 + length].hex() 1118 if passes_filters(hmsg, filters): 1119 msgs.append(out[2:2 + length].hex()) 1120 out = out[2 + length:] 1121 return msgs 1122 1123 def config(self, config_name): 1124 try: 1125 opt = self.rpc.listconfigs(config_name) 1126 return opt[config_name] 1127 except RpcError: 1128 return None 1129 1130 1131@contextmanager 1132def flock(directory: Path): 1133 """A fair filelock, based on atomic fs operations. 1134 """ 1135 if not isinstance(directory, Path): 1136 directory = Path(directory) 1137 d = directory / Path(".locks") 1138 os.makedirs(str(d), exist_ok=True) 1139 fname = None 1140 1141 while True: 1142 # Try until we find a filename that doesn't exist yet. 1143 try: 1144 fname = d / Path("lock-{}".format(time.time())) 1145 fd = os.open(str(fname), flags=os.O_CREAT | os.O_EXCL) 1146 os.close(fd) 1147 break 1148 except FileExistsError: 1149 time.sleep(0.1) 1150 1151 # So now we have a position in the lock, let's check if we are the 1152 # next one to go: 1153 while True: 1154 files = sorted([f for f in d.iterdir() if f.is_file()]) 1155 # We're queued, so it should at least have us. 1156 assert len(files) >= 1 1157 if files[0] == fname: 1158 break 1159 time.sleep(0.1) 1160 1161 # We can continue 1162 yield fname 1163 1164 # Remove our file, so the next one can go ahead. 1165 fname.unlink() 1166 1167 1168class Throttler(object): 1169 """Throttles the creation of system-processes to avoid overload. 1170 1171 There is no reason to overload the system with too many processes 1172 being spawned or run at the same time. It causes timeouts by 1173 aggressively preempting processes and swapping if the memory limit is 1174 reached. In order to reduce this loss of performance we provide a 1175 `wait()` method which will serialize the creation of processes, but 1176 also delay if the system load is too high. 1177 1178 Notice that technically we are throttling too late, i.e., we react 1179 to an overload, but chances are pretty good that some other 1180 already running process is about to terminate, and so the overload 1181 is short-lived. We throttle when the process object is first 1182 created, not when restarted, in order to avoid delaying running 1183 tests, which could cause more timeouts. 1184 1185 """ 1186 def __init__(self, directory: str, target: float = 90): 1187 """If specified we try to stick to a load of target (in percent). 1188 """ 1189 self.target = target 1190 self.current_load = self.target # Start slow 1191 psutil.cpu_percent() # Prime the internal load metric 1192 self.directory = directory 1193 1194 def wait(self): 1195 start_time = time.time() 1196 with flock(self.directory): 1197 # We just got the lock, assume someone else just released it 1198 self.current_load = 100 1199 while self.load() >= self.target: 1200 time.sleep(1) 1201 1202 self.current_load = 100 # Back off slightly to avoid triggering right away 1203 print("Throttler delayed startup for {} seconds".format(time.time() - start_time)) 1204 1205 def load(self): 1206 """An exponential moving average of the load 1207 """ 1208 decay = 0.5 1209 load = psutil.cpu_percent() 1210 self.current_load = decay * load + (1 - decay) * self.current_load 1211 return self.current_load 1212 1213 1214class NodeFactory(object): 1215 """A factory to setup and start `lightningd` daemons. 1216 """ 1217 def __init__(self, request, testname, bitcoind, executor, directory, 1218 db_provider, node_cls, throttler, jsonschemas): 1219 if request.node.get_closest_marker("slow_test") and SLOW_MACHINE: 1220 self.valgrind = False 1221 else: 1222 self.valgrind = VALGRIND 1223 self.testname = testname 1224 self.next_id = 1 1225 self.nodes = [] 1226 self.executor = executor 1227 self.bitcoind = bitcoind 1228 self.directory = directory 1229 self.lock = threading.Lock() 1230 self.db_provider = db_provider 1231 self.node_cls = node_cls 1232 self.throttler = throttler 1233 self.jsonschemas = jsonschemas 1234 1235 def split_options(self, opts): 1236 """Split node options from cli options 1237 1238 Some options are used to instrument the node wrapper and some are passed 1239 to the daemon on the command line. Split them so we know where to use 1240 them. 1241 """ 1242 node_opt_keys = [ 1243 'disconnect', 1244 'may_fail', 1245 'allow_broken_log', 1246 'allow_warning', 1247 'may_reconnect', 1248 'random_hsm', 1249 'feerates', 1250 'wait_for_bitcoind_sync', 1251 'allow_bad_gossip', 1252 'start', 1253 ] 1254 node_opts = {k: v for k, v in opts.items() if k in node_opt_keys} 1255 cli_opts = {k: v for k, v in opts.items() if k not in node_opt_keys} 1256 return node_opts, cli_opts 1257 1258 def get_next_port(self): 1259 with self.lock: 1260 return reserve() 1261 1262 def get_node_id(self): 1263 """Generate a unique numeric ID for a lightning node 1264 """ 1265 with self.lock: 1266 node_id = self.next_id 1267 self.next_id += 1 1268 return node_id 1269 1270 def get_nodes(self, num_nodes, opts=None): 1271 """Start a number of nodes in parallel, each with its own options 1272 """ 1273 if opts is None: 1274 # No opts were passed in, give some dummy opts 1275 opts = [{} for _ in range(num_nodes)] 1276 elif isinstance(opts, dict): 1277 # A single dict was passed in, so we use these opts for all nodes 1278 opts = [opts] * num_nodes 1279 1280 assert len(opts) == num_nodes 1281 1282 jobs = [] 1283 for i in range(num_nodes): 1284 node_opts, cli_opts = self.split_options(opts[i]) 1285 jobs.append(self.executor.submit( 1286 self.get_node, options=cli_opts, 1287 node_id=self.get_node_id(), **node_opts 1288 )) 1289 1290 return [j.result() for j in jobs] 1291 1292 def get_node(self, node_id=None, options=None, dbfile=None, 1293 feerates=(15000, 11000, 7500, 3750), start=True, 1294 wait_for_bitcoind_sync=True, may_fail=False, 1295 expect_fail=False, cleandir=True, **kwargs): 1296 self.throttler.wait() 1297 node_id = self.get_node_id() if not node_id else node_id 1298 port = self.get_next_port() 1299 1300 lightning_dir = os.path.join( 1301 self.directory, "lightning-{}/".format(node_id)) 1302 1303 if cleandir and os.path.exists(lightning_dir): 1304 shutil.rmtree(lightning_dir) 1305 1306 # Get the DB backend DSN we should be using for this test and this 1307 # node. 1308 db = self.db_provider.get_db(os.path.join(lightning_dir, TEST_NETWORK), self.testname, node_id) 1309 node = self.node_cls( 1310 node_id, lightning_dir, self.bitcoind, self.executor, self.valgrind, db=db, 1311 port=port, options=options, may_fail=may_fail or expect_fail, 1312 jsonschemas=self.jsonschemas, 1313 **kwargs 1314 ) 1315 1316 # Regtest estimatefee are unusable, so override. 1317 node.set_feerates(feerates, False) 1318 1319 self.nodes.append(node) 1320 if dbfile: 1321 out = open(os.path.join(node.daemon.lightning_dir, TEST_NETWORK, 1322 'lightningd.sqlite3'), 'xb') 1323 with lzma.open(os.path.join('tests/data', dbfile), 'rb') as f: 1324 out.write(f.read()) 1325 1326 if start: 1327 try: 1328 # Capture stderr if we're failing 1329 if expect_fail: 1330 stderr = subprocess.PIPE 1331 else: 1332 stderr = None 1333 node.start(wait_for_bitcoind_sync, stderr=stderr) 1334 except Exception: 1335 if expect_fail: 1336 return node 1337 node.daemon.stop() 1338 raise 1339 return node 1340 1341 def join_nodes(self, nodes, fundchannel=True, fundamount=FUNDAMOUNT, wait_for_announce=False, announce_channels=True) -> None: 1342 """Given nodes, connect them in a line, optionally funding a channel""" 1343 assert not (wait_for_announce and not announce_channels), "You've asked to wait for an announcement that's not coming. (wait_for_announce=True,announce_channels=False)" 1344 connections = [(nodes[i], nodes[i + 1]) for i in range(len(nodes) - 1)] 1345 1346 for src, dst in connections: 1347 src.rpc.connect(dst.info['id'], 'localhost', dst.port) 1348 1349 # If we're returning now, make sure dst all show connections in 1350 # getpeers. 1351 if not fundchannel: 1352 for src, dst in connections: 1353 dst.daemon.wait_for_log(r'{}-.*-chan#[0-9]*: Handed peer, entering loop'.format(src.info['id'])) 1354 return 1355 1356 bitcoind = nodes[0].bitcoin 1357 # If we got here, we want to fund channels 1358 for src, dst in connections: 1359 addr = src.rpc.newaddr()['bech32'] 1360 bitcoind.rpc.sendtoaddress(addr, (fundamount + 1000000) / 10**8) 1361 1362 bitcoind.generate_block(1) 1363 sync_blockheight(bitcoind, nodes) 1364 txids = [] 1365 for src, dst in connections: 1366 txids.append(src.rpc.fundchannel(dst.info['id'], fundamount, announce=announce_channels)['txid']) 1367 1368 wait_for(lambda: set(txids).issubset(set(bitcoind.rpc.getrawmempool()))) 1369 1370 # Confirm all channels and wait for them to become usable 1371 bitcoind.generate_block(1) 1372 scids = [] 1373 for src, dst in connections: 1374 wait_for(lambda: src.channel_state(dst) == 'CHANNELD_NORMAL') 1375 scid = src.get_channel_scid(dst) 1376 scids.append(scid) 1377 1378 # Wait for all channels to be active (locally) 1379 for i, n in enumerate(scids): 1380 nodes[i].wait_channel_active(scids[i]) 1381 nodes[i + 1].wait_channel_active(scids[i]) 1382 1383 if not wait_for_announce: 1384 return 1385 1386 bitcoind.generate_block(5) 1387 1388 # Make sure everyone sees all channels: we can cheat and 1389 # simply check the ends (since it's a line). 1390 nodes[0].wait_channel_active(scids[-1]) 1391 nodes[-1].wait_channel_active(scids[0]) 1392 1393 # Make sure we have all node announcements, too (just check ends) 1394 for n in nodes: 1395 for end in (nodes[0], nodes[-1]): 1396 wait_for(lambda: 'alias' in only_one(end.rpc.listnodes(n.info['id'])['nodes'])) 1397 1398 def line_graph(self, num_nodes, fundchannel=True, fundamount=FUNDAMOUNT, wait_for_announce=False, opts=None, announce_channels=True): 1399 """ Create nodes, connect them and optionally fund channels. 1400 """ 1401 nodes = self.get_nodes(num_nodes, opts=opts) 1402 1403 self.join_nodes(nodes, fundchannel, fundamount, wait_for_announce, announce_channels) 1404 return nodes 1405 1406 def killall(self, expected_successes): 1407 """Returns true if every node we expected to succeed actually succeeded""" 1408 unexpected_fail = False 1409 err_msgs = [] 1410 for i in range(len(self.nodes)): 1411 leaks = None 1412 # leak detection upsets VALGRIND by reading uninitialized mem. 1413 # If it's dead, we'll catch it below. 1414 if not self.valgrind and DEVELOPER: 1415 try: 1416 # This also puts leaks in log. 1417 leaks = self.nodes[i].rpc.dev_memleak()['leaks'] 1418 except Exception: 1419 pass 1420 1421 try: 1422 self.nodes[i].stop() 1423 except Exception: 1424 if expected_successes[i]: 1425 unexpected_fail = True 1426 1427 if leaks is not None and len(leaks) != 0: 1428 unexpected_fail = True 1429 err_msgs.append("Node {} has memory leaks: {}".format( 1430 self.nodes[i].daemon.lightning_dir, 1431 json.dumps(leaks, sort_keys=True, indent=4) 1432 )) 1433 1434 return not unexpected_fail, err_msgs 1435