#!/usr/bin/env python3
# Copyright (c) 2019-2020 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""
Test transaction download behavior
"""

from test_framework.messages import (
    CInv,
    MSG_TX,
    MSG_TYPE_MASK,
    MSG_WTX,
    msg_inv,
    msg_notfound,
    tx_from_hex,
)
from test_framework.p2p import (
    P2PInterface,
    p2p_lock,
)
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
    assert_equal,
)
from test_framework.address import ADDRESS_BCRT1_UNSPENDABLE

import time


class TestP2PConn(P2PInterface):
    def __init__(self, wtxidrelay=True):
        super().__init__(wtxidrelay=wtxidrelay)
        self.tx_getdata_count = 0

    def on_getdata(self, message):
        for i in message.inv:
            if i.type & MSG_TYPE_MASK == MSG_TX or i.type & MSG_TYPE_MASK == MSG_WTX:
                self.tx_getdata_count += 1


# Constants from net_processing
GETDATA_TX_INTERVAL = 60  # seconds
INBOUND_PEER_TX_DELAY = 2  # seconds
TXID_RELAY_DELAY = 2  # seconds
OVERLOADED_PEER_DELAY = 2  # seconds
MAX_GETDATA_IN_FLIGHT = 100
MAX_PEER_TX_ANNOUNCEMENTS = 5000
NONPREF_PEER_TX_DELAY = 2

# Python test constants
NUM_INBOUND = 10
MAX_GETDATA_INBOUND_WAIT = GETDATA_TX_INTERVAL + INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY


class TxDownloadTest(BitcoinTestFramework):
    def set_test_params(self):
        self.num_nodes = 2

    def test_tx_requests(self):
        self.log.info("Test that we request transactions from all our peers, eventually")

        txid = 0xdeadbeef

        self.log.info("Announce the txid from each incoming peer to node 0")
        msg = msg_inv([CInv(t=MSG_WTX, h=txid)])
        for p in self.nodes[0].p2ps:
            p.send_and_ping(msg)

        outstanding_peer_index = [i for i in range(len(self.nodes[0].p2ps))]

        def getdata_found(peer_index):
            p = self.nodes[0].p2ps[peer_index]
            with p2p_lock:
                return p.last_message.get("getdata") and p.last_message["getdata"].inv[-1].hash == txid

        node_0_mocktime = int(time.time())
        while outstanding_peer_index:
            node_0_mocktime += MAX_GETDATA_INBOUND_WAIT
            self.nodes[0].setmocktime(node_0_mocktime)
            self.wait_until(lambda: any(getdata_found(i) for i in outstanding_peer_index))
            for i in outstanding_peer_index:
                if getdata_found(i):
                    outstanding_peer_index.remove(i)

        self.nodes[0].setmocktime(0)
        self.log.info("All outstanding peers received a getdata")

    def test_inv_block(self):
        self.log.info("Generate a transaction on node 0")
        tx = self.nodes[0].createrawtransaction(
            inputs=[{  # coinbase
                "txid": self.nodes[0].getblock(self.nodes[0].getblockhash(1))['tx'][0],
                "vout": 0
            }],
            outputs={ADDRESS_BCRT1_UNSPENDABLE: 50 - 0.00025},
        )
        tx = self.nodes[0].signrawtransactionwithkey(
            hexstring=tx,
            privkeys=[self.nodes[0].get_deterministic_priv_key().key],
        )['hex']
        ctx = tx_from_hex(tx)
        txid = int(ctx.rehash(), 16)

        self.log.info(
            "Announce the transaction to all nodes from all {} incoming peers, but never send it".format(NUM_INBOUND))
        msg = msg_inv([CInv(t=MSG_TX, h=txid)])
        for p in self.peers:
            p.send_and_ping(msg)

        self.log.info("Put the tx in node 0's mempool")
        self.nodes[0].sendrawtransaction(tx)

        # Since node 1 is connected outbound to an honest peer (node 0), it
        # should get the tx within a timeout (assuming that node 0 announced
        # the tx within the timeout).
        # The timeout is the sum of
        # * the worst case until the tx is first requested from an inbound
        #   peer, plus
        # * the first time it is re-requested from the outbound peer, plus
        # * 2 seconds to avoid races
        assert self.nodes[1].getpeerinfo()[0]['inbound'] == False
        timeout = 2 + INBOUND_PEER_TX_DELAY + GETDATA_TX_INTERVAL
        self.log.info("Tx should be received at node 1 after {} seconds".format(timeout))
        self.sync_mempools(timeout=timeout)

    def test_in_flight_max(self):
        self.log.info("Test that we don't load peers with more than {} transaction requests immediately".format(MAX_GETDATA_IN_FLIGHT))
        txids = [i for i in range(MAX_GETDATA_IN_FLIGHT + 2)]

        p = self.nodes[0].p2ps[0]

        with p2p_lock:
            p.tx_getdata_count = 0

        mock_time = int(time.time() + 1)
        self.nodes[0].setmocktime(mock_time)
        for i in range(MAX_GETDATA_IN_FLIGHT):
            p.send_message(msg_inv([CInv(t=MSG_WTX, h=txids[i])]))
        p.sync_with_ping()
        mock_time += INBOUND_PEER_TX_DELAY
        self.nodes[0].setmocktime(mock_time)
        p.wait_until(lambda: p.tx_getdata_count >= MAX_GETDATA_IN_FLIGHT)
        for i in range(MAX_GETDATA_IN_FLIGHT, len(txids)):
            p.send_message(msg_inv([CInv(t=MSG_WTX, h=txids[i])]))
        p.sync_with_ping()
        self.log.info("No more than {} requests should be seen within {} seconds after announcement".format(MAX_GETDATA_IN_FLIGHT, INBOUND_PEER_TX_DELAY + OVERLOADED_PEER_DELAY - 1))
        self.nodes[0].setmocktime(mock_time + INBOUND_PEER_TX_DELAY + OVERLOADED_PEER_DELAY - 1)
        p.sync_with_ping()
        with p2p_lock:
            assert_equal(p.tx_getdata_count, MAX_GETDATA_IN_FLIGHT)
        self.log.info("If we wait {} seconds after announcement, we should eventually get more requests".format(INBOUND_PEER_TX_DELAY + OVERLOADED_PEER_DELAY))
        self.nodes[0].setmocktime(mock_time + INBOUND_PEER_TX_DELAY + OVERLOADED_PEER_DELAY)
        p.wait_until(lambda: p.tx_getdata_count == len(txids))

    def test_expiry_fallback(self):
        self.log.info('Check that expiry will select another peer for download')
        WTXID = 0xffaa
        peer1 = self.nodes[0].add_p2p_connection(TestP2PConn())
        peer2 = self.nodes[0].add_p2p_connection(TestP2PConn())
        for p in [peer1, peer2]:
            p.send_message(msg_inv([CInv(t=MSG_WTX, h=WTXID)]))
        # One of the peers is asked for the tx
        peer2.wait_until(lambda: sum(p.tx_getdata_count for p in [peer1, peer2]) == 1)
        with p2p_lock:
            peer_expiry, peer_fallback = (peer1, peer2) if peer1.tx_getdata_count == 1 else (peer2, peer1)
            assert_equal(peer_fallback.tx_getdata_count, 0)
        self.nodes[0].setmocktime(int(time.time()) + GETDATA_TX_INTERVAL + 1)  # Wait for request to peer_expiry to expire
        peer_fallback.wait_until(lambda: peer_fallback.tx_getdata_count >= 1, timeout=1)
        self.restart_node(0)  # reset mocktime

    def test_disconnect_fallback(self):
        self.log.info('Check that disconnect will select another peer for download')
        WTXID = 0xffbb
        peer1 = self.nodes[0].add_p2p_connection(TestP2PConn())
        peer2 = self.nodes[0].add_p2p_connection(TestP2PConn())
        for p in [peer1, peer2]:
            p.send_message(msg_inv([CInv(t=MSG_WTX, h=WTXID)]))
        # One of the peers is asked for the tx
        peer2.wait_until(lambda: sum(p.tx_getdata_count for p in [peer1, peer2]) == 1)
        with p2p_lock:
            peer_disconnect, peer_fallback = (peer1, peer2) if peer1.tx_getdata_count == 1 else (peer2, peer1)
            assert_equal(peer_fallback.tx_getdata_count, 0)
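        # Disconnecting the peer that was asked first should cause node 0 to
        # re-select the remaining announcer (the fallback peer) for download.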
        peer_disconnect.peer_disconnect()
        peer_disconnect.wait_for_disconnect()
        peer_fallback.wait_until(lambda: peer_fallback.tx_getdata_count >= 1, timeout=1)

    def test_notfound_fallback(self):
        self.log.info('Check that notfounds will select another peer for download immediately')
        WTXID = 0xffdd
        peer1 = self.nodes[0].add_p2p_connection(TestP2PConn())
        peer2 = self.nodes[0].add_p2p_connection(TestP2PConn())
        for p in [peer1, peer2]:
            p.send_message(msg_inv([CInv(t=MSG_WTX, h=WTXID)]))
        # One of the peers is asked for the tx
        peer2.wait_until(lambda: sum(p.tx_getdata_count for p in [peer1, peer2]) == 1)
        with p2p_lock:
            peer_notfound, peer_fallback = (peer1, peer2) if peer1.tx_getdata_count == 1 else (peer2, peer1)
            assert_equal(peer_fallback.tx_getdata_count, 0)
        peer_notfound.send_and_ping(msg_notfound(vec=[CInv(MSG_WTX, WTXID)]))  # Send notfound, so that the fallback peer is selected
        peer_fallback.wait_until(lambda: peer_fallback.tx_getdata_count >= 1, timeout=1)

    def test_preferred_inv(self, preferred=False):
        if preferred:
            self.log.info('Check invs from preferred peers are downloaded immediately')
            self.restart_node(0, extra_args=['-whitelist=noban@127.0.0.1'])
        else:
            self.log.info('Check invs from non-preferred peers are downloaded after {} s'.format(NONPREF_PEER_TX_DELAY))
        mock_time = int(time.time() + 1)
        self.nodes[0].setmocktime(mock_time)
        peer = self.nodes[0].add_p2p_connection(TestP2PConn())
        peer.send_message(msg_inv([CInv(t=MSG_WTX, h=0xff00ff00)]))
        peer.sync_with_ping()
        if preferred:
            peer.wait_until(lambda: peer.tx_getdata_count >= 1, timeout=1)
        else:
            with p2p_lock:
                assert_equal(peer.tx_getdata_count, 0)
            self.nodes[0].setmocktime(mock_time + NONPREF_PEER_TX_DELAY)
            peer.wait_until(lambda: peer.tx_getdata_count >= 1, timeout=1)

    def test_txid_inv_delay(self, glob_wtxid=False):
        self.log.info('Check that an inv from a txid-relay peer is delayed by {} s, with a wtxid peer {}'.format(TXID_RELAY_DELAY, glob_wtxid))
        self.restart_node(0, extra_args=['-whitelist=noban@127.0.0.1'])
        mock_time = int(time.time() + 1)
        self.nodes[0].setmocktime(mock_time)
        peer = self.nodes[0].add_p2p_connection(TestP2PConn(wtxidrelay=False))
        if glob_wtxid:
            # Add a second, wtxid-relay connection; otherwise TXID_RELAY_DELAY is
            # waived in the absence of wtxid-relay peers
            self.nodes[0].add_p2p_connection(TestP2PConn(wtxidrelay=True))
        peer.send_message(msg_inv([CInv(t=MSG_TX, h=0xff11ff11)]))
        peer.sync_with_ping()
        with p2p_lock:
            assert_equal(peer.tx_getdata_count, 0 if glob_wtxid else 1)
        self.nodes[0].setmocktime(mock_time + TXID_RELAY_DELAY)
        peer.wait_until(lambda: peer.tx_getdata_count >= 1, timeout=1)

    def test_large_inv_batch(self):
        self.log.info('Test how large inv batches are handled with relay permission')
        self.restart_node(0, extra_args=['-whitelist=relay@127.0.0.1'])
        peer = self.nodes[0].add_p2p_connection(TestP2PConn())
        peer.send_message(msg_inv([CInv(t=MSG_WTX, h=wtxid) for wtxid in range(MAX_PEER_TX_ANNOUNCEMENTS + 1)]))
        peer.wait_until(lambda: peer.tx_getdata_count == MAX_PEER_TX_ANNOUNCEMENTS + 1)

        self.log.info('Test how large inv batches are handled without relay permission')
        self.restart_node(0)
        peer = self.nodes[0].add_p2p_connection(TestP2PConn())
        peer.send_message(msg_inv([CInv(t=MSG_WTX, h=wtxid) for wtxid in range(MAX_PEER_TX_ANNOUNCEMENTS + 1)]))
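        # Without relay permission, only up to MAX_PEER_TX_ANNOUNCEMENTS announcements
        # are tracked per peer, so the one inv above the limit is never requested.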
        peer.wait_until(lambda: peer.tx_getdata_count == MAX_PEER_TX_ANNOUNCEMENTS)
        peer.sync_with_ping()

    def test_spurious_notfound(self):
        self.log.info('Check that spurious notfound is ignored')
        self.nodes[0].p2ps[0].send_message(msg_notfound(vec=[CInv(MSG_TX, 1)]))

    def run_test(self):
        # Run tests without mocktime that only need one peer-connection first, to avoid restarting the nodes
        self.test_expiry_fallback()
        self.test_disconnect_fallback()
        self.test_notfound_fallback()
        self.test_preferred_inv()
        self.test_preferred_inv(True)
        self.test_txid_inv_delay()
        self.test_txid_inv_delay(True)
        self.test_large_inv_batch()
        self.test_spurious_notfound()

        # Run each test against new bitcoind instances, as setting mocktimes has long-term effects on when
        # the next trickle relay event happens.
        for test in [self.test_in_flight_max, self.test_inv_block, self.test_tx_requests]:
            self.stop_nodes()
            self.start_nodes()
            self.connect_nodes(1, 0)
            # Set up the p2p connections
            self.peers = []
            for node in self.nodes:
                for _ in range(NUM_INBOUND):
                    self.peers.append(node.add_p2p_connection(TestP2PConn()))
            self.log.info("Nodes are set up with {} incoming connections each".format(NUM_INBOUND))
            test()


if __name__ == '__main__':
    TxDownloadTest().main()