1#!/usr/bin/env python3
2# Copyright (c) 2015-2016 The Bitcoin Core developers
3# Distributed under the MIT software license, see the accompanying
4# file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6from .mininode import *
7from .blockstore import BlockStore, TxStore
8from .util import p2p_port
9
'''
This is a tool for comparing two or more bitcoinds to each other
using a provided test script.

To use, create a class that implements get_tests(), and pass it in
as the test generator to TestManager.  get_tests() should be a python
generator that yields TestInstance objects.  See below for definition.
'''
18
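# TestNode behaves as follows:
# Configure with a BlockStore and TxStore
# on_inv: record the announced hashes (lastInv) but don't request them
# on_headers: record the hash of the last header as the node's chain tip
# on_pong: update ping response map (for synchronization)
# on_getheaders: provide headers via BlockStore
# on_getdata: provide blocks via BlockStore and transactions via TxStore,
#             and record which hashes were requested
# on_reject: record the reject code and reason for the tx or block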
26
# NOTE: a `global` statement at module scope has no effect; it neither
# creates nor imports the name.  mininode_lock is used below in
# `with mininode_lock:` blocks and is presumably provided by the
# star-import from .mininode -- TODO confirm.
global mininode_lock
28
class RejectResult(object):
    '''
    Expected rejection of a transaction or block.

    Holds a reject code, which must match exactly, and a reason prefix,
    which the observed reason must start with.  The default empty prefix
    matches any reason.
    '''

    def __init__(self, code, reason=b''):
        self.code, self.reason = code, reason

    def match(self, other):
        '''Return True if other has our code and its reason starts with ours.'''
        codes_differ = self.code != other.code
        return (not codes_differ) and other.reason.startswith(self.reason)

    def __repr__(self):
        # An empty reason is shown as a wildcard.
        shown_reason = self.reason if self.reason else '*'
        return '%i:%s' % (self.code, shown_reason)
42
class TestNode(NodeConnCB):
    '''
    Callback handler attached to one p2p connection by the comparison tool.

    It answers getheaders/getdata out of the shared block and tx stores
    and records what the remote node requested, announced and rejected so
    that TestManager can inspect it afterwards.
    '''

    def __init__(self, block_store, tx_store):
        NodeConnCB.__init__(self)
        # Shared stores used to answer getheaders / getdata.
        self.block_store = block_store
        self.tx_store = tx_store
        # Set later through add_connection().
        self.conn = None
        # Hash of the last header in the most recent non-empty headers message.
        self.bestblockhash = None
        # hash -> bool: set to True once the hash shows up in a getdata.
        self.block_request_map = {}
        self.tx_request_map = {}
        # hash -> RejectResult built from a received reject message.
        self.block_reject_map = {}
        self.tx_reject_map = {}

        # When the pingmap is non-empty we're waiting for
        # a response
        self.pingMap = {}
        # Hashes from the most recent inv message.
        self.lastInv = []
        self.closed = False

    def on_close(self, conn):
        '''Remember that the connection has been closed.'''
        self.closed = True

    def add_connection(self, conn):
        '''Attach the connection used by the send_* helpers.'''
        self.conn = conn

    def on_headers(self, conn, message):
        '''Record the hash of the last announced header as the peer's tip.'''
        if not len(message.headers) > 0:
            return
        newest = message.headers[-1]
        newest.calc_sha256()
        self.bestblockhash = newest.sha256

    def on_getheaders(self, conn, message):
        '''Reply with headers from the block store, if it produces any.'''
        reply = self.block_store.headers_for(message.locator, message.hashstop)
        if reply is None:
            return
        conn.send_message(reply)

    def on_getdata(self, conn, message):
        '''Serve the requested blocks and txs, then mark them as requested.'''
        for block_msg in self.block_store.get_blocks(message.inv):
            conn.send_message(block_msg)
        for tx_msg in self.tx_store.get_transactions(message.inv):
            conn.send_message(tx_msg)

        # inv type 1 -> transaction request, inv type 2 -> block request;
        # any other type is ignored.
        request_maps = {1: self.tx_request_map, 2: self.block_request_map}
        for item in message.inv:
            requested = request_maps.get(item.type)
            if requested is not None:
                requested[item.hash] = True

    def on_inv(self, conn, message):
        '''Keep only the hashes of the latest inv; nothing is requested.'''
        self.lastInv = [entry.hash for entry in message.inv]

    def on_pong(self, conn, message):
        '''Clear the outstanding ping that matches the pong's nonce.'''
        try:
            self.pingMap.pop(message.nonce)
        except KeyError:
            raise AssertionError("Got pong for unknown ping [%s]" % repr(message))

    def on_reject(self, conn, message):
        '''Store the reject code/reason keyed by the rejected object's hash.'''
        kind = message.message
        if kind == b'tx':
            self.tx_reject_map[message.data] = RejectResult(message.code, message.reason)
        elif kind == b'block':
            self.block_reject_map[message.data] = RejectResult(message.code, message.reason)

    def send_inv(self, obj):
        '''Announce obj to the peer: as a block if it is a CBlock, else as a tx.'''
        if isinstance(obj, CBlock):
            inv_type = 2
        else:
            inv_type = 1
        announcement = msg_inv([CInv(inv_type, obj.sha256)])
        self.conn.send_message(announcement)

    def send_getheaders(self):
        '''Ask the peer for headers, using a locator built from its last tip.'''
        request = msg_getheaders()
        request.locator = self.block_store.get_locator(self.bestblockhash)
        self.conn.send_message(request)

    # This assumes BIP31
    def send_ping(self, nonce):
        '''Send a ping and mark its nonce as awaiting a pong.'''
        self.pingMap[nonce] = True
        self.conn.send_message(msg_ping(nonce))

    def received_ping_response(self, nonce):
        '''Return True when no ping with this nonce is outstanding.'''
        still_waiting = nonce in self.pingMap
        return not still_waiting

    def send_mempool(self):
        '''Clear the recorded inv hashes and send a mempool message.'''
        self.lastInv = []
        self.conn.send_message(msg_mempool())
125
126# TestInstance:
127#
128# Instances of these are generated by the test generator, and fed into the
129# comptool.
130#
131# "blocks_and_transactions" should be an array of
132#    [obj, True/False/None, hash/None]:
133#  - obj is either a CBlock, CBlockHeader, or a CTransaction, and
134#  - the second value indicates whether the object should be accepted
135#    into the blockchain or mempool (for tests where we expect a certain
136#    answer), or "None" if we don't expect a certain answer and are just
137#    comparing the behavior of the nodes being tested.
138#  - the third value is the hash to test the tip against (if None or omitted,
139#    use the hash of the block)
140#  - NOTE: if a block header, no test is performed; instead the header is
141#    just added to the block_store.  This is to facilitate block delivery
142#    when communicating with headers-first clients (when withholding an
143#    intermediate block).
144# sync_every_block: if True, then each block will be inv'ed, synced, and
145#    nodes will be tested based on the outcome for the block.  If False,
146#    then inv's accumulate until all blocks are processed (or max inv size
147#    is reached) and then sent out in one inv message.  Then the final block
148#    will be synced across all connections, and the outcome of the final
149#    block will be tested.
150# sync_every_tx: analogous to behavior for sync_every_block, except if outcome
151#    on the final tx is None, then contents of entire mempool are compared
152#    across all connections.  (If outcome of final tx is specified as true
153#    or false, then only the last tx is tested against outcome.)
154
class TestInstance(object):
    '''
    One batch of objects produced by a test generator for the comptool.

    objects is the list of [obj, outcome, tip] entries (a missing or empty
    list becomes a fresh empty list); the two flags select whether each
    block / each tx is synced and checked individually.
    '''

    def __init__(self, objects=None, sync_every_block=True, sync_every_tx=False):
        self.sync_every_tx = sync_every_tx
        self.sync_every_block = sync_every_block
        # Falsy input (None or an empty list) is replaced by a new list.
        self.blocks_and_transactions = objects or []
160
class TestManager(object):
    '''
    Feeds the objects produced by a test generator to a set of nodes over
    p2p connections and checks the nodes' resulting tips / mempools.

    testgen must provide get_tests(), which is iterated for TestInstance
    objects; datadir is passed to the BlockStore and TxStore constructors.
    '''

    def __init__(self, testgen, datadir):
        self.test_generator = testgen
        self.connections    = []
        self.test_nodes     = []
        self.block_store    = BlockStore(datadir)
        self.tx_store       = TxStore(datadir)
        # Nonce for the next synchronization ping; bumped after every use.
        self.ping_counter   = 1

    def add_all_connections(self, nodes):
        '''Create one TestNode and one NodeConn per entry in nodes.'''
        for i, node in enumerate(nodes):
            # Create a p2p connection to each node
            test_node = TestNode(self.block_store, self.tx_store)
            self.test_nodes.append(test_node)
            conn = NodeConn('127.0.0.1', p2p_port(i), node, test_node)
            self.connections.append(conn)
            # Make sure the TestNode (callback class) has a reference to its
            # associated NodeConn
            test_node.add_connection(conn)

    def clear_all_connections(self):
        '''Forget all connections and their callback objects.'''
        self.connections    = []
        self.test_nodes     = []

    def wait_for_disconnections(self):
        '''Wait (timeout=10) until every TestNode has seen on_close.'''
        def disconnected():
            return all(node.closed for node in self.test_nodes)
        return wait_until(disconnected, timeout=10)

    def wait_for_verack(self):
        '''Wait (timeout=10) until every TestNode reports verack_received.'''
        def veracked():
            return all(node.verack_received for node in self.test_nodes)
        return wait_until(veracked, timeout=10)

    def wait_for_pings(self, counter):
        '''Wait until no TestNode has an outstanding ping with nonce counter.'''
        def received_pongs():
            return all(node.received_ping_response(counter) for node in self.test_nodes)
        return wait_until(received_pongs)

    def _ping_and_wait(self):
        '''Ping every connection with the current nonce and wait for the pongs.'''
        # Synchronization hack: once the pong for this nonce has arrived,
        # the replies to whatever was sent before the ping have been seen.
        nonce = self.ping_counter
        for c in self.connections:
            c.cb.send_ping(nonce)
        self.wait_for_pings(nonce)
        self.ping_counter += 1

    def _send_invqueue(self, invqueue):
        '''Send the queued inv entries to every connection in one inv each.'''
        for c in self.connections:
            c.send_message(msg_inv(invqueue))

    # sync_blocks: Wait for all connections to request the blockhash given
    # then send get_headers to find out the tip of each node, and synchronize
    # the response by using a ping (and waiting for pong with same nonce).
    def sync_blocks(self, blockhash, num_blocks):
        '''
        Wait for every node to request blockhash, then refresh each tip.

        Raises AssertionError if wait_until reports failure after
        20*num_blocks attempts.
        '''
        def blocks_requested():
            return all(
                node.block_request_map.get(blockhash)
                for node in self.test_nodes
            )

        # --> error if not requested
        if not wait_until(blocks_requested, attempts=20*num_blocks):
            raise AssertionError("Not all nodes requested block")

        # Send getheaders message
        for c in self.connections:
            c.cb.send_getheaders()

        # Send ping and wait for response -- synchronization hack
        self._ping_and_wait()

    # Analogous to sync_blocks (see above)
    def sync_transaction(self, txhash, num_events):
        '''
        Wait for every node to request txhash, then fetch each mempool.

        Raises AssertionError if wait_until reports failure after
        20*num_events attempts.  On return each TestNode's lastInv is sorted.
        '''
        def transaction_requested():
            return all(
                node.tx_request_map.get(txhash)
                for node in self.test_nodes
            )

        # --> error if not requested
        if not wait_until(transaction_requested, attempts=20*num_events):
            raise AssertionError("Not all nodes requested transaction")

        # Get the mempool
        for c in self.connections:
            c.cb.send_mempool()

        # Send ping and wait for response -- synchronization hack
        self._ping_and_wait()

        # Sort inv responses from each node so they can be compared directly
        with mininode_lock:
            for c in self.connections:
                c.cb.lastInv.sort()

    # Verify that the tip of each connection all agree with each other, and
    # with the expected outcome (if given)
    def check_results(self, blockhash, outcome):
        '''
        Return True if every connection's tip is consistent with outcome.

        outcome None: all tips must equal the first connection's tip.
        outcome RejectResult: the tip must not be blockhash and a matching
        reject for blockhash must have been recorded.
        otherwise: (tip == blockhash) must equal outcome.
        '''
        with mininode_lock:
            for c in self.connections:
                if outcome is None:
                    if c.cb.bestblockhash != self.connections[0].cb.bestblockhash:
                        return False
                elif isinstance(outcome, RejectResult): # Check that block was rejected w/ code
                    if c.cb.bestblockhash == blockhash:
                        return False
                    if blockhash not in c.cb.block_reject_map:
                        print('Block not in reject map: %064x' % (blockhash))
                        return False
                    if not outcome.match(c.cb.block_reject_map[blockhash]):
                        print('Block rejected with %s instead of expected %s: %064x' % (c.cb.block_reject_map[blockhash], outcome, blockhash))
                        return False
                elif ((c.cb.bestblockhash == blockhash) != outcome):
                    return False
            return True

    # Either check that the mempools all agree with each other, or that
    # txhash's presence in the mempool matches the outcome specified.
    # This is somewhat of a strange comparison, in that we're either comparing
    # a particular tx to an outcome, or the entire mempools altogether;
    # perhaps it would be useful to add the ability to check explicitly that
    # a particular tx's existence in the mempool is the same across all nodes.
    def check_mempool(self, txhash, outcome):
        '''
        Return True if every connection's last inv is consistent with outcome.

        outcome None: every lastInv must equal the first connection's.
        outcome RejectResult: txhash must be absent from lastInv and a
        matching reject for txhash must have been recorded.
        otherwise: (txhash in lastInv) must equal outcome.
        '''
        with mininode_lock:
            for c in self.connections:
                if outcome is None:
                    # Make sure the mempools agree with each other
                    if c.cb.lastInv != self.connections[0].cb.lastInv:
                        return False
                elif isinstance(outcome, RejectResult): # Check that tx was rejected w/ code
                    if txhash in c.cb.lastInv:
                        return False
                    if txhash not in c.cb.tx_reject_map:
                        print('Tx not in reject map: %064x' % (txhash))
                        return False
                    if not outcome.match(c.cb.tx_reject_map[txhash]):
                        print('Tx rejected with %s instead of expected %s: %064x' % (c.cb.tx_reject_map[txhash], outcome, txhash))
                        return False
                elif ((txhash in c.cb.lastInv) != outcome):
                    return False
            return True

    def run(self):
        '''
        Run every TestInstance from the generator against all connections.

        Raises AssertionError when a block or mempool check fails.  After
        the last test the connections are disconnected and both stores
        are closed.
        '''
        # Wait until verack is received
        self.wait_for_verack()

        test_number = 1
        for test_instance in self.test_generator.get_tests():
            # We use these variables to keep track of the last block
            # and last transaction in the tests, which are used
            # if we're not syncing on every block or every tx.
            block, block_outcome, tip = None, None, None
            tx, tx_outcome = None, None
            invqueue = []

            for test_obj in test_instance.blocks_and_transactions:
                b_or_t = test_obj[0]
                outcome = test_obj[1]
                # Determine if we're dealing with a block or tx
                if isinstance(b_or_t, CBlock):  # Block test runner
                    block = b_or_t
                    block_outcome = outcome
                    tip = block.sha256
                    # each test_obj can have an optional third argument
                    # to specify the tip we should compare with.  If it is
                    # omitted or None, the block being tested is used.
                    if len(test_obj) >= 3 and test_obj[2] is not None:
                        tip = test_obj[2]

                    # Add to shared block_store, set as current block
                    # If there was an open getdata request for the block
                    # previously, and we didn't have an entry in the
                    # block_store, then immediately deliver, because the
                    # node wouldn't send another getdata request while
                    # the earlier one is outstanding.
                    first_block_with_hash = self.block_store.get(block.sha256) is None
                    with mininode_lock:
                        self.block_store.add_block(block)
                        for c in self.connections:
                            if first_block_with_hash and c.cb.block_request_map.get(block.sha256):
                                # There was a previous request for this block hash
                                # Most likely, we delivered a header for this block
                                # but never had the block to respond to the getdata
                                c.send_message(msg_block(block))
                            else:
                                c.cb.block_request_map[block.sha256] = False
                    # Either send inv's to each node and sync, or add
                    # to invqueue for later inv'ing.
                    if test_instance.sync_every_block:
                        for c in self.connections:
                            c.cb.send_inv(block)
                        self.sync_blocks(block.sha256, 1)
                        if not self.check_results(tip, outcome):
                            raise AssertionError("Test failed at test %d" % test_number)
                    else:
                        invqueue.append(CInv(2, block.sha256))
                elif isinstance(b_or_t, CBlockHeader):
                    # Headers are only stored; no test is performed on them.
                    block_header = b_or_t
                    self.block_store.add_header(block_header)
                else:  # Tx test runner
                    assert(isinstance(b_or_t, CTransaction))
                    tx = b_or_t
                    tx_outcome = outcome
                    # Add to shared tx store and clear map entry
                    with mininode_lock:
                        self.tx_store.add_transaction(tx)
                        for c in self.connections:
                            c.cb.tx_request_map[tx.sha256] = False
                    # Again, either inv to all nodes or save for later
                    if test_instance.sync_every_tx:
                        for c in self.connections:
                            c.cb.send_inv(tx)
                        self.sync_transaction(tx.sha256, 1)
                        if not self.check_mempool(tx.sha256, outcome):
                            raise AssertionError("Test failed at test %d" % test_number)
                    else:
                        invqueue.append(CInv(1, tx.sha256))
                # Ensure we're not overflowing the inv queue
                if len(invqueue) == MAX_INV_SZ:
                    self._send_invqueue(invqueue)
                    invqueue = []

            # Do final sync if we weren't syncing on every block or every tx.
            if not test_instance.sync_every_block and block is not None:
                if invqueue:
                    self._send_invqueue(invqueue)
                    invqueue = []
                self.sync_blocks(block.sha256, len(test_instance.blocks_and_transactions))
                if not self.check_results(tip, block_outcome):
                    raise AssertionError("Block test failed at test %d" % test_number)
            if not test_instance.sync_every_tx and tx is not None:
                if invqueue:
                    self._send_invqueue(invqueue)
                    invqueue = []
                self.sync_transaction(tx.sha256, len(test_instance.blocks_and_transactions))
                if not self.check_mempool(tx.sha256, tx_outcome):
                    raise AssertionError("Mempool test failed at test %d" % test_number)

            print("Test %d: PASS" % test_number, [ c.rpc.getblockcount() for c in self.connections ])
            test_number += 1

        for c in self.connections:
            c.disconnect_node()
        self.wait_for_disconnections()
        self.block_store.close()
        self.tx_store.close()
402