1 // Copyright (c) 2015-2020 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include <zmq/zmqpublishnotifier.h>
6 
7 #include <chain.h>
8 #include <chainparams.h>
9 #include <rpc/server.h>
10 #include <streams.h>
11 #include <util/system.h>
12 #include <validation.h>
13 #include <zmq/zmqutil.h>
14 
15 #include <zmq.h>
16 
17 #include <cstdarg>
18 #include <cstddef>
19 #include <map>
20 #include <string>
21 #include <utility>
22 
23 static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
24 
25 static const char *MSG_HASHBLOCK = "hashblock";
26 static const char *MSG_HASHTX    = "hashtx";
27 static const char *MSG_RAWBLOCK  = "rawblock";
28 static const char *MSG_RAWTX     = "rawtx";
29 static const char *MSG_SEQUENCE  = "sequence";
30 
31 // Internal function to send multipart message
zmq_send_multipart(void * sock,const void * data,size_t size,...)32 static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
33 {
34     va_list args;
35     va_start(args, size);
36 
37     while (1)
38     {
39         zmq_msg_t msg;
40 
41         int rc = zmq_msg_init_size(&msg, size);
42         if (rc != 0)
43         {
44             zmqError("Unable to initialize ZMQ msg");
45             va_end(args);
46             return -1;
47         }
48 
49         void *buf = zmq_msg_data(&msg);
50         memcpy(buf, data, size);
51 
52         data = va_arg(args, const void*);
53 
54         rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
55         if (rc == -1)
56         {
57             zmqError("Unable to send ZMQ msg");
58             zmq_msg_close(&msg);
59             va_end(args);
60             return -1;
61         }
62 
63         zmq_msg_close(&msg);
64 
65         if (!data)
66             break;
67 
68         size = va_arg(args, size_t);
69     }
70     va_end(args);
71     return 0;
72 }
73 
Initialize(void * pcontext)74 bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
75 {
76     assert(!psocket);
77 
78     // check if address is being used by other publish notifier
79     std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
80 
81     if (i==mapPublishNotifiers.end())
82     {
83         psocket = zmq_socket(pcontext, ZMQ_PUB);
84         if (!psocket)
85         {
86             zmqError("Failed to create socket");
87             return false;
88         }
89 
90         LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
91 
92         int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark));
93         if (rc != 0)
94         {
95             zmqError("Failed to set outbound message high water mark");
96             zmq_close(psocket);
97             return false;
98         }
99 
100         const int so_keepalive_option {1};
101         rc = zmq_setsockopt(psocket, ZMQ_TCP_KEEPALIVE, &so_keepalive_option, sizeof(so_keepalive_option));
102         if (rc != 0) {
103             zmqError("Failed to set SO_KEEPALIVE");
104             zmq_close(psocket);
105             return false;
106         }
107 
108         rc = zmq_bind(psocket, address.c_str());
109         if (rc != 0)
110         {
111             zmqError("Failed to bind address");
112             zmq_close(psocket);
113             return false;
114         }
115 
116         // register this notifier for the address, so it can be reused for other publish notifier
117         mapPublishNotifiers.insert(std::make_pair(address, this));
118         return true;
119     }
120     else
121     {
122         LogPrint(BCLog::ZMQ, "zmq: Reusing socket for address %s\n", address);
123         LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
124 
125         psocket = i->second->psocket;
126         mapPublishNotifiers.insert(std::make_pair(address, this));
127 
128         return true;
129     }
130 }
131 
Shutdown()132 void CZMQAbstractPublishNotifier::Shutdown()
133 {
134     // Early return if Initialize was not called
135     if (!psocket) return;
136 
137     int count = mapPublishNotifiers.count(address);
138 
139     // remove this notifier from the list of publishers using this address
140     typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
141     std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
142 
143     for (iterator it = iterpair.first; it != iterpair.second; ++it)
144     {
145         if (it->second==this)
146         {
147             mapPublishNotifiers.erase(it);
148             break;
149         }
150     }
151 
152     if (count == 1)
153     {
154         LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address);
155         int linger = 0;
156         zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
157         zmq_close(psocket);
158     }
159 
160     psocket = nullptr;
161 }
162 
SendZmqMessage(const char * command,const void * data,size_t size)163 bool CZMQAbstractPublishNotifier::SendZmqMessage(const char *command, const void* data, size_t size)
164 {
165     assert(psocket);
166 
167     /* send three parts, command & data & a LE 4byte sequence number */
168     unsigned char msgseq[sizeof(uint32_t)];
169     WriteLE32(&msgseq[0], nSequence);
170     int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), nullptr);
171     if (rc == -1)
172         return false;
173 
174     /* increment memory only sequence number after sending */
175     nSequence++;
176 
177     return true;
178 }
179 
NotifyBlock(const CBlockIndex * pindex)180 bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
181 {
182     uint256 hash = pindex->GetBlockHash();
183     LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s to %s\n", hash.GetHex(), this->address);
184     char data[32];
185     for (unsigned int i = 0; i < 32; i++)
186         data[31 - i] = hash.begin()[i];
187     return SendZmqMessage(MSG_HASHBLOCK, data, 32);
188 }
189 
NotifyTransaction(const CTransaction & transaction)190 bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
191 {
192     uint256 hash = transaction.GetHash();
193     LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s to %s\n", hash.GetHex(), this->address);
194     char data[32];
195     for (unsigned int i = 0; i < 32; i++)
196         data[31 - i] = hash.begin()[i];
197     return SendZmqMessage(MSG_HASHTX, data, 32);
198 }
199 
NotifyBlock(const CBlockIndex * pindex)200 bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
201 {
202     LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address);
203 
204     const Consensus::Params& consensusParams = Params().GetConsensus();
205     CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
206     {
207         LOCK(cs_main);
208         CBlock block;
209         if(!ReadBlockFromDisk(block, pindex, consensusParams))
210         {
211             zmqError("Can't read block from disk");
212             return false;
213         }
214 
215         ss << block;
216     }
217 
218     return SendZmqMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
219 }
220 
NotifyTransaction(const CTransaction & transaction)221 bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
222 {
223     uint256 hash = transaction.GetHash();
224     LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s to %s\n", hash.GetHex(), this->address);
225     CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
226     ss << transaction;
227     return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
228 }
229 
230 
231 // TODO: Dedup this code to take label char, log string
NotifyBlockConnect(const CBlockIndex * pindex)232 bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex)
233 {
234     uint256 hash = pindex->GetBlockHash();
235     LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s to %s\n", hash.GetHex(), this->address);
236     char data[sizeof(uint256)+1];
237     for (unsigned int i = 0; i < sizeof(uint256); i++)
238         data[sizeof(uint256) - 1 - i] = hash.begin()[i];
239     data[sizeof(data) - 1] = 'C'; // Block (C)onnect
240     return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
241 }
242 
NotifyBlockDisconnect(const CBlockIndex * pindex)243 bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pindex)
244 {
245     uint256 hash = pindex->GetBlockHash();
246     LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s to %s\n", hash.GetHex(), this->address);
247     char data[sizeof(uint256)+1];
248     for (unsigned int i = 0; i < sizeof(uint256); i++)
249         data[sizeof(uint256) - 1 - i] = hash.begin()[i];
250     data[sizeof(data) - 1] = 'D'; // Block (D)isconnect
251     return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
252 }
253 
NotifyTransactionAcceptance(const CTransaction & transaction,uint64_t mempool_sequence)254 bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
255 {
256     uint256 hash = transaction.GetHash();
257     LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s to %s\n", hash.GetHex(), this->address);
258     unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1];
259     for (unsigned int i = 0; i < sizeof(uint256); i++)
260         data[sizeof(uint256) - 1 - i] = hash.begin()[i];
261     data[sizeof(uint256)] = 'A'; // Mempool (A)cceptance
262     WriteLE64(data+sizeof(uint256)+1, mempool_sequence);
263     return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
264 }
265 
NotifyTransactionRemoval(const CTransaction & transaction,uint64_t mempool_sequence)266 bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
267 {
268     uint256 hash = transaction.GetHash();
269     LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address);
270     unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1];
271     for (unsigned int i = 0; i < sizeof(uint256); i++)
272         data[sizeof(uint256) - 1 - i] = hash.begin()[i];
273     data[sizeof(uint256)] = 'R'; // Mempool (R)emoval
274     WriteLE64(data+sizeof(uint256)+1, mempool_sequence);
275     return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
276 }
277