1#!/usr/bin/env python3 2# Copyright (c) 2015-2018 The Bitcoin Core developers 3# Distributed under the MIT software license, see the accompanying 4# file COPYING or http://www.opensource.org/licenses/mit-license.php. 5"""Test the ZMQ notification interface.""" 6import struct 7 8from test_framework.address import ADDRESS_BCRT1_UNSPENDABLE 9from test_framework.test_framework import BitcoinTestFramework 10from test_framework.messages import CTransaction 11from test_framework.util import ( 12 assert_equal, 13 bytes_to_hex_str, 14 hash256, 15) 16from io import BytesIO 17 18ADDRESS = "tcp://127.0.0.1:28332" 19 20class ZMQSubscriber: 21 def __init__(self, socket, topic): 22 self.sequence = 0 23 self.socket = socket 24 self.topic = topic 25 26 import zmq 27 self.socket.setsockopt(zmq.SUBSCRIBE, self.topic) 28 29 def receive(self): 30 topic, body, seq = self.socket.recv_multipart() 31 # Topic should match the subscriber topic. 32 assert_equal(topic, self.topic) 33 # Sequence should be incremental. 34 assert_equal(struct.unpack('<I', seq)[-1], self.sequence) 35 self.sequence += 1 36 return body 37 38 39class ZMQTest (BitcoinTestFramework): 40 def set_test_params(self): 41 self.num_nodes = 2 42 43 def skip_test_if_missing_module(self): 44 self.skip_if_no_py3_zmq() 45 self.skip_if_no_bitcoind_zmq() 46 47 def setup_nodes(self): 48 import zmq 49 50 # Initialize ZMQ context and socket. 51 # All messages are received in the same socket which means 52 # that this test fails if the publishing order changes. 53 # Note that the publishing order is not defined in the documentation and 54 # is subject to change. 55 self.zmq_context = zmq.Context() 56 socket = self.zmq_context.socket(zmq.SUB) 57 socket.set(zmq.RCVTIMEO, 60000) 58 socket.connect(ADDRESS) 59 60 # Subscribe to all available topics. 61 self.hashblock = ZMQSubscriber(socket, b"hashblock") 62 self.hashtx = ZMQSubscriber(socket, b"hashtx") 63 self.rawblock = ZMQSubscriber(socket, b"rawblock") 64 self.rawtx = ZMQSubscriber(socket, b"rawtx") 65 66 self.extra_args = [ 67 ["-zmqpub%s=%s" % (sub.topic.decode(), ADDRESS) for sub in [self.hashblock, self.hashtx, self.rawblock, self.rawtx]], 68 [], 69 ] 70 self.add_nodes(self.num_nodes, self.extra_args) 71 self.start_nodes() 72 self.import_deterministic_coinbase_privkeys() 73 74 def run_test(self): 75 try: 76 self._zmq_test() 77 finally: 78 # Destroy the ZMQ context. 79 self.log.debug("Destroying ZMQ context") 80 self.zmq_context.destroy(linger=None) 81 82 def _zmq_test(self): 83 num_blocks = 5 84 self.log.info("Generate %(n)d blocks (and %(n)d coinbase txes)" % {"n": num_blocks}) 85 genhashes = self.nodes[0].generatetoaddress(num_blocks, ADDRESS_BCRT1_UNSPENDABLE) 86 self.sync_all() 87 88 for x in range(num_blocks): 89 # Should receive the coinbase txid. 90 txid = self.hashtx.receive() 91 92 # Should receive the coinbase raw transaction. 93 hex = self.rawtx.receive() 94 tx = CTransaction() 95 tx.deserialize(BytesIO(hex)) 96 tx.calc_sha256() 97 assert_equal(tx.hash, bytes_to_hex_str(txid)) 98 99 # Should receive the generated block hash. 100 hash = bytes_to_hex_str(self.hashblock.receive()) 101 assert_equal(genhashes[x], hash) 102 # The block should only have the coinbase txid. 103 assert_equal([bytes_to_hex_str(txid)], self.nodes[1].getblock(hash)["tx"]) 104 105 # Should receive the generated raw block. 106 block = self.rawblock.receive() 107 assert_equal(genhashes[x], bytes_to_hex_str(hash256(block[:80]))) 108 109 if self.is_wallet_compiled(): 110 self.log.info("Wait for tx from second node") 111 payment_txid = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1.0) 112 self.sync_all() 113 114 # Should receive the broadcasted txid. 115 txid = self.hashtx.receive() 116 assert_equal(payment_txid, bytes_to_hex_str(txid)) 117 118 # Should receive the broadcasted raw transaction. 119 hex = self.rawtx.receive() 120 assert_equal(payment_txid, bytes_to_hex_str(hash256(hex))) 121 122 123 self.log.info("Test the getzmqnotifications RPC") 124 assert_equal(self.nodes[0].getzmqnotifications(), [ 125 {"type": "pubhashblock", "address": ADDRESS, "hwm": 1000}, 126 {"type": "pubhashtx", "address": ADDRESS, "hwm": 1000}, 127 {"type": "pubrawblock", "address": ADDRESS, "hwm": 1000}, 128 {"type": "pubrawtx", "address": ADDRESS, "hwm": 1000}, 129 ]) 130 131 assert_equal(self.nodes[1].getzmqnotifications(), []) 132 133if __name__ == '__main__': 134 ZMQTest().main() 135