#!/usr/bin/env python3
# Copyright (c) 2015-2019 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test the ZMQ notification interface."""
import struct

from test_framework.address import ADDRESS_BCRT1_UNSPENDABLE, ADDRESS_BCRT1_P2WSH_OP_TRUE
from test_framework.blocktools import create_block, create_coinbase, add_witness_commitment
from test_framework.test_framework import BitcoinTestFramework
from test_framework.messages import CTransaction, hash256, FromHex
from test_framework.util import (
    assert_equal,
    assert_raises_rpc_error,
)
from io import BytesIO
from time import sleep

# Test may be skipped and not have zmq installed
try:
    import zmq
except ImportError:
    pass


def hash256_reversed(byte_str):
    """Return hash256(byte_str) with the byte order reversed."""
    return hash256(byte_str)[::-1]


class ZMQSubscriber:
    """A zmq SUB socket subscribed to a single topic.

    Every received message is checked against the subscribed topic and
    against a locally tracked, incrementing per-subscriber sequence number.
    """

    def __init__(self, socket, topic):
        """Subscribe `socket` to `topic` (bytes) and start the sequence at 0."""
        self.sequence = 0
        self.socket = socket
        self.topic = topic

        self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)

    def _receive_from_publisher_and_check(self):
        """Receive one three-part message, validate it and return its body."""
        topic, body, seq = self.socket.recv_multipart()
        # Topic should match the subscriber topic.
        assert_equal(topic, self.topic)
        # Sequence should be incremental.
        assert_equal(struct.unpack('<I', seq)[-1], self.sequence)
        self.sequence += 1
        return body

    def receive(self):
        """Return the body of the next message on this topic."""
        return self._receive_from_publisher_and_check()

    def receive_sequence(self):
        """Receive and decode the next 'sequence' notification.

        Returns a tuple (hash, label, mempool_sequence) where hash is the hex
        of the first 32 body bytes, label is the single character that follows
        and mempool_sequence is the trailing little-endian uint64, or None
        when the body is not exactly 32+1+8 bytes long.
        """
        body = self._receive_from_publisher_and_check()
        hash_hex = body[:32].hex()
        label = chr(body[32])
        mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
        if mempool_sequence is not None:
            # Only mempool add/remove messages carry a mempool sequence.
            assert label in ("A", "R")
        else:
            # Block disconnect/connect messages carry none.
            assert label in ("D", "C")
        return (hash_hex, label, mempool_sequence)


class ZMQTest(BitcoinTestFramework):
    """Functional test of the node's ZMQ publishers."""

    def set_test_params(self):
        self.num_nodes = 2

    def skip_test_if_missing_module(self):
        self.skip_if_no_py3_zmq()
        self.skip_if_no_bitcoind_zmq()

    def run_test(self):
        """Run every sub-test with a shared ZMQ context, destroying it afterwards."""
        self.ctx = zmq.Context()
        try:
            self.test_basic()
            self.test_sequence()
            self.test_mempool_sync()
            self.test_reorg()
            self.test_multiple_interfaces()
        finally:
            # Destroy the ZMQ context.
            self.log.debug("Destroying ZMQ context")
            self.ctx.destroy(linger=None)

    def test_basic(self):
        """Check the hashblock, hashtx, rawblock and rawtx publishers."""

        # Invalid zmq arguments don't take down the node, see #17185.
        self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])

        address = 'tcp://127.0.0.1:28332'
        sockets = []
        subs = []
        services = [b"hashblock", b"hashtx", b"rawblock", b"rawtx"]
        for service in services:
            sockets.append(self.ctx.socket(zmq.SUB))
            sockets[-1].set(zmq.RCVTIMEO, 60000)
            subs.append(ZMQSubscriber(sockets[-1], service))

        # Subscribe to all available topics.
        hashblock, hashtx, rawblock, rawtx = subs

        self.restart_node(0, ["-zmqpub%s=%s" % (sub.topic.decode(), address) for sub in [hashblock, hashtx, rawblock, rawtx]])
        self.connect_nodes(0, 1)
        for socket in sockets:
            socket.connect(address)

        # Relax so that the subscriber is ready before publishing zmq messages
        sleep(0.2)

        num_blocks = 5
        self.log.info("Generate %(n)d blocks (and %(n)d coinbase txes)" % {"n": num_blocks})
        genhashes = self.nodes[0].generatetoaddress(num_blocks, ADDRESS_BCRT1_UNSPENDABLE)

        self.sync_all()

        for genhash in genhashes:
            # Should receive the coinbase txid.
            txid = hashtx.receive()

            # Should receive the coinbase raw transaction.
            tx_bytes = rawtx.receive()
            tx = CTransaction()
            tx.deserialize(BytesIO(tx_bytes))
            tx.calc_sha256()
            assert_equal(tx.hash, txid.hex())

            # Should receive the generated raw block.
            block = rawblock.receive()
            # The block hash is computed over the first 80 bytes of the raw block.
            assert_equal(genhash, hash256_reversed(block[:80]).hex())

            # Should receive the generated block hash.
            block_hash = hashblock.receive().hex()
            assert_equal(genhash, block_hash)
            # The block should only have the coinbase txid.
            assert_equal([txid.hex()], self.nodes[1].getblock(block_hash)["tx"])

        if self.is_wallet_compiled():
            self.log.info("Wait for tx from second node")
            payment_txid = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
            self.sync_all()

            # Should receive the broadcasted txid.
            txid = hashtx.receive()
            assert_equal(payment_txid, txid.hex())

            # Should receive the broadcasted raw transaction.
            tx_bytes = rawtx.receive()
            assert_equal(payment_txid, hash256_reversed(tx_bytes).hex())

            # Mining the block with this tx should result in second notification
            # after coinbase tx notification
            self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
            hashtx.receive()
            txid = hashtx.receive()
            assert_equal(payment_txid, txid.hex())

        self.log.info("Test the getzmqnotifications RPC")
        assert_equal(self.nodes[0].getzmqnotifications(), [
            {"type": "pubhashblock", "address": address, "hwm": 1000},
            {"type": "pubhashtx", "address": address, "hwm": 1000},
            {"type": "pubrawblock", "address": address, "hwm": 1000},
            {"type": "pubrawtx", "address": address, "hwm": 1000},
        ])

        assert_equal(self.nodes[1].getzmqnotifications(), [])

    def test_reorg(self):
        """Check hashblock/hashtx notifications across a reorg and an invalidateblock."""
        if not self.is_wallet_compiled():
            self.log.info("Skipping reorg test because wallet is disabled")
            return

        address = 'tcp://127.0.0.1:28333'

        services = [b"hashblock", b"hashtx"]
        sockets = []
        subs = []
        for service in services:
            sockets.append(self.ctx.socket(zmq.SUB))
            # 2 second timeout to check end of notifications
            sockets[-1].set(zmq.RCVTIMEO, 2000)
            subs.append(ZMQSubscriber(sockets[-1], service))

        # Subscribe to all available topics.
        hashblock, hashtx = subs

        # Should only notify the tip if a reorg occurs
        self.restart_node(0, ["-zmqpub%s=%s" % (sub.topic.decode(), address) for sub in [hashblock, hashtx]])
        for socket in sockets:
            socket.connect(address)
        # Relax so that the subscriber is ready before publishing zmq messages
        sleep(0.2)

        # Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications
        payment_txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
        disconnect_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0]
        disconnect_cb = self.nodes[0].getblock(disconnect_block)["tx"][0]
        assert_equal(self.nodes[0].getbestblockhash(), hashblock.receive().hex())
        assert_equal(hashtx.receive().hex(), payment_txid)
        assert_equal(hashtx.receive().hex(), disconnect_cb)

        # Generate 2 blocks in nodes[1] to a different address to ensure split
        connect_blocks = self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_P2WSH_OP_TRUE)

        # nodes[0] will reorg chain after connecting back nodes[1]
        self.connect_nodes(0, 1)
        self.sync_blocks()  # tx in mempool valid but not advertised

        # Should receive nodes[1] tip
        assert_equal(self.nodes[1].getbestblockhash(), hashblock.receive().hex())

        # During reorg:
        # Get old payment transaction notification from disconnect and disconnected cb
        assert_equal(hashtx.receive().hex(), payment_txid)
        assert_equal(hashtx.receive().hex(), disconnect_cb)
        # And the payment transaction again due to mempool entry
        assert_equal(hashtx.receive().hex(), payment_txid)
        assert_equal(hashtx.receive().hex(), payment_txid)
        # And the new connected coinbases
        for connect_block in connect_blocks:
            assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_block)["tx"][0])

        # If we do a simple invalidate we announce the disconnected coinbase
        self.nodes[0].invalidateblock(connect_blocks[1])
        assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[1])["tx"][0])
        # And the current tip
        assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[0])["tx"][0])

    def test_sequence(self):
        """
        Sequence zmq notifications give every blockhash and txhash in order
        of processing, regardless of IBD, re-orgs, etc.
        Format of messages:
        <32-byte hash>C :                 Blockhash connected
        <32-byte hash>D :                 Blockhash disconnected
        <32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason
        <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool
        """
        self.log.info("Testing 'sequence' publisher")
        address = 'tcp://127.0.0.1:28333'
        socket = self.ctx.socket(zmq.SUB)
        socket.set(zmq.RCVTIMEO, 60000)
        seq = ZMQSubscriber(socket, b'sequence')

        self.restart_node(0, ['-zmqpub%s=%s' % (seq.topic.decode(), address)])
        socket.connect(address)
        # Relax so that the subscriber is ready before publishing zmq messages
        sleep(0.2)

        # Mempool sequence number starts at 1
        seq_num = 1

        # Generate 1 block in nodes[0] and receive all notifications
        dc_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0]

        # Note: We are not notified of any block transactions, coinbase or mined
        assert_equal((self.nodes[0].getbestblockhash(), "C", None), seq.receive_sequence())

        # Generate 2 blocks in nodes[1] to a different address to ensure a chain split
        self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_P2WSH_OP_TRUE)

        # nodes[0] will reorg chain after connecting back nodes[1]
        self.connect_nodes(0, 1)

        # Then we receive all block (dis)connect notifications for the 2 block reorg
        assert_equal((dc_block, "D", None), seq.receive_sequence())
        block_count = self.nodes[1].getblockcount()
        assert_equal((self.nodes[1].getblockhash(block_count-1), "C", None), seq.receive_sequence())
        assert_equal((self.nodes[1].getblockhash(block_count), "C", None), seq.receive_sequence())

        # Rest of test requires wallet functionality
        if not self.is_wallet_compiled():
            return

        self.log.info("Wait for tx from second node")
        payment_txid = self.nodes[1].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=5.0, replaceable=True)
        self.sync_all()
        self.log.info("Testing sequence notifications with mempool sequence values")

        # Should receive the broadcasted txid.
        assert_equal((payment_txid, "A", seq_num), seq.receive_sequence())
        seq_num += 1

        self.log.info("Testing RBF notification")
        # Replace it to test eviction/addition notification
        rbf_info = self.nodes[1].bumpfee(payment_txid)
        self.sync_all()
        assert_equal((payment_txid, "R", seq_num), seq.receive_sequence())
        seq_num += 1
        assert_equal((rbf_info["txid"], "A", seq_num), seq.receive_sequence())
        seq_num += 1

        # Doesn't get published when mined, make a block and tx to "flush" the possibility
        # though the mempool sequence number does go up by the number of transactions
        # removed from the mempool by the block mining it.
        mempool_size = len(self.nodes[0].getrawmempool())
        c_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0]
        self.sync_all()
        # Make sure the number of mined transactions matches the number of txs out of mempool
        mempool_size_delta = mempool_size - len(self.nodes[0].getrawmempool())
        assert_equal(len(self.nodes[0].getblock(c_block)["tx"])-1, mempool_size_delta)
        seq_num += mempool_size_delta
        payment_txid_2 = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
        self.sync_all()
        assert_equal((c_block, "C", None), seq.receive_sequence())
        assert_equal((payment_txid_2, "A", seq_num), seq.receive_sequence())
        seq_num += 1

        # Spot check getrawmempool results that they only show up when asked for
        assert isinstance(self.nodes[0].getrawmempool(), list)
        assert isinstance(self.nodes[0].getrawmempool(mempool_sequence=False), list)
        assert "mempool_sequence" not in self.nodes[0].getrawmempool(verbose=True)
        assert_raises_rpc_error(-8, "Verbose results cannot contain mempool sequence values.", self.nodes[0].getrawmempool, True, True)
        assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], seq_num)

        self.log.info("Testing reorg notifications")
        # Manually invalidate the last block to test mempool re-entry
        # N.B. This part could be made more lenient in exact ordering
        # since it greatly depends on inner-workings of blocks/mempool
        # during "deep" re-orgs. Probably should "re-construct"
        # blockchain/mempool state from notifications instead.
        block_count = self.nodes[0].getblockcount()
        best_hash = self.nodes[0].getbestblockhash()
        self.nodes[0].invalidateblock(best_hash)
        sleep(2)  # Bit of room to make sure transaction things happened

        # Make sure getrawmempool mempool_sequence results aren't "queued" but immediately reflective
        # of the time they were gathered.
        assert self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] > seq_num

        assert_equal((best_hash, "D", None), seq.receive_sequence())
        assert_equal((rbf_info["txid"], "A", seq_num), seq.receive_sequence())
        seq_num += 1

        # Other things may happen but aren't wallet-deterministic so we don't test for them currently
        self.nodes[0].reconsiderblock(best_hash)
        self.nodes[1].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
        self.sync_all()

        self.log.info("Evict mempool transaction by block conflict")
        orig_txid = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0, replaceable=True)

        # More to be simply mined
        more_tx = []
        for _ in range(5):
            more_tx.append(self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 0.1))

        raw_tx = self.nodes[0].getrawtransaction(orig_txid)
        bump_info = self.nodes[0].bumpfee(orig_txid)
        # Mine the pre-bump tx
        block = create_block(int(self.nodes[0].getbestblockhash(), 16), create_coinbase(self.nodes[0].getblockcount()+1))
        tx = FromHex(CTransaction(), raw_tx)
        block.vtx.append(tx)
        for txid in more_tx:
            tx = FromHex(CTransaction(), self.nodes[0].getrawtransaction(txid))
            block.vtx.append(tx)
        add_witness_commitment(block)
        block.solve()
        assert_equal(self.nodes[0].submitblock(block.serialize().hex()), None)
        tip = self.nodes[0].getbestblockhash()
        assert_equal(int(tip, 16), block.sha256)
        orig_txid_2 = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0, replaceable=True)

        # Flush old notifications until evicted tx original entry
        (hash_str, label, mempool_seq) = seq.receive_sequence()
        while hash_str != orig_txid:
            (hash_str, label, mempool_seq) = seq.receive_sequence()
        mempool_seq += 1

        # Added original tx
        assert_equal(label, "A")
        # More transactions to be simply mined
        for more_txid in more_tx:
            assert_equal((more_txid, "A", mempool_seq), seq.receive_sequence())
            mempool_seq += 1
        # Bumped by rbf
        assert_equal((orig_txid, "R", mempool_seq), seq.receive_sequence())
        mempool_seq += 1
        assert_equal((bump_info["txid"], "A", mempool_seq), seq.receive_sequence())
        mempool_seq += 1
        # Conflict announced first, then block
        assert_equal((bump_info["txid"], "R", mempool_seq), seq.receive_sequence())
        mempool_seq += 1
        assert_equal((tip, "C", None), seq.receive_sequence())
        mempool_seq += len(more_tx)
        # Last tx
        assert_equal((orig_txid_2, "A", mempool_seq), seq.receive_sequence())
        mempool_seq += 1
        self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
        self.sync_all()  # want to make sure we didn't break "consensus" for other tests

    def test_mempool_sync(self):
        """
        Use sequence notification plus getrawmempool sequence results to "sync mempool"
        """
        if not self.is_wallet_compiled():
            self.log.info("Skipping mempool sync test")
            return

        self.log.info("Testing 'mempool sync' usage of sequence notifier")
        address = 'tcp://127.0.0.1:28333'
        socket = self.ctx.socket(zmq.SUB)
        socket.set(zmq.RCVTIMEO, 60000)
        seq = ZMQSubscriber(socket, b'sequence')

        self.restart_node(0, ['-zmqpub%s=%s' % (seq.topic.decode(), address)])
        self.connect_nodes(0, 1)
        socket.connect(address)
        # Relax so that the subscriber is ready before publishing zmq messages
        sleep(0.2)

        # In-memory counter, should always start at 1
        next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"]
        assert_equal(next_mempool_seq, 1)

        # Some transactions have been happening but we aren't consuming zmq notifications yet
        # or we lost a ZMQ message somehow and want to start over
        txids = []
        num_txs = 5
        for _ in range(num_txs):
            txids.append(self.nodes[1].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0, replaceable=True))
        self.sync_all()

        # 1) Consume backlog until we get a mempool sequence number
        (hash_str, label, zmq_mem_seq) = seq.receive_sequence()
        while zmq_mem_seq is None:
            (hash_str, label, zmq_mem_seq) = seq.receive_sequence()

        assert label in ("A", "R")
        assert hash_str is not None

        # 2) We need to "seed" our view of the mempool
        mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
        mempool_view = set(mempool_snapshot["txids"])
        get_raw_seq = mempool_snapshot["mempool_sequence"]
        assert_equal(get_raw_seq, 6)
        # Snapshot may be too old compared to zmq message we read off latest
        while zmq_mem_seq >= get_raw_seq:
            sleep(2)
            mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
            mempool_view = set(mempool_snapshot["txids"])
            get_raw_seq = mempool_snapshot["mempool_sequence"]

        # Things continue to happen in the "interim" while waiting for snapshot results
        # We have node 0 do all these to avoid p2p races with RBF announcements
        for _ in range(num_txs):
            txids.append(self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=0.1, replaceable=True))
        self.nodes[0].bumpfee(txids[-1])
        self.sync_all()
        self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
        final_txid = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=0.1, replaceable=True)

        # 3) Consume ZMQ backlog until we get to "now" for the mempool snapshot
        while zmq_mem_seq != get_raw_seq - 1:
            (hash_str, label, mempool_sequence) = seq.receive_sequence()
            if mempool_sequence is not None:
                zmq_mem_seq = mempool_sequence
                if zmq_mem_seq > get_raw_seq:
                    raise Exception("We somehow jumped mempool sequence numbers! zmq_mem_seq: {} > get_raw_seq: {}".format(zmq_mem_seq, get_raw_seq))

        # 4) Moving forward, we apply the delta to our local view
        #    remaining txs(5) + 1 rbf(A+R) + 1 block connect + 1 final tx
        expected_sequence = get_raw_seq
        r_gap = 0
        for _ in range(num_txs + 2 + 1 + 1):
            (hash_str, label, mempool_sequence) = seq.receive_sequence()
            if mempool_sequence is not None:
                if mempool_sequence != expected_sequence:
                    # Detected "R" gap, means this a conflict eviction, and mempool tx are being evicted before its
                    # position in the incoming block message "C"
                    if label == "R":
                        assert mempool_sequence > expected_sequence
                        r_gap += mempool_sequence - expected_sequence
                    else:
                        raise Exception("WARNING: txhash has unexpected mempool sequence value: {} vs expected {}".format(mempool_sequence, expected_sequence))
            if label == "A":
                assert hash_str not in mempool_view
                mempool_view.add(hash_str)
                expected_sequence = mempool_sequence + 1
            elif label == "R":
                assert hash_str in mempool_view
                mempool_view.remove(hash_str)
                expected_sequence = mempool_sequence + 1
            elif label == "C":
                # (Attempt to) remove all txids from known block connects
                block_txids = self.nodes[0].getblock(hash_str)["tx"][1:]
                for txid in block_txids:
                    if txid in mempool_view:
                        expected_sequence += 1
                        mempool_view.remove(txid)
                expected_sequence -= r_gap
                r_gap = 0
            elif label == "D":
                # Not useful for mempool tracking per se
                continue
            else:
                raise Exception("Unexpected ZMQ sequence label!")

        assert_equal(self.nodes[0].getrawmempool(), [final_txid])
        assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], expected_sequence)

        # 5) If you miss a zmq/mempool sequence number, go back to step (2)

        self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)

    def test_multiple_interfaces(self):
        """Check that one topic published on two addresses reaches both subscribers."""
        # Set up two subscribers with different addresses
        subscribers = []
        for i in range(2):
            address = 'tcp://127.0.0.1:%d' % (28334 + i)
            socket = self.ctx.socket(zmq.SUB)
            socket.set(zmq.RCVTIMEO, 60000)
            hashblock = ZMQSubscriber(socket, b"hashblock")
            socket.connect(address)
            subscribers.append({'address': address, 'hashblock': hashblock})

        self.restart_node(0, ['-zmqpub%s=%s' % (subscriber['hashblock'].topic.decode(), subscriber['address']) for subscriber in subscribers])

        # Relax so that the subscriber is ready before publishing zmq messages
        sleep(0.2)

        # Generate 1 block in nodes[0] and receive all notifications
        self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)

        # Should receive the same block hash on both subscribers
        assert_equal(self.nodes[0].getbestblockhash(), subscribers[0]['hashblock'].receive().hex())
        assert_equal(self.nodes[0].getbestblockhash(), subscribers[1]['hashblock'].receive().hex())


if __name__ == '__main__':
    ZMQTest().main()