1#!/usr/bin/env python3
2# Copyright (c) 2019-2020 The Bitcoin Core developers
3# Distributed under the MIT software license, see the accompanying
4# file COPYING or http://www.opensource.org/licenses/mit-license.php.
5"""
6Test transaction download behavior
7"""
8
9from test_framework.messages import (
10    CInv,
11    MSG_TX,
12    MSG_TYPE_MASK,
13    MSG_WTX,
14    msg_inv,
15    msg_notfound,
16    tx_from_hex,
17)
18from test_framework.p2p import (
19    P2PInterface,
20    p2p_lock,
21)
22from test_framework.test_framework import BitcoinTestFramework
23from test_framework.util import (
24    assert_equal,
25)
26from test_framework.address import ADDRESS_BCRT1_UNSPENDABLE
27
28import time
29
30
class TestP2PConn(P2PInterface):
    """P2P connection that tallies transaction entries seen in getdata messages."""

    def __init__(self, wtxidrelay=True):
        super().__init__(wtxidrelay=wtxidrelay)
        # Running total of MSG_TX / MSG_WTX inv entries received via getdata.
        self.tx_getdata_count = 0

    def on_getdata(self, message):
        """Add one to the counter for each inv entry whose masked type is MSG_TX or MSG_WTX."""
        for entry in message.inv:
            base_type = entry.type & MSG_TYPE_MASK
            if base_type in (MSG_TX, MSG_WTX):
                self.tx_getdata_count += 1
40
41
# Constants from net_processing
# -- time delays
GETDATA_TX_INTERVAL = 60  # seconds
INBOUND_PEER_TX_DELAY = 2  # seconds
TXID_RELAY_DELAY = 2  # seconds
OVERLOADED_PEER_DELAY = 2  # seconds
NONPREF_PEER_TX_DELAY = 2  # seconds
# -- count limits
MAX_GETDATA_IN_FLIGHT = 100
MAX_PEER_TX_ANNOUNCEMENTS = 5000

# Python test constants
NUM_INBOUND = 10
# Amount by which test_tx_requests advances node 0's mocktime on each round.
MAX_GETDATA_INBOUND_WAIT = (
    GETDATA_TX_INTERVAL
    + INBOUND_PEER_TX_DELAY
    + TXID_RELAY_DELAY
)
54
55
56class TxDownloadTest(BitcoinTestFramework):
    def set_test_params(self):
        """Configure the test to run with two nodes."""
        self.num_nodes = 2
59
60    def test_tx_requests(self):
61        self.log.info("Test that we request transactions from all our peers, eventually")
62
63        txid = 0xdeadbeef
64
65        self.log.info("Announce the txid from each incoming peer to node 0")
66        msg = msg_inv([CInv(t=MSG_WTX, h=txid)])
67        for p in self.nodes[0].p2ps:
68            p.send_and_ping(msg)
69
70        outstanding_peer_index = [i for i in range(len(self.nodes[0].p2ps))]
71
72        def getdata_found(peer_index):
73            p = self.nodes[0].p2ps[peer_index]
74            with p2p_lock:
75                return p.last_message.get("getdata") and p.last_message["getdata"].inv[-1].hash == txid
76
77        node_0_mocktime = int(time.time())
78        while outstanding_peer_index:
79            node_0_mocktime += MAX_GETDATA_INBOUND_WAIT
80            self.nodes[0].setmocktime(node_0_mocktime)
81            self.wait_until(lambda: any(getdata_found(i) for i in outstanding_peer_index))
82            for i in outstanding_peer_index:
83                if getdata_found(i):
84                    outstanding_peer_index.remove(i)
85
86        self.nodes[0].setmocktime(0)
87        self.log.info("All outstanding peers received a getdata")
88
89    def test_inv_block(self):
90        self.log.info("Generate a transaction on node 0")
91        tx = self.nodes[0].createrawtransaction(
92            inputs=[{  # coinbase
93                "txid": self.nodes[0].getblock(self.nodes[0].getblockhash(1))['tx'][0],
94                "vout": 0
95            }],
96            outputs={ADDRESS_BCRT1_UNSPENDABLE: 50 - 0.00025},
97        )
98        tx = self.nodes[0].signrawtransactionwithkey(
99            hexstring=tx,
100            privkeys=[self.nodes[0].get_deterministic_priv_key().key],
101        )['hex']
102        ctx = tx_from_hex(tx)
103        txid = int(ctx.rehash(), 16)
104
105        self.log.info(
106            "Announce the transaction to all nodes from all {} incoming peers, but never send it".format(NUM_INBOUND))
107        msg = msg_inv([CInv(t=MSG_TX, h=txid)])
108        for p in self.peers:
109            p.send_and_ping(msg)
110
111        self.log.info("Put the tx in node 0's mempool")
112        self.nodes[0].sendrawtransaction(tx)
113
114        # Since node 1 is connected outbound to an honest peer (node 0), it
115        # should get the tx within a timeout. (Assuming that node 0
116        # announced the tx within the timeout)
117        # The timeout is the sum of
118        # * the worst case until the tx is first requested from an inbound
119        #   peer, plus
120        # * the first time it is re-requested from the outbound peer, plus
121        # * 2 seconds to avoid races
122        assert self.nodes[1].getpeerinfo()[0]['inbound'] == False
123        timeout = 2 + INBOUND_PEER_TX_DELAY + GETDATA_TX_INTERVAL
124        self.log.info("Tx should be received at node 1 after {} seconds".format(timeout))
125        self.sync_mempools(timeout=timeout)
126
127    def test_in_flight_max(self):
128        self.log.info("Test that we don't load peers with more than {} transaction requests immediately".format(MAX_GETDATA_IN_FLIGHT))
129        txids = [i for i in range(MAX_GETDATA_IN_FLIGHT + 2)]
130
131        p = self.nodes[0].p2ps[0]
132
133        with p2p_lock:
134            p.tx_getdata_count = 0
135
136        mock_time = int(time.time() + 1)
137        self.nodes[0].setmocktime(mock_time)
138        for i in range(MAX_GETDATA_IN_FLIGHT):
139            p.send_message(msg_inv([CInv(t=MSG_WTX, h=txids[i])]))
140        p.sync_with_ping()
141        mock_time += INBOUND_PEER_TX_DELAY
142        self.nodes[0].setmocktime(mock_time)
143        p.wait_until(lambda: p.tx_getdata_count >= MAX_GETDATA_IN_FLIGHT)
144        for i in range(MAX_GETDATA_IN_FLIGHT, len(txids)):
145            p.send_message(msg_inv([CInv(t=MSG_WTX, h=txids[i])]))
146        p.sync_with_ping()
147        self.log.info("No more than {} requests should be seen within {} seconds after announcement".format(MAX_GETDATA_IN_FLIGHT, INBOUND_PEER_TX_DELAY + OVERLOADED_PEER_DELAY - 1))
148        self.nodes[0].setmocktime(mock_time + INBOUND_PEER_TX_DELAY + OVERLOADED_PEER_DELAY - 1)
149        p.sync_with_ping()
150        with p2p_lock:
151            assert_equal(p.tx_getdata_count, MAX_GETDATA_IN_FLIGHT)
152        self.log.info("If we wait {} seconds after announcement, we should eventually get more requests".format(INBOUND_PEER_TX_DELAY + OVERLOADED_PEER_DELAY))
153        self.nodes[0].setmocktime(mock_time + INBOUND_PEER_TX_DELAY + OVERLOADED_PEER_DELAY)
154        p.wait_until(lambda: p.tx_getdata_count == len(txids))
155
156    def test_expiry_fallback(self):
157        self.log.info('Check that expiry will select another peer for download')
158        WTXID = 0xffaa
159        peer1 = self.nodes[0].add_p2p_connection(TestP2PConn())
160        peer2 = self.nodes[0].add_p2p_connection(TestP2PConn())
161        for p in [peer1, peer2]:
162            p.send_message(msg_inv([CInv(t=MSG_WTX, h=WTXID)]))
163        # One of the peers is asked for the tx
164        peer2.wait_until(lambda: sum(p.tx_getdata_count for p in [peer1, peer2]) == 1)
165        with p2p_lock:
166            peer_expiry, peer_fallback = (peer1, peer2) if peer1.tx_getdata_count == 1 else (peer2, peer1)
167            assert_equal(peer_fallback.tx_getdata_count, 0)
168        self.nodes[0].setmocktime(int(time.time()) + GETDATA_TX_INTERVAL + 1)  # Wait for request to peer_expiry to expire
169        peer_fallback.wait_until(lambda: peer_fallback.tx_getdata_count >= 1, timeout=1)
170        self.restart_node(0)  # reset mocktime
171
172    def test_disconnect_fallback(self):
173        self.log.info('Check that disconnect will select another peer for download')
174        WTXID = 0xffbb
175        peer1 = self.nodes[0].add_p2p_connection(TestP2PConn())
176        peer2 = self.nodes[0].add_p2p_connection(TestP2PConn())
177        for p in [peer1, peer2]:
178            p.send_message(msg_inv([CInv(t=MSG_WTX, h=WTXID)]))
179        # One of the peers is asked for the tx
180        peer2.wait_until(lambda: sum(p.tx_getdata_count for p in [peer1, peer2]) == 1)
181        with p2p_lock:
182            peer_disconnect, peer_fallback = (peer1, peer2) if peer1.tx_getdata_count == 1 else (peer2, peer1)
183            assert_equal(peer_fallback.tx_getdata_count, 0)
184        peer_disconnect.peer_disconnect()
185        peer_disconnect.wait_for_disconnect()
186        peer_fallback.wait_until(lambda: peer_fallback.tx_getdata_count >= 1, timeout=1)
187
188    def test_notfound_fallback(self):
189        self.log.info('Check that notfounds will select another peer for download immediately')
190        WTXID = 0xffdd
191        peer1 = self.nodes[0].add_p2p_connection(TestP2PConn())
192        peer2 = self.nodes[0].add_p2p_connection(TestP2PConn())
193        for p in [peer1, peer2]:
194            p.send_message(msg_inv([CInv(t=MSG_WTX, h=WTXID)]))
195        # One of the peers is asked for the tx
196        peer2.wait_until(lambda: sum(p.tx_getdata_count for p in [peer1, peer2]) == 1)
197        with p2p_lock:
198            peer_notfound, peer_fallback = (peer1, peer2) if peer1.tx_getdata_count == 1 else (peer2, peer1)
199            assert_equal(peer_fallback.tx_getdata_count, 0)
200        peer_notfound.send_and_ping(msg_notfound(vec=[CInv(MSG_WTX, WTXID)]))  # Send notfound, so that fallback peer is selected
201        peer_fallback.wait_until(lambda: peer_fallback.tx_getdata_count >= 1, timeout=1)
202
203    def test_preferred_inv(self, preferred=False):
204        if preferred:
205            self.log.info('Check invs from preferred peers are downloaded immediately')
206            self.restart_node(0, extra_args=['-whitelist=noban@127.0.0.1'])
207        else:
208            self.log.info('Check invs from non-preferred peers are downloaded after {} s'.format(NONPREF_PEER_TX_DELAY))
209        mock_time = int(time.time() + 1)
210        self.nodes[0].setmocktime(mock_time)
211        peer = self.nodes[0].add_p2p_connection(TestP2PConn())
212        peer.send_message(msg_inv([CInv(t=MSG_WTX, h=0xff00ff00)]))
213        peer.sync_with_ping()
214        if preferred:
215            peer.wait_until(lambda: peer.tx_getdata_count >= 1, timeout=1)
216        else:
217            with p2p_lock:
218                assert_equal(peer.tx_getdata_count, 0)
219            self.nodes[0].setmocktime(mock_time + NONPREF_PEER_TX_DELAY)
220            peer.wait_until(lambda: peer.tx_getdata_count >= 1, timeout=1)
221
222    def test_txid_inv_delay(self, glob_wtxid=False):
223        self.log.info('Check that inv from a txid-relay peers are delayed by {} s, with a wtxid peer {}'.format(TXID_RELAY_DELAY, glob_wtxid))
224        self.restart_node(0, extra_args=['-whitelist=noban@127.0.0.1'])
225        mock_time = int(time.time() + 1)
226        self.nodes[0].setmocktime(mock_time)
227        peer = self.nodes[0].add_p2p_connection(TestP2PConn(wtxidrelay=False))
228        if glob_wtxid:
229            # Add a second wtxid-relay connection otherwise TXID_RELAY_DELAY is waived in
230            # lack of wtxid-relay peers
231            self.nodes[0].add_p2p_connection(TestP2PConn(wtxidrelay=True))
232        peer.send_message(msg_inv([CInv(t=MSG_TX, h=0xff11ff11)]))
233        peer.sync_with_ping()
234        with p2p_lock:
235            assert_equal(peer.tx_getdata_count, 0 if glob_wtxid else 1)
236        self.nodes[0].setmocktime(mock_time + TXID_RELAY_DELAY)
237        peer.wait_until(lambda: peer.tx_getdata_count >= 1, timeout=1)
238
239    def test_large_inv_batch(self):
240        self.log.info('Test how large inv batches are handled with relay permission')
241        self.restart_node(0, extra_args=['-whitelist=relay@127.0.0.1'])
242        peer = self.nodes[0].add_p2p_connection(TestP2PConn())
243        peer.send_message(msg_inv([CInv(t=MSG_WTX, h=wtxid) for wtxid in range(MAX_PEER_TX_ANNOUNCEMENTS + 1)]))
244        peer.wait_until(lambda: peer.tx_getdata_count == MAX_PEER_TX_ANNOUNCEMENTS + 1)
245
246        self.log.info('Test how large inv batches are handled without relay permission')
247        self.restart_node(0)
248        peer = self.nodes[0].add_p2p_connection(TestP2PConn())
249        peer.send_message(msg_inv([CInv(t=MSG_WTX, h=wtxid) for wtxid in range(MAX_PEER_TX_ANNOUNCEMENTS + 1)]))
250        peer.wait_until(lambda: peer.tx_getdata_count == MAX_PEER_TX_ANNOUNCEMENTS)
251        peer.sync_with_ping()
252
253    def test_spurious_notfound(self):
254        self.log.info('Check that spurious notfound is ignored')
255        self.nodes[0].p2ps[0].send_message(msg_notfound(vec=[CInv(MSG_TX, 1)]))
256
257    def run_test(self):
258        # Run tests without mocktime that only need one peer-connection first, to avoid restarting the nodes
259        self.test_expiry_fallback()
260        self.test_disconnect_fallback()
261        self.test_notfound_fallback()
262        self.test_preferred_inv()
263        self.test_preferred_inv(True)
264        self.test_txid_inv_delay()
265        self.test_txid_inv_delay(True)
266        self.test_large_inv_batch()
267        self.test_spurious_notfound()
268
269        # Run each test against new bitcoind instances, as setting mocktimes has long-term effects on when
270        # the next trickle relay event happens.
271        for test in [self.test_in_flight_max, self.test_inv_block, self.test_tx_requests]:
272            self.stop_nodes()
273            self.start_nodes()
274            self.connect_nodes(1, 0)
275            # Setup the p2p connections
276            self.peers = []
277            for node in self.nodes:
278                for _ in range(NUM_INBOUND):
279                    self.peers.append(node.add_p2p_connection(TestP2PConn()))
280            self.log.info("Nodes are setup with {} incoming connections each".format(NUM_INBOUND))
281            test()
282
283
# Run the test when this file is executed as a script.
if __name__ == '__main__':
    TxDownloadTest().main()
286