#!/usr/bin/env python3
# Copyright (c) 2015-2016 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.

from .mininode import *
from .blockstore import BlockStore, TxStore
from .util import p2p_port

'''
This is a tool for comparing two or more bitcoinds to each other
using a script provided.

To use, create a class that implements get_tests(), and pass it in
as the test generator to TestManager. get_tests() should be a python
generator that returns TestInstance objects. See below for definition.
'''

# TestNode behaves as follows:
#    Configure with a BlockStore and TxStore
#    on_inv: log the message but don't request
#    on_headers: log the chain tip
#    on_pong: update ping response map (for synchronization)
#    on_getheaders: provide headers via BlockStore
#    on_getdata: provide blocks via BlockStore

# NOTE: a `global` statement has no effect at module scope; it is kept only
# as a marker that this module relies on mininode_lock (presumably provided
# by the wildcard import from .mininode above -- confirm).
global mininode_lock


class RejectResult(object):
    '''
    Outcome that expects rejection of a transaction or block.

    code is the expected reject code. reason is the expected prefix of the
    reject reason; the default b'' is a prefix of every reason and therefore
    matches any of them.
    '''
    def __init__(self, code, reason=b''):
        self.code = code
        self.reason = reason

    def match(self, other):
        '''
        Return True if other carries the same code and its reason starts
        with this object's reason.
        '''
        if self.code != other.code:
            return False
        return other.reason.startswith(self.reason)

    def __repr__(self):
        # An empty reason is rendered as '*' because it matches anything.
        return '%i:%s' % (self.code, self.reason or '*')


class TestNode(NodeConnCB):
    '''
    Connection callback object that serves blocks and transactions from the
    shared stores and records what the remote node requested, announced and
    rejected.
    '''

    def __init__(self, block_store, tx_store):
        super().__init__()
        self.conn = None
        self.bestblockhash = None
        self.block_store = block_store
        self.block_request_map = {}   # block hash -> True once requested via getdata
        self.tx_store = tx_store
        self.tx_request_map = {}      # tx hash -> True once requested via getdata
        self.block_reject_map = {}    # block hash -> RejectResult received
        self.tx_reject_map = {}       # tx hash -> RejectResult received

        # When the pingmap is non-empty we're waiting for
        # a response
        self.pingMap = {}
        self.lastInv = []
        self.closed = False

    def on_close(self, conn):
        '''Record that the connection has been closed.'''
        self.closed = True

    def add_connection(self, conn):
        '''Remember the connection this callback object sends messages on.'''
        self.conn = conn

    def on_headers(self, conn, message):
        '''Track the hash of the last header in the message as the node's tip.'''
        if len(message.headers) > 0:
            best_header = message.headers[-1]
            best_header.calc_sha256()
            self.bestblockhash = best_header.sha256

    def on_getheaders(self, conn, message):
        '''Answer a getheaders request from the block store, if it has a response.'''
        response = self.block_store.headers_for(message.locator, message.hashstop)
        if response is not None:
            conn.send_message(response)

    def on_getdata(self, conn, message):
        '''
        Send every requested block and transaction the stores can provide,
        then mark each requested hash as requested.
        '''
        for r in self.block_store.get_blocks(message.inv):
            conn.send_message(r)
        for r in self.tx_store.get_transactions(message.inv):
            conn.send_message(r)

        for i in message.inv:
            if i.type == 1:    # inv type 1: transaction
                self.tx_request_map[i.hash] = True
            elif i.type == 2:  # inv type 2: block
                self.block_request_map[i.hash] = True

    def on_inv(self, conn, message):
        '''Remember the hashes of the most recent inv message (nothing is requested).'''
        self.lastInv = [x.hash for x in message.inv]

    def on_pong(self, conn, message):
        '''
        Clear the outstanding ping matching the pong's nonce.

        Raises AssertionError if no ping with that nonce is outstanding.
        '''
        try:
            del self.pingMap[message.nonce]
        except KeyError:
            raise AssertionError("Got pong for unknown ping [%s]" % repr(message))

    def on_reject(self, conn, message):
        '''Record a reject message for a tx or block, keyed by message.data.'''
        if message.message == b'tx':
            self.tx_reject_map[message.data] = RejectResult(message.code, message.reason)
        elif message.message == b'block':
            self.block_reject_map[message.data] = RejectResult(message.code, message.reason)

    def send_inv(self, obj):
        '''Announce obj with an inv of type 2 for a CBlock, type 1 otherwise.'''
        mtype = 2 if isinstance(obj, CBlock) else 1
        self.conn.send_message(msg_inv([CInv(mtype, obj.sha256)]))

    def send_getheaders(self):
        '''Send a getheaders message built from the last recorded tip.'''
        # We ask for headers from their last tip.
        m = msg_getheaders()
        m.locator = self.block_store.get_locator(self.bestblockhash)
        self.conn.send_message(m)

    # This assumes BIP31
    def send_ping(self, nonce):
        '''Send a ping and record the nonce as awaiting a pong.'''
        self.pingMap[nonce] = True
        self.conn.send_message(msg_ping(nonce))

    def received_ping_response(self, nonce):
        '''Return True if no ping with this nonce is outstanding.'''
        return nonce not in self.pingMap

    def send_mempool(self):
        '''Reset lastInv and send a mempool message.'''
        self.lastInv = []
        self.conn.send_message(msg_mempool())

# TestInstance:
#
# Instances of these are generated by the test generator, and fed into the
# comptool.
#
# "blocks_and_transactions" should be an array of
#    [obj, True/False/None, hash/None]:
#  - obj is either a CBlock, CBlockHeader, or a CTransaction, and
#  - the second value indicates whether the object should be accepted
#    into the blockchain or mempool (for tests where we expect a certain
#    answer), a RejectResult if it should be rejected with a specific
#    code, or "None" if we don't expect a certain answer and are just
#    comparing the behavior of the nodes being tested.
#  - the third value is the hash to test the tip against (if None or omitted,
#    use the hash of the block)
#  - NOTE: if a block header, no test is performed; instead the header is
#    just added to the block_store. This is to facilitate block delivery
#    when communicating with headers-first clients (when withholding an
#    intermediate block).
# sync_every_block: if True, then each block will be inv'ed, synced, and
#    nodes will be tested based on the outcome for the block. If False,
#    then inv's accumulate until all blocks are processed (or max inv size
#    is reached) and then sent out in one inv message.
#    Then the final block
#    will be synced across all connections, and the outcome of the final
#    block will be tested.
# sync_every_tx: analogous to behavior for sync_every_block, except if outcome
#    on the final tx is None, then contents of entire mempool are compared
#    across all connections. (If outcome of final tx is specified as true
#    or false, then only the last tx is tested against outcome.)

class TestInstance(object):
    '''
    One step of a comparison test: a list of [obj, outcome, (tip)] entries
    plus flags selecting whether to sync after every block / every tx.
    '''
    def __init__(self, objects=None, sync_every_block=True, sync_every_tx=False):
        # A falsy `objects` (None or empty) yields a fresh empty list.
        self.blocks_and_transactions = objects if objects else []
        self.sync_every_block = sync_every_block
        self.sync_every_tx = sync_every_tx


class TestManager(object):
    '''
    Drives the objects produced by a test generator through one TestNode
    connection per node and checks the nodes' tips / mempools against the
    expected outcomes.
    '''

    def __init__(self, testgen, datadir):
        self.test_generator = testgen
        self.connections = []
        self.test_nodes = []
        self.block_store = BlockStore(datadir)
        self.tx_store = TxStore(datadir)
        self.ping_counter = 1   # nonce for the next synchronization ping

    def add_all_connections(self, nodes):
        '''Create a TestNode and a p2p connection for every entry of nodes.'''
        for i, node in enumerate(nodes):
            # Create a p2p connection to each node
            test_node = TestNode(self.block_store, self.tx_store)
            self.test_nodes.append(test_node)
            conn = NodeConn('127.0.0.1', p2p_port(i), node, test_node)
            self.connections.append(conn)
            # Make sure the TestNode (callback class) has a reference to its
            # associated NodeConn
            test_node.add_connection(conn)

    def clear_all_connections(self):
        '''Forget all connections and test nodes.'''
        self.connections = []
        self.test_nodes = []

    def wait_for_disconnections(self):
        '''Wait (timeout=10) until every test node is closed; returns wait_until's result.'''
        def disconnected():
            return all(node.closed for node in self.test_nodes)
        return wait_until(disconnected, timeout=10)

    def wait_for_verack(self):
        '''Wait (timeout=10) until every test node has verack_received; returns wait_until's result.'''
        def veracked():
            return all(node.verack_received for node in self.test_nodes)
        return wait_until(veracked, timeout=10)

    def wait_for_pings(self, counter):
        '''Wait until every test node has answered the ping with nonce `counter`.'''
        def received_pongs():
            return all(node.received_ping_response(counter) for node in self.test_nodes)
        return wait_until(received_pongs)

    def _ping_all_and_wait(self):
        '''Ping every connection with the current nonce, wait for the pongs, bump the nonce.'''
        for c in self.connections:
            c.cb.send_ping(self.ping_counter)
        self.wait_for_pings(self.ping_counter)
        self.ping_counter += 1

    @staticmethod
    def _expected_tip(test_obj, default_tip):
        '''
        Return the tip hash to compare against for test_obj: its third
        element when present and not None, otherwise default_tip.
        '''
        if len(test_obj) >= 3 and test_obj[2] is not None:
            return test_obj[2]
        return default_tip

    # sync_blocks: Wait for all connections to request the blockhash given
    # then send get_headers to find out the tip of each node, and synchronize
    # the response by using a ping (and waiting for pong with same nonce).
    def sync_blocks(self, blockhash, num_blocks):
        '''
        Raises AssertionError if some node has not requested blockhash
        within 20*num_blocks wait_until attempts.
        '''
        def blocks_requested():
            return all(
                node.block_request_map.get(blockhash)
                for node in self.test_nodes
            )

        # --> error if not requested
        if not wait_until(blocks_requested, attempts=20*num_blocks):
            raise AssertionError("Not all nodes requested block")

        # Send getheaders message
        for c in self.connections:
            c.cb.send_getheaders()

        # Send ping and wait for response -- synchronization hack
        self._ping_all_and_wait()

    # Analogous to sync_block (see above)
    def sync_transaction(self, txhash, num_events):
        '''
        Raises AssertionError if some node has not requested txhash within
        20*num_events wait_until attempts. On return every connection's
        lastInv has been refreshed via a mempool message and sorted.
        '''
        # Wait for nodes to request transaction (50ms sleep * 20 tries * num_events)
        def transaction_requested():
            return all(
                node.tx_request_map.get(txhash)
                for node in self.test_nodes
            )

        # --> error if not requested
        if not wait_until(transaction_requested, attempts=20*num_events):
            raise AssertionError("Not all nodes requested transaction")

        # Get the mempool
        for c in self.connections:
            c.cb.send_mempool()

        # Send ping and wait for response -- synchronization hack
        self._ping_all_and_wait()

        # Sort inv responses from each node
        with mininode_lock:
            for c in self.connections:
                c.cb.lastInv.sort()

    # Verify that the tip of each connection all agree with each other, and
    # with the expected outcome (if given)
    def check_results(self, blockhash, outcome):
        '''
        Return True if every connection's tip is consistent with outcome:
         - None: all tips equal the first connection's tip
         - RejectResult: tip is not blockhash and a matching reject was recorded
         - otherwise: (tip == blockhash) equals outcome
        '''
        with mininode_lock:
            for c in self.connections:
                if outcome is None:
                    if c.cb.bestblockhash != self.connections[0].cb.bestblockhash:
                        return False
                elif isinstance(outcome, RejectResult):  # Check that block was rejected w/ code
                    if c.cb.bestblockhash == blockhash:
                        return False
                    if blockhash not in c.cb.block_reject_map:
                        print('Block not in reject map: %064x' % (blockhash))
                        return False
                    if not outcome.match(c.cb.block_reject_map[blockhash]):
                        print('Block rejected with %s instead of expected %s: %064x' % (c.cb.block_reject_map[blockhash], outcome, blockhash))
                        return False
                elif (c.cb.bestblockhash == blockhash) != outcome:
                    return False
            return True

    # Either check that the mempools all agree with each other, or that
    # txhash's presence in the mempool matches the outcome specified.
    # This is somewhat of a strange comparison, in that we're either comparing
    # a particular tx to an outcome, or the entire mempools altogether;
    # perhaps it would be useful to add the ability to check explicitly that
    # a particular tx's existence in the mempool is the same across all nodes.
    def check_mempool(self, txhash, outcome):
        '''
        Return True if every connection's lastInv is consistent with outcome:
         - None: all lastInv lists equal the first connection's
         - RejectResult: txhash is absent and a matching reject was recorded
         - otherwise: (txhash in lastInv) equals outcome
        '''
        with mininode_lock:
            for c in self.connections:
                if outcome is None:
                    # Make sure the mempools agree with each other
                    if c.cb.lastInv != self.connections[0].cb.lastInv:
                        return False
                elif isinstance(outcome, RejectResult):  # Check that tx was rejected w/ code
                    if txhash in c.cb.lastInv:
                        return False
                    if txhash not in c.cb.tx_reject_map:
                        print('Tx not in reject map: %064x' % (txhash))
                        return False
                    if not outcome.match(c.cb.tx_reject_map[txhash]):
                        print('Tx rejected with %s instead of expected %s: %064x' % (c.cb.tx_reject_map[txhash], outcome, txhash))
                        return False
                elif (txhash in c.cb.lastInv) != outcome:
                    return False
            return True

    def run(self):
        '''
        Run every TestInstance from the generator, then disconnect and close
        the stores. Raises AssertionError on the first failed check.
        '''
        # Wait until verack is received
        # NOTE(review): the return value is ignored, so a handshake that
        # times out is presumably only noticed by a later sync failure.
        self.wait_for_verack()

        test_number = 1
        for test_instance in self.test_generator.get_tests():
            # We use these variables to keep track of the last block
            # and last transaction in the tests, which are used
            # if we're not syncing on every block or every tx.
            block = block_outcome = tip = None
            tx = tx_outcome = None
            invqueue = []

            for test_obj in test_instance.blocks_and_transactions:
                b_or_t = test_obj[0]
                outcome = test_obj[1]
                # Determine if we're dealing with a block or tx
                if isinstance(b_or_t, CBlock):  # Block test runner
                    block = b_or_t
                    block_outcome = outcome
                    # each test_obj can have an optional third argument
                    # to specify the tip we should compare with
                    # (default, also used when it is None, is the block
                    # being tested)
                    tip = self._expected_tip(test_obj, block.sha256)

                    # Add to shared block_store, set as current block
                    # If there was an open getdata request for the block
                    # previously, and we didn't have an entry in the
                    # block_store, then immediately deliver, because the
                    # node wouldn't send another getdata request while
                    # the earlier one is outstanding.
                    first_block_with_hash = self.block_store.get(block.sha256) is None
                    with mininode_lock:
                        self.block_store.add_block(block)
                        for c in self.connections:
                            if first_block_with_hash and c.cb.block_request_map.get(block.sha256):
                                # There was a previous request for this block hash
                                # Most likely, we delivered a header for this block
                                # but never had the block to respond to the getdata
                                c.send_message(msg_block(block))
                            else:
                                c.cb.block_request_map[block.sha256] = False
                    # Either send inv's to each node and sync, or add
                    # to invqueue for later inv'ing.
                    if test_instance.sync_every_block:
                        for c in self.connections:
                            c.cb.send_inv(block)
                        self.sync_blocks(block.sha256, 1)
                        if not self.check_results(tip, outcome):
                            raise AssertionError("Test failed at test %d" % test_number)
                    else:
                        invqueue.append(CInv(2, block.sha256))
                elif isinstance(b_or_t, CBlockHeader):
                    # Headers are only stored; no check is performed.
                    self.block_store.add_header(b_or_t)
                else:  # Tx test runner
                    # Explicit raise (not `assert`) so the check survives -O.
                    if not isinstance(b_or_t, CTransaction):
                        raise AssertionError("Unsupported test object type: %r" % type(b_or_t))
                    tx = b_or_t
                    tx_outcome = outcome
                    # Add to shared tx store and clear map entry
                    with mininode_lock:
                        self.tx_store.add_transaction(tx)
                        for c in self.connections:
                            c.cb.tx_request_map[tx.sha256] = False
                    # Again, either inv to all nodes or save for later
                    if test_instance.sync_every_tx:
                        for c in self.connections:
                            c.cb.send_inv(tx)
                        self.sync_transaction(tx.sha256, 1)
                        if not self.check_mempool(tx.sha256, outcome):
                            raise AssertionError("Test failed at test %d" % test_number)
                    else:
                        invqueue.append(CInv(1, tx.sha256))
                # Ensure we're not overflowing the inv queue
                if len(invqueue) == MAX_INV_SZ:
                    for c in self.connections:
                        c.send_message(msg_inv(invqueue))
                    invqueue = []

            # Do final sync if we weren't syncing on every block or every tx.
            if not test_instance.sync_every_block and block is not None:
                if invqueue:
                    for c in self.connections:
                        c.send_message(msg_inv(invqueue))
                    invqueue = []
                self.sync_blocks(block.sha256, len(test_instance.blocks_and_transactions))
                if not self.check_results(tip, block_outcome):
                    raise AssertionError("Block test failed at test %d" % test_number)
            if not test_instance.sync_every_tx and tx is not None:
                if invqueue:
                    for c in self.connections:
                        c.send_message(msg_inv(invqueue))
                    invqueue = []
                self.sync_transaction(tx.sha256, len(test_instance.blocks_and_transactions))
                if not self.check_mempool(tx.sha256, tx_outcome):
                    raise AssertionError("Mempool test failed at test %d" % test_number)

            print("Test %d: PASS" % test_number, [ c.rpc.getblockcount() for c in self.connections ])
            test_number += 1

        for c in self.connections:
            c.disconnect_node()
        self.wait_for_disconnections()
        self.block_store.close()
        self.tx_store.close()