1 // Copyright (c) 2015-2020 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5 #include <chain.h>
6 #include <chainparams.h>
7 #include <streams.h>
8 #include <zmq/zmqpublishnotifier.h>
9 #include <validation.h>
10 #include <util/system.h>
11 #include <rpc/server.h>
12
13 static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
14
15 static const char *MSG_HASHBLOCK = "hashblock";
16 static const char *MSG_HASHTX = "hashtx";
17 static const char *MSG_RAWBLOCK = "rawblock";
18 static const char *MSG_RAWTX = "rawtx";
19
20 // Internal function to send multipart message
zmq_send_multipart(void * sock,const void * data,size_t size,...)21 static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
22 {
23 va_list args;
24 va_start(args, size);
25
26 while (1)
27 {
28 zmq_msg_t msg;
29
30 int rc = zmq_msg_init_size(&msg, size);
31 if (rc != 0)
32 {
33 zmqError("Unable to initialize ZMQ msg");
34 va_end(args);
35 return -1;
36 }
37
38 void *buf = zmq_msg_data(&msg);
39 memcpy(buf, data, size);
40
41 data = va_arg(args, const void*);
42
43 rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
44 if (rc == -1)
45 {
46 zmqError("Unable to send ZMQ msg");
47 zmq_msg_close(&msg);
48 va_end(args);
49 return -1;
50 }
51
52 zmq_msg_close(&msg);
53
54 if (!data)
55 break;
56
57 size = va_arg(args, size_t);
58 }
59 va_end(args);
60 return 0;
61 }
62
Initialize(void * pcontext)63 bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
64 {
65 assert(!psocket);
66
67 // check if address is being used by other publish notifier
68 std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
69
70 if (i==mapPublishNotifiers.end())
71 {
72 psocket = zmq_socket(pcontext, ZMQ_PUB);
73 if (!psocket)
74 {
75 zmqError("Failed to create socket");
76 return false;
77 }
78
79 LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
80
81 int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark));
82 if (rc != 0)
83 {
84 zmqError("Failed to set outbound message high water mark");
85 zmq_close(psocket);
86 return false;
87 }
88
89 rc = zmq_bind(psocket, address.c_str());
90 if (rc != 0)
91 {
92 zmqError("Failed to bind address");
93 zmq_close(psocket);
94 return false;
95 }
96
97 // register this notifier for the address, so it can be reused for other publish notifier
98 mapPublishNotifiers.insert(std::make_pair(address, this));
99 return true;
100 }
101 else
102 {
103 LogPrint(BCLog::ZMQ, "zmq: Reusing socket for address %s\n", address);
104 LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
105
106 psocket = i->second->psocket;
107 mapPublishNotifiers.insert(std::make_pair(address, this));
108
109 return true;
110 }
111 }
112
Shutdown()113 void CZMQAbstractPublishNotifier::Shutdown()
114 {
115 // Early return if Initialize was not called
116 if (!psocket) return;
117
118 int count = mapPublishNotifiers.count(address);
119
120 // remove this notifier from the list of publishers using this address
121 typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
122 std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
123
124 for (iterator it = iterpair.first; it != iterpair.second; ++it)
125 {
126 if (it->second==this)
127 {
128 mapPublishNotifiers.erase(it);
129 break;
130 }
131 }
132
133 if (count == 1)
134 {
135 LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address);
136 int linger = 0;
137 zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
138 zmq_close(psocket);
139 }
140
141 psocket = nullptr;
142 }
143
SendMessage(const char * command,const void * data,size_t size)144 bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size)
145 {
146 assert(psocket);
147
148 /* send three parts, command & data & a LE 4byte sequence number */
149 unsigned char msgseq[sizeof(uint32_t)];
150 WriteLE32(&msgseq[0], nSequence);
151 int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), nullptr);
152 if (rc == -1)
153 return false;
154
155 /* increment memory only sequence number after sending */
156 nSequence++;
157
158 return true;
159 }
160
NotifyBlock(const CBlockIndex * pindex)161 bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
162 {
163 uint256 hash = pindex->GetBlockHash();
164 LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s\n", hash.GetHex());
165 char data[32];
166 for (unsigned int i = 0; i < 32; i++)
167 data[31 - i] = hash.begin()[i];
168 return SendMessage(MSG_HASHBLOCK, data, 32);
169 }
170
NotifyTransaction(const CTransaction & transaction)171 bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
172 {
173 uint256 hash = transaction.GetHash();
174 LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s\n", hash.GetHex());
175 char data[32];
176 for (unsigned int i = 0; i < 32; i++)
177 data[31 - i] = hash.begin()[i];
178 return SendMessage(MSG_HASHTX, data, 32);
179 }
180
NotifyBlock(const CBlockIndex * pindex)181 bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
182 {
183 LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex());
184
185 const Consensus::Params& consensusParams = Params().GetConsensus();
186 CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
187 {
188 LOCK(cs_main);
189 CBlock block;
190 if(!ReadBlockFromDisk(block, pindex, consensusParams))
191 {
192 zmqError("Can't read block from disk");
193 return false;
194 }
195
196 ss << block;
197 }
198
199 return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
200 }
201
NotifyTransaction(const CTransaction & transaction)202 bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
203 {
204 uint256 hash = transaction.GetHash();
205 LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s\n", hash.GetHex());
206 CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
207 ss << transaction;
208 return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
209 }
210