1 // Copyright (c) 2015-2020 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5 #include <zmq/zmqpublishnotifier.h>
6
7 #include <chain.h>
8 #include <chainparams.h>
9 #include <rpc/server.h>
10 #include <streams.h>
11 #include <util/system.h>
12 #include <validation.h>
13 #include <zmq/zmqutil.h>
14
15 #include <zmq.h>
16
17 #include <cstdarg>
18 #include <cstddef>
19 #include <map>
20 #include <string>
21 #include <utility>
22
23 static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
24
25 static const char *MSG_HASHBLOCK = "hashblock";
26 static const char *MSG_HASHTX = "hashtx";
27 static const char *MSG_RAWBLOCK = "rawblock";
28 static const char *MSG_RAWTX = "rawtx";
29 static const char *MSG_SEQUENCE = "sequence";
30
31 // Internal function to send multipart message
zmq_send_multipart(void * sock,const void * data,size_t size,...)32 static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
33 {
34 va_list args;
35 va_start(args, size);
36
37 while (1)
38 {
39 zmq_msg_t msg;
40
41 int rc = zmq_msg_init_size(&msg, size);
42 if (rc != 0)
43 {
44 zmqError("Unable to initialize ZMQ msg");
45 va_end(args);
46 return -1;
47 }
48
49 void *buf = zmq_msg_data(&msg);
50 memcpy(buf, data, size);
51
52 data = va_arg(args, const void*);
53
54 rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
55 if (rc == -1)
56 {
57 zmqError("Unable to send ZMQ msg");
58 zmq_msg_close(&msg);
59 va_end(args);
60 return -1;
61 }
62
63 zmq_msg_close(&msg);
64
65 if (!data)
66 break;
67
68 size = va_arg(args, size_t);
69 }
70 va_end(args);
71 return 0;
72 }
73
Initialize(void * pcontext)74 bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
75 {
76 assert(!psocket);
77
78 // check if address is being used by other publish notifier
79 std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
80
81 if (i==mapPublishNotifiers.end())
82 {
83 psocket = zmq_socket(pcontext, ZMQ_PUB);
84 if (!psocket)
85 {
86 zmqError("Failed to create socket");
87 return false;
88 }
89
90 LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
91
92 int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark));
93 if (rc != 0)
94 {
95 zmqError("Failed to set outbound message high water mark");
96 zmq_close(psocket);
97 return false;
98 }
99
100 const int so_keepalive_option {1};
101 rc = zmq_setsockopt(psocket, ZMQ_TCP_KEEPALIVE, &so_keepalive_option, sizeof(so_keepalive_option));
102 if (rc != 0) {
103 zmqError("Failed to set SO_KEEPALIVE");
104 zmq_close(psocket);
105 return false;
106 }
107
108 rc = zmq_bind(psocket, address.c_str());
109 if (rc != 0)
110 {
111 zmqError("Failed to bind address");
112 zmq_close(psocket);
113 return false;
114 }
115
116 // register this notifier for the address, so it can be reused for other publish notifier
117 mapPublishNotifiers.insert(std::make_pair(address, this));
118 return true;
119 }
120 else
121 {
122 LogPrint(BCLog::ZMQ, "zmq: Reusing socket for address %s\n", address);
123 LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
124
125 psocket = i->second->psocket;
126 mapPublishNotifiers.insert(std::make_pair(address, this));
127
128 return true;
129 }
130 }
131
Shutdown()132 void CZMQAbstractPublishNotifier::Shutdown()
133 {
134 // Early return if Initialize was not called
135 if (!psocket) return;
136
137 int count = mapPublishNotifiers.count(address);
138
139 // remove this notifier from the list of publishers using this address
140 typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
141 std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
142
143 for (iterator it = iterpair.first; it != iterpair.second; ++it)
144 {
145 if (it->second==this)
146 {
147 mapPublishNotifiers.erase(it);
148 break;
149 }
150 }
151
152 if (count == 1)
153 {
154 LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address);
155 int linger = 0;
156 zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
157 zmq_close(psocket);
158 }
159
160 psocket = nullptr;
161 }
162
SendZmqMessage(const char * command,const void * data,size_t size)163 bool CZMQAbstractPublishNotifier::SendZmqMessage(const char *command, const void* data, size_t size)
164 {
165 assert(psocket);
166
167 /* send three parts, command & data & a LE 4byte sequence number */
168 unsigned char msgseq[sizeof(uint32_t)];
169 WriteLE32(&msgseq[0], nSequence);
170 int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), nullptr);
171 if (rc == -1)
172 return false;
173
174 /* increment memory only sequence number after sending */
175 nSequence++;
176
177 return true;
178 }
179
NotifyBlock(const CBlockIndex * pindex)180 bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
181 {
182 uint256 hash = pindex->GetBlockHash();
183 LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s to %s\n", hash.GetHex(), this->address);
184 char data[32];
185 for (unsigned int i = 0; i < 32; i++)
186 data[31 - i] = hash.begin()[i];
187 return SendZmqMessage(MSG_HASHBLOCK, data, 32);
188 }
189
NotifyTransaction(const CTransaction & transaction)190 bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
191 {
192 uint256 hash = transaction.GetHash();
193 LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s to %s\n", hash.GetHex(), this->address);
194 char data[32];
195 for (unsigned int i = 0; i < 32; i++)
196 data[31 - i] = hash.begin()[i];
197 return SendZmqMessage(MSG_HASHTX, data, 32);
198 }
199
NotifyBlock(const CBlockIndex * pindex)200 bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
201 {
202 LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address);
203
204 const Consensus::Params& consensusParams = Params().GetConsensus();
205 CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
206 {
207 LOCK(cs_main);
208 CBlock block;
209 if(!ReadBlockFromDisk(block, pindex, consensusParams))
210 {
211 zmqError("Can't read block from disk");
212 return false;
213 }
214
215 ss << block;
216 }
217
218 return SendZmqMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
219 }
220
NotifyTransaction(const CTransaction & transaction)221 bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
222 {
223 uint256 hash = transaction.GetHash();
224 LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s to %s\n", hash.GetHex(), this->address);
225 CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
226 ss << transaction;
227 return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
228 }
229
230
231 // TODO: Dedup this code to take label char, log string
NotifyBlockConnect(const CBlockIndex * pindex)232 bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex)
233 {
234 uint256 hash = pindex->GetBlockHash();
235 LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s to %s\n", hash.GetHex(), this->address);
236 char data[sizeof(uint256)+1];
237 for (unsigned int i = 0; i < sizeof(uint256); i++)
238 data[sizeof(uint256) - 1 - i] = hash.begin()[i];
239 data[sizeof(data) - 1] = 'C'; // Block (C)onnect
240 return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
241 }
242
NotifyBlockDisconnect(const CBlockIndex * pindex)243 bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pindex)
244 {
245 uint256 hash = pindex->GetBlockHash();
246 LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s to %s\n", hash.GetHex(), this->address);
247 char data[sizeof(uint256)+1];
248 for (unsigned int i = 0; i < sizeof(uint256); i++)
249 data[sizeof(uint256) - 1 - i] = hash.begin()[i];
250 data[sizeof(data) - 1] = 'D'; // Block (D)isconnect
251 return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
252 }
253
NotifyTransactionAcceptance(const CTransaction & transaction,uint64_t mempool_sequence)254 bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
255 {
256 uint256 hash = transaction.GetHash();
257 LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s to %s\n", hash.GetHex(), this->address);
258 unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1];
259 for (unsigned int i = 0; i < sizeof(uint256); i++)
260 data[sizeof(uint256) - 1 - i] = hash.begin()[i];
261 data[sizeof(uint256)] = 'A'; // Mempool (A)cceptance
262 WriteLE64(data+sizeof(uint256)+1, mempool_sequence);
263 return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
264 }
265
NotifyTransactionRemoval(const CTransaction & transaction,uint64_t mempool_sequence)266 bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
267 {
268 uint256 hash = transaction.GetHash();
269 LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address);
270 unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1];
271 for (unsigned int i = 0; i < sizeof(uint256); i++)
272 data[sizeof(uint256) - 1 - i] = hash.begin()[i];
273 data[sizeof(uint256)] = 'R'; // Mempool (R)emoval
274 WriteLE64(data+sizeof(uint256)+1, mempool_sequence);
275 return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
276 }
277