1 // Copyright (c) 2015-2020 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include <chain.h>
6 #include <chainparams.h>
7 #include <streams.h>
8 #include <zmq/zmqpublishnotifier.h>
9 #include <validation.h>
10 #include <util/system.h>
11 #include <rpc/server.h>
12 
13 static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
14 
// Command strings sent as the first frame of each published message
// (passed as `command` to SendMessage). Both the characters and the pointers
// themselves are const so the names cannot be re-pointed by accident.
static const char* const MSG_HASHBLOCK = "hashblock";
static const char* const MSG_HASHTX    = "hashtx";
static const char* const MSG_RAWBLOCK  = "rawblock";
static const char* const MSG_RAWTX     = "rawtx";
19 
20 // Internal function to send multipart message
zmq_send_multipart(void * sock,const void * data,size_t size,...)21 static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
22 {
23     va_list args;
24     va_start(args, size);
25 
26     while (1)
27     {
28         zmq_msg_t msg;
29 
30         int rc = zmq_msg_init_size(&msg, size);
31         if (rc != 0)
32         {
33             zmqError("Unable to initialize ZMQ msg");
34             va_end(args);
35             return -1;
36         }
37 
38         void *buf = zmq_msg_data(&msg);
39         memcpy(buf, data, size);
40 
41         data = va_arg(args, const void*);
42 
43         rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
44         if (rc == -1)
45         {
46             zmqError("Unable to send ZMQ msg");
47             zmq_msg_close(&msg);
48             va_end(args);
49             return -1;
50         }
51 
52         zmq_msg_close(&msg);
53 
54         if (!data)
55             break;
56 
57         size = va_arg(args, size_t);
58     }
59     va_end(args);
60     return 0;
61 }
62 
Initialize(void * pcontext)63 bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
64 {
65     assert(!psocket);
66 
67     // check if address is being used by other publish notifier
68     std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
69 
70     if (i==mapPublishNotifiers.end())
71     {
72         psocket = zmq_socket(pcontext, ZMQ_PUB);
73         if (!psocket)
74         {
75             zmqError("Failed to create socket");
76             return false;
77         }
78 
79         LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
80 
81         int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark));
82         if (rc != 0)
83         {
84             zmqError("Failed to set outbound message high water mark");
85             zmq_close(psocket);
86             return false;
87         }
88 
89         rc = zmq_bind(psocket, address.c_str());
90         if (rc != 0)
91         {
92             zmqError("Failed to bind address");
93             zmq_close(psocket);
94             return false;
95         }
96 
97         // register this notifier for the address, so it can be reused for other publish notifier
98         mapPublishNotifiers.insert(std::make_pair(address, this));
99         return true;
100     }
101     else
102     {
103         LogPrint(BCLog::ZMQ, "zmq: Reusing socket for address %s\n", address);
104         LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
105 
106         psocket = i->second->psocket;
107         mapPublishNotifiers.insert(std::make_pair(address, this));
108 
109         return true;
110     }
111 }
112 
Shutdown()113 void CZMQAbstractPublishNotifier::Shutdown()
114 {
115     // Early return if Initialize was not called
116     if (!psocket) return;
117 
118     int count = mapPublishNotifiers.count(address);
119 
120     // remove this notifier from the list of publishers using this address
121     typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
122     std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
123 
124     for (iterator it = iterpair.first; it != iterpair.second; ++it)
125     {
126         if (it->second==this)
127         {
128             mapPublishNotifiers.erase(it);
129             break;
130         }
131     }
132 
133     if (count == 1)
134     {
135         LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address);
136         int linger = 0;
137         zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
138         zmq_close(psocket);
139     }
140 
141     psocket = nullptr;
142 }
143 
SendMessage(const char * command,const void * data,size_t size)144 bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size)
145 {
146     assert(psocket);
147 
148     /* send three parts, command & data & a LE 4byte sequence number */
149     unsigned char msgseq[sizeof(uint32_t)];
150     WriteLE32(&msgseq[0], nSequence);
151     int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), nullptr);
152     if (rc == -1)
153         return false;
154 
155     /* increment memory only sequence number after sending */
156     nSequence++;
157 
158     return true;
159 }
160 
NotifyBlock(const CBlockIndex * pindex)161 bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
162 {
163     uint256 hash = pindex->GetBlockHash();
164     LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s\n", hash.GetHex());
165     char data[32];
166     for (unsigned int i = 0; i < 32; i++)
167         data[31 - i] = hash.begin()[i];
168     return SendMessage(MSG_HASHBLOCK, data, 32);
169 }
170 
NotifyTransaction(const CTransaction & transaction)171 bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
172 {
173     uint256 hash = transaction.GetHash();
174     LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s\n", hash.GetHex());
175     char data[32];
176     for (unsigned int i = 0; i < 32; i++)
177         data[31 - i] = hash.begin()[i];
178     return SendMessage(MSG_HASHTX, data, 32);
179 }
180 
NotifyBlock(const CBlockIndex * pindex)181 bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
182 {
183     LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex());
184 
185     const Consensus::Params& consensusParams = Params().GetConsensus();
186     CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
187     {
188         LOCK(cs_main);
189         CBlock block;
190         if(!ReadBlockFromDisk(block, pindex, consensusParams))
191         {
192             zmqError("Can't read block from disk");
193             return false;
194         }
195 
196         ss << block;
197     }
198 
199     return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
200 }
201 
NotifyTransaction(const CTransaction & transaction)202 bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
203 {
204     uint256 hash = transaction.GetHash();
205     LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s\n", hash.GetHex());
206     CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
207     ss << transaction;
208     return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
209 }
210