1#!/usr/bin/env python3
2# Copyright (c) 2015-2019 The Bitcoin Core developers
3# Distributed under the MIT software license, see the accompanying
4# file COPYING or http://www.opensource.org/licenses/mit-license.php.
5"""Test the ZMQ notification interface."""
6import struct
7
8from test_framework.address import ADDRESS_BCRT1_UNSPENDABLE, ADDRESS_BCRT1_P2WSH_OP_TRUE
9from test_framework.blocktools import create_block, create_coinbase, add_witness_commitment
10from test_framework.test_framework import BitcoinTestFramework
11from test_framework.messages import CTransaction, hash256, FromHex
12from test_framework.util import (
13    assert_equal,
14    assert_raises_rpc_error,
15)
16from io import BytesIO
17from time import sleep
18
19# Test may be skipped and not have zmq installed
20try:
21    import zmq
22except ImportError:
23    pass
24
def hash256_reversed(byte_str):
    """Return the hash256 digest of byte_str with its bytes in reverse order."""
    digest = hash256(byte_str)
    return digest[::-1]
27
class ZMQSubscriber:
    """Receive notifications for one topic from a ZMQ socket.

    Every received message is checked against the subscribed topic and
    against a locally tracked counter that starts at 0 and grows by one
    per message.
    """

    def __init__(self, socket, topic):
        """Subscribe socket to topic (bytes) and reset the expected message counter."""
        self.sequence = 0  # sequence number expected on the next message
        self.socket = socket
        self.topic = topic

        self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)

    def _receive_body(self):
        """Receive one three-part message, validate it and return its body part.

        Shared by receive() and receive_sequence() so the topic and
        sequence checks live in a single place.
        """
        topic, body, seq = self.socket.recv_multipart()
        # Topic should match the subscriber topic.
        assert_equal(topic, self.topic)
        # Sequence should be incremental; it is unpacked as a little-endian unsigned int.
        assert_equal(struct.unpack('<I', seq)[-1], self.sequence)
        self.sequence += 1
        return body

    def receive(self):
        """Return the raw body (bytes) of the next message on this topic."""
        return self._receive_body()

    def receive_sequence(self):
        """Return (hash_hex, label, mempool_sequence) decoded from the next message body.

        The first 32 bytes of the body are returned hex-encoded and byte 32
        is the one-character label. When the body is exactly 32+1+8 bytes
        long, the trailing 8 bytes are unpacked as a little-endian unsigned
        integer and the label must be "A" or "R"; otherwise
        mempool_sequence is None and the label must be "D" or "C".
        """
        body = self._receive_body()
        notified_hash = body[:32].hex()
        label = chr(body[32])
        if len(body) == 32 + 1 + 8:
            mempool_sequence = struct.unpack("<Q", body[32 + 1:])[0]
            assert label in ("A", "R")
        else:
            mempool_sequence = None
            assert label in ("D", "C")
        return (notified_hash, label, mempool_sequence)
60
61
62class ZMQTest (BitcoinTestFramework):
63    def set_test_params(self):
64        self.num_nodes = 2
65
66    def skip_test_if_missing_module(self):
67        self.skip_if_no_py3_zmq()
68        self.skip_if_no_bitcoind_zmq()
69
70    def run_test(self):
71        self.ctx = zmq.Context()
72        try:
73            self.test_basic()
74            self.test_sequence()
75            self.test_mempool_sync()
76            self.test_reorg()
77            self.test_multiple_interfaces()
78        finally:
79            # Destroy the ZMQ context.
80            self.log.debug("Destroying ZMQ context")
81            self.ctx.destroy(linger=None)
82
83    def test_basic(self):
84
85        # Invalid zmq arguments don't take down the node, see #17185.
86        self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])
87
88        address = 'tcp://127.0.0.1:28332'
89        sockets = []
90        subs = []
91        services = [b"hashblock", b"hashtx", b"rawblock", b"rawtx"]
92        for service in services:
93            sockets.append(self.ctx.socket(zmq.SUB))
94            sockets[-1].set(zmq.RCVTIMEO, 60000)
95            subs.append(ZMQSubscriber(sockets[-1], service))
96
97        # Subscribe to all available topics.
98        hashblock = subs[0]
99        hashtx = subs[1]
100        rawblock = subs[2]
101        rawtx = subs[3]
102
103        self.restart_node(0, ["-zmqpub%s=%s" % (sub.topic.decode(), address) for sub in [hashblock, hashtx, rawblock, rawtx]])
104        self.connect_nodes(0, 1)
105        for socket in sockets:
106            socket.connect(address)
107
108        # Relax so that the subscriber is ready before publishing zmq messages
109        sleep(0.2)
110
111        num_blocks = 5
112        self.log.info("Generate %(n)d blocks (and %(n)d coinbase txes)" % {"n": num_blocks})
113        genhashes = self.nodes[0].generatetoaddress(num_blocks, ADDRESS_BCRT1_UNSPENDABLE)
114
115        self.sync_all()
116
117        for x in range(num_blocks):
118            # Should receive the coinbase txid.
119            txid = hashtx.receive()
120
121            # Should receive the coinbase raw transaction.
122            hex = rawtx.receive()
123            tx = CTransaction()
124            tx.deserialize(BytesIO(hex))
125            tx.calc_sha256()
126            assert_equal(tx.hash, txid.hex())
127
128            # Should receive the generated raw block.
129            block = rawblock.receive()
130            assert_equal(genhashes[x], hash256_reversed(block[:80]).hex())
131
132            # Should receive the generated block hash.
133            hash = hashblock.receive().hex()
134            assert_equal(genhashes[x], hash)
135            # The block should only have the coinbase txid.
136            assert_equal([txid.hex()], self.nodes[1].getblock(hash)["tx"])
137
138
139        if self.is_wallet_compiled():
140            self.log.info("Wait for tx from second node")
141            payment_txid = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
142            self.sync_all()
143
144            # Should receive the broadcasted txid.
145            txid = hashtx.receive()
146            assert_equal(payment_txid, txid.hex())
147
148            # Should receive the broadcasted raw transaction.
149            hex = rawtx.receive()
150            assert_equal(payment_txid, hash256_reversed(hex).hex())
151
152            # Mining the block with this tx should result in second notification
153            # after coinbase tx notification
154            self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
155            hashtx.receive()
156            txid = hashtx.receive()
157            assert_equal(payment_txid, txid.hex())
158
159
160        self.log.info("Test the getzmqnotifications RPC")
161        assert_equal(self.nodes[0].getzmqnotifications(), [
162            {"type": "pubhashblock", "address": address, "hwm": 1000},
163            {"type": "pubhashtx", "address": address, "hwm": 1000},
164            {"type": "pubrawblock", "address": address, "hwm": 1000},
165            {"type": "pubrawtx", "address": address, "hwm": 1000},
166        ])
167
168        assert_equal(self.nodes[1].getzmqnotifications(), [])
169
170    def test_reorg(self):
171        if not self.is_wallet_compiled():
172            self.log.info("Skipping reorg test because wallet is disabled")
173            return
174
175        address = 'tcp://127.0.0.1:28333'
176
177        services = [b"hashblock", b"hashtx"]
178        sockets = []
179        subs = []
180        for service in services:
181            sockets.append(self.ctx.socket(zmq.SUB))
182            # 2 second timeout to check end of notifications
183            sockets[-1].set(zmq.RCVTIMEO, 2000)
184            subs.append(ZMQSubscriber(sockets[-1], service))
185
186        # Subscribe to all available topics.
187        hashblock = subs[0]
188        hashtx = subs[1]
189
190        # Should only notify the tip if a reorg occurs
191        self.restart_node(0, ["-zmqpub%s=%s" % (sub.topic.decode(), address) for sub in [hashblock, hashtx]])
192        for socket in sockets:
193            socket.connect(address)
194        # Relax so that the subscriber is ready before publishing zmq messages
195        sleep(0.2)
196
197        # Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications
198        payment_txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
199        disconnect_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0]
200        disconnect_cb = self.nodes[0].getblock(disconnect_block)["tx"][0]
201        assert_equal(self.nodes[0].getbestblockhash(), hashblock.receive().hex())
202        assert_equal(hashtx.receive().hex(), payment_txid)
203        assert_equal(hashtx.receive().hex(), disconnect_cb)
204
205        # Generate 2 blocks in nodes[1] to a different address to ensure split
206        connect_blocks = self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_P2WSH_OP_TRUE)
207
208        # nodes[0] will reorg chain after connecting back nodes[1]
209        self.connect_nodes(0, 1)
210        self.sync_blocks() # tx in mempool valid but not advertised
211
212        # Should receive nodes[1] tip
213        assert_equal(self.nodes[1].getbestblockhash(), hashblock.receive().hex())
214
215        # During reorg:
216        # Get old payment transaction notification from disconnect and disconnected cb
217        assert_equal(hashtx.receive().hex(), payment_txid)
218        assert_equal(hashtx.receive().hex(), disconnect_cb)
219        # And the payment transaction again due to mempool entry
220        assert_equal(hashtx.receive().hex(), payment_txid)
221        assert_equal(hashtx.receive().hex(), payment_txid)
222        # And the new connected coinbases
223        for i in [0, 1]:
224            assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[i])["tx"][0])
225
226        # If we do a simple invalidate we announce the disconnected coinbase
227        self.nodes[0].invalidateblock(connect_blocks[1])
228        assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[1])["tx"][0])
229        # And the current tip
230        assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[0])["tx"][0])
231
232    def test_sequence(self):
233        """
234        Sequence zmq notifications give every blockhash and txhash in order
235        of processing, regardless of IBD, re-orgs, etc.
236        Format of messages:
237        <32-byte hash>C :                 Blockhash connected
238        <32-byte hash>D :                 Blockhash disconnected
239        <32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason
240        <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool
241        """
242        self.log.info("Testing 'sequence' publisher")
243        address = 'tcp://127.0.0.1:28333'
244        socket = self.ctx.socket(zmq.SUB)
245        socket.set(zmq.RCVTIMEO, 60000)
246        seq = ZMQSubscriber(socket, b'sequence')
247
248        self.restart_node(0, ['-zmqpub%s=%s' % (seq.topic.decode(), address)])
249        socket.connect(address)
250        # Relax so that the subscriber is ready before publishing zmq messages
251        sleep(0.2)
252
253        # Mempool sequence number starts at 1
254        seq_num = 1
255
256        # Generate 1 block in nodes[0] and receive all notifications
257        dc_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0]
258
259        # Note: We are not notified of any block transactions, coinbase or mined
260        assert_equal((self.nodes[0].getbestblockhash(), "C", None), seq.receive_sequence())
261
262        # Generate 2 blocks in nodes[1] to a different address to ensure a chain split
263        self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_P2WSH_OP_TRUE)
264
265        # nodes[0] will reorg chain after connecting back nodes[1]
266        self.connect_nodes(0, 1)
267
268        # Then we receive all block (dis)connect notifications for the 2 block reorg
269        assert_equal((dc_block, "D", None), seq.receive_sequence())
270        block_count = self.nodes[1].getblockcount()
271        assert_equal((self.nodes[1].getblockhash(block_count-1), "C", None), seq.receive_sequence())
272        assert_equal((self.nodes[1].getblockhash(block_count), "C", None), seq.receive_sequence())
273
274        # Rest of test requires wallet functionality
275        if self.is_wallet_compiled():
276            self.log.info("Wait for tx from second node")
277            payment_txid = self.nodes[1].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=5.0, replaceable=True)
278            self.sync_all()
279            self.log.info("Testing sequence notifications with mempool sequence values")
280
281            # Should receive the broadcasted txid.
282            assert_equal((payment_txid, "A", seq_num), seq.receive_sequence())
283            seq_num += 1
284
285            self.log.info("Testing RBF notification")
286            # Replace it to test eviction/addition notification
287            rbf_info = self.nodes[1].bumpfee(payment_txid)
288            self.sync_all()
289            assert_equal((payment_txid, "R", seq_num), seq.receive_sequence())
290            seq_num += 1
291            assert_equal((rbf_info["txid"], "A", seq_num), seq.receive_sequence())
292            seq_num += 1
293
294            # Doesn't get published when mined, make a block and tx to "flush" the possibility
295            # though the mempool sequence number does go up by the number of transactions
296            # removed from the mempool by the block mining it.
297            mempool_size = len(self.nodes[0].getrawmempool())
298            c_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0]
299            self.sync_all()
300            # Make sure the number of mined transactions matches the number of txs out of mempool
301            mempool_size_delta = mempool_size - len(self.nodes[0].getrawmempool())
302            assert_equal(len(self.nodes[0].getblock(c_block)["tx"])-1, mempool_size_delta)
303            seq_num += mempool_size_delta
304            payment_txid_2 = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
305            self.sync_all()
306            assert_equal((c_block, "C", None), seq.receive_sequence())
307            assert_equal((payment_txid_2, "A", seq_num), seq.receive_sequence())
308            seq_num += 1
309
310            # Spot check getrawmempool results that they only show up when asked for
311            assert type(self.nodes[0].getrawmempool()) is list
312            assert type(self.nodes[0].getrawmempool(mempool_sequence=False)) is list
313            assert "mempool_sequence" not in self.nodes[0].getrawmempool(verbose=True)
314            assert_raises_rpc_error(-8, "Verbose results cannot contain mempool sequence values.", self.nodes[0].getrawmempool, True, True)
315            assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], seq_num)
316
317            self.log.info("Testing reorg notifications")
318            # Manually invalidate the last block to test mempool re-entry
319            # N.B. This part could be made more lenient in exact ordering
320            # since it greatly depends on inner-workings of blocks/mempool
321            # during "deep" re-orgs. Probably should "re-construct"
322            # blockchain/mempool state from notifications instead.
323            block_count = self.nodes[0].getblockcount()
324            best_hash = self.nodes[0].getbestblockhash()
325            self.nodes[0].invalidateblock(best_hash)
326            sleep(2) # Bit of room to make sure transaction things happened
327
328            # Make sure getrawmempool mempool_sequence results aren't "queued" but immediately reflective
329            # of the time they were gathered.
330            assert self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] > seq_num
331
332            assert_equal((best_hash, "D", None), seq.receive_sequence())
333            assert_equal((rbf_info["txid"], "A", seq_num), seq.receive_sequence())
334            seq_num += 1
335
336            # Other things may happen but aren't wallet-deterministic so we don't test for them currently
337            self.nodes[0].reconsiderblock(best_hash)
338            self.nodes[1].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
339            self.sync_all()
340
341            self.log.info("Evict mempool transaction by block conflict")
342            orig_txid = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0, replaceable=True)
343
344            # More to be simply mined
345            more_tx = []
346            for _ in range(5):
347                more_tx.append(self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 0.1))
348
349            raw_tx = self.nodes[0].getrawtransaction(orig_txid)
350            bump_info = self.nodes[0].bumpfee(orig_txid)
351            # Mine the pre-bump tx
352            block = create_block(int(self.nodes[0].getbestblockhash(), 16), create_coinbase(self.nodes[0].getblockcount()+1))
353            tx = FromHex(CTransaction(), raw_tx)
354            block.vtx.append(tx)
355            for txid in more_tx:
356                tx = FromHex(CTransaction(), self.nodes[0].getrawtransaction(txid))
357                block.vtx.append(tx)
358            add_witness_commitment(block)
359            block.solve()
360            assert_equal(self.nodes[0].submitblock(block.serialize().hex()), None)
361            tip = self.nodes[0].getbestblockhash()
362            assert_equal(int(tip, 16), block.sha256)
363            orig_txid_2 = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0, replaceable=True)
364
365            # Flush old notifications until evicted tx original entry
366            (hash_str, label, mempool_seq) = seq.receive_sequence()
367            while hash_str != orig_txid:
368                (hash_str, label, mempool_seq) = seq.receive_sequence()
369            mempool_seq += 1
370
371            # Added original tx
372            assert_equal(label, "A")
373            # More transactions to be simply mined
374            for i in range(len(more_tx)):
375                    assert_equal((more_tx[i], "A", mempool_seq), seq.receive_sequence())
376                    mempool_seq += 1
377            # Bumped by rbf
378            assert_equal((orig_txid, "R", mempool_seq), seq.receive_sequence())
379            mempool_seq += 1
380            assert_equal((bump_info["txid"], "A", mempool_seq), seq.receive_sequence())
381            mempool_seq += 1
382            # Conflict announced first, then block
383            assert_equal((bump_info["txid"], "R", mempool_seq), seq.receive_sequence())
384            mempool_seq += 1
385            assert_equal((tip, "C", None), seq.receive_sequence())
386            mempool_seq += len(more_tx)
387            # Last tx
388            assert_equal((orig_txid_2, "A", mempool_seq), seq.receive_sequence())
389            mempool_seq += 1
390            self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
391            self.sync_all() # want to make sure we didn't break "consensus" for other tests
392
393    def test_mempool_sync(self):
394        """
395        Use sequence notification plus getrawmempool sequence results to "sync mempool"
396        """
397        if not self.is_wallet_compiled():
398            self.log.info("Skipping mempool sync test")
399            return
400
401        self.log.info("Testing 'mempool sync' usage of sequence notifier")
402        address = 'tcp://127.0.0.1:28333'
403        socket = self.ctx.socket(zmq.SUB)
404        socket.set(zmq.RCVTIMEO, 60000)
405        seq = ZMQSubscriber(socket, b'sequence')
406
407        self.restart_node(0, ['-zmqpub%s=%s' % (seq.topic.decode(), address)])
408        self.connect_nodes(0, 1)
409        socket.connect(address)
410        # Relax so that the subscriber is ready before publishing zmq messages
411        sleep(0.2)
412
413        # In-memory counter, should always start at 1
414        next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"]
415        assert_equal(next_mempool_seq, 1)
416
417        # Some transactions have been happening but we aren't consuming zmq notifications yet
418        # or we lost a ZMQ message somehow and want to start over
419        txids = []
420        num_txs = 5
421        for _ in range(num_txs):
422            txids.append(self.nodes[1].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0, replaceable=True))
423        self.sync_all()
424
425        # 1) Consume backlog until we get a mempool sequence number
426        (hash_str, label, zmq_mem_seq) = seq.receive_sequence()
427        while zmq_mem_seq is None:
428                (hash_str, label, zmq_mem_seq) = seq.receive_sequence()
429
430        assert label == "A" or label == "R"
431        assert hash_str is not None
432
433        # 2) We need to "seed" our view of the mempool
434        mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
435        mempool_view = set(mempool_snapshot["txids"])
436        get_raw_seq = mempool_snapshot["mempool_sequence"]
437        assert_equal(get_raw_seq, 6)
438        # Snapshot may be too old compared to zmq message we read off latest
439        while zmq_mem_seq >= get_raw_seq:
440            sleep(2)
441            mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
442            mempool_view = set(mempool_snapshot["txids"])
443            get_raw_seq = mempool_snapshot["mempool_sequence"]
444
445        # Things continue to happen in the "interim" while waiting for snapshot results
446        # We have node 0 do all these to avoid p2p races with RBF announcements
447        for _ in range(num_txs):
448            txids.append(self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=0.1, replaceable=True))
449        self.nodes[0].bumpfee(txids[-1])
450        self.sync_all()
451        self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
452        final_txid = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=0.1, replaceable=True)
453
454        # 3) Consume ZMQ backlog until we get to "now" for the mempool snapshot
455        while True:
456            if zmq_mem_seq == get_raw_seq - 1:
457                break
458            (hash_str, label, mempool_sequence) = seq.receive_sequence()
459            if mempool_sequence is not None:
460                zmq_mem_seq = mempool_sequence
461                if zmq_mem_seq > get_raw_seq:
462                    raise Exception("We somehow jumped mempool sequence numbers! zmq_mem_seq: {} > get_raw_seq: {}".format(zmq_mem_seq, get_raw_seq))
463
464        # 4) Moving forward, we apply the delta to our local view
465        #    remaining txs(5) + 1 rbf(A+R) + 1 block connect + 1 final tx
466        expected_sequence = get_raw_seq
467        r_gap = 0
468        for _ in range(num_txs + 2 + 1 + 1):
469            (hash_str, label, mempool_sequence) = seq.receive_sequence()
470            if mempool_sequence is not None:
471                if mempool_sequence != expected_sequence:
472                    # Detected "R" gap, means this a conflict eviction, and mempool tx are being evicted before its
473                    # position in the incoming block message "C"
474                    if label == "R":
475                        assert mempool_sequence > expected_sequence
476                        r_gap += mempool_sequence - expected_sequence
477                    else:
478                        raise Exception("WARNING: txhash has unexpected mempool sequence value: {} vs expected {}".format(mempool_sequence, expected_sequence))
479            if label == "A":
480                assert hash_str not in mempool_view
481                mempool_view.add(hash_str)
482                expected_sequence = mempool_sequence + 1
483            elif label == "R":
484                assert hash_str in mempool_view
485                mempool_view.remove(hash_str)
486                expected_sequence = mempool_sequence + 1
487            elif label == "C":
488                # (Attempt to) remove all txids from known block connects
489                block_txids = self.nodes[0].getblock(hash_str)["tx"][1:]
490                for txid in block_txids:
491                    if txid in mempool_view:
492                        expected_sequence += 1
493                        mempool_view.remove(txid)
494                expected_sequence -= r_gap
495                r_gap = 0
496            elif label == "D":
497                # Not useful for mempool tracking per se
498                continue
499            else:
500                raise Exception("Unexpected ZMQ sequence label!")
501
502        assert_equal(self.nodes[0].getrawmempool(), [final_txid])
503        assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], expected_sequence)
504
505        # 5) If you miss a zmq/mempool sequence number, go back to step (2)
506
507        self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
508
509    def test_multiple_interfaces(self):
510        # Set up two subscribers with different addresses
511        subscribers = []
512        for i in range(2):
513            address = 'tcp://127.0.0.1:%d' % (28334 + i)
514            socket = self.ctx.socket(zmq.SUB)
515            socket.set(zmq.RCVTIMEO, 60000)
516            hashblock = ZMQSubscriber(socket, b"hashblock")
517            socket.connect(address)
518            subscribers.append({'address': address, 'hashblock': hashblock})
519
520        self.restart_node(0, ['-zmqpub%s=%s' % (subscriber['hashblock'].topic.decode(), subscriber['address']) for subscriber in subscribers])
521
522        # Relax so that the subscriber is ready before publishing zmq messages
523        sleep(0.2)
524
525        # Generate 1 block in nodes[0] and receive all notifications
526        self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
527
528        # Should receive the same block hash on both subscribers
529        assert_equal(self.nodes[0].getbestblockhash(), subscribers[0]['hashblock'].receive().hex())
530        assert_equal(self.nodes[0].getbestblockhash(), subscribers[1]['hashblock'].receive().hex())
531
if __name__ == '__main__':
    # Executed as a script: instantiate the test and hand control to its main().
    ZMQTest().main()
534