1 // Copyright (c) 2015-2019 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include <zmq/zmqnotificationinterface.h>
6 #include <zmq/zmqpublishnotifier.h>
7 #include <zmq/zmqutil.h>
8 
9 #include <zmq.h>
10 
11 #include <validation.h>
12 #include <util/system.h>
13 
CZMQNotificationInterface()14 CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(nullptr)
15 {
16 }
17 
~CZMQNotificationInterface()18 CZMQNotificationInterface::~CZMQNotificationInterface()
19 {
20     Shutdown();
21 }
22 
GetActiveNotifiers() const23 std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const
24 {
25     std::list<const CZMQAbstractNotifier*> result;
26     for (const auto& n : notifiers) {
27         result.push_back(n.get());
28     }
29     return result;
30 }
31 
Create()32 CZMQNotificationInterface* CZMQNotificationInterface::Create()
33 {
34     std::map<std::string, CZMQNotifierFactory> factories;
35     factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
36     factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
37     factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
38     factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
39     factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
40 
41     std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
42     for (const auto& entry : factories)
43     {
44         std::string arg("-zmq" + entry.first);
45         const auto& factory = entry.second;
46         for (const std::string& address : gArgs.GetArgs(arg)) {
47             std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
48             notifier->SetType(entry.first);
49             notifier->SetAddress(address);
50             notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
51             notifiers.push_back(std::move(notifier));
52         }
53     }
54 
55     if (!notifiers.empty())
56     {
57         std::unique_ptr<CZMQNotificationInterface> notificationInterface(new CZMQNotificationInterface());
58         notificationInterface->notifiers = std::move(notifiers);
59 
60         if (notificationInterface->Initialize()) {
61             return notificationInterface.release();
62         }
63     }
64 
65     return nullptr;
66 }
67 
68 // Called at startup to conditionally set up ZMQ socket(s)
Initialize()69 bool CZMQNotificationInterface::Initialize()
70 {
71     int major = 0, minor = 0, patch = 0;
72     zmq_version(&major, &minor, &patch);
73     LogPrint(BCLog::ZMQ, "zmq: version %d.%d.%d\n", major, minor, patch);
74 
75     LogPrint(BCLog::ZMQ, "zmq: Initialize notification interface\n");
76     assert(!pcontext);
77 
78     pcontext = zmq_ctx_new();
79 
80     if (!pcontext)
81     {
82         zmqError("Unable to initialize context");
83         return false;
84     }
85 
86     for (auto& notifier : notifiers) {
87         if (notifier->Initialize(pcontext)) {
88             LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
89         } else {
90             LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
91             return false;
92         }
93     }
94 
95     return true;
96 }
97 
98 // Called during shutdown sequence
Shutdown()99 void CZMQNotificationInterface::Shutdown()
100 {
101     LogPrint(BCLog::ZMQ, "zmq: Shutdown notification interface\n");
102     if (pcontext)
103     {
104         for (auto& notifier : notifiers) {
105             LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
106             notifier->Shutdown();
107         }
108         zmq_ctx_term(pcontext);
109 
110         pcontext = nullptr;
111     }
112 }
113 
114 namespace {
115 
116 template <typename Function>
TryForEachAndRemoveFailed(std::list<std::unique_ptr<CZMQAbstractNotifier>> & notifiers,const Function & func)117 void TryForEachAndRemoveFailed(std::list<std::unique_ptr<CZMQAbstractNotifier>>& notifiers, const Function& func)
118 {
119     for (auto i = notifiers.begin(); i != notifiers.end(); ) {
120         CZMQAbstractNotifier* notifier = i->get();
121         if (func(notifier)) {
122             ++i;
123         } else {
124             notifier->Shutdown();
125             i = notifiers.erase(i);
126         }
127     }
128 }
129 
130 } // anonymous namespace
131 
UpdatedBlockTip(const CBlockIndex * pindexNew,const CBlockIndex * pindexFork,bool fInitialDownload)132 void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
133 {
134     if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
135         return;
136 
137     TryForEachAndRemoveFailed(notifiers, [pindexNew](CZMQAbstractNotifier* notifier) {
138         return notifier->NotifyBlock(pindexNew);
139     });
140 }
141 
TransactionAddedToMempool(const CTransactionRef & ptx,uint64_t mempool_sequence)142 void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, uint64_t mempool_sequence)
143 {
144     const CTransaction& tx = *ptx;
145 
146     TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
147         return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
148     });
149 }
150 
TransactionRemovedFromMempool(const CTransactionRef & ptx,MemPoolRemovalReason reason,uint64_t mempool_sequence)151 void CZMQNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason, uint64_t mempool_sequence)
152 {
153     // Called for all non-block inclusion reasons
154     const CTransaction& tx = *ptx;
155 
156     TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
157         return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
158     });
159 }
160 
BlockConnected(const std::shared_ptr<const CBlock> & pblock,const CBlockIndex * pindexConnected)161 void CZMQNotificationInterface::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
162 {
163     for (const CTransactionRef& ptx : pblock->vtx) {
164         const CTransaction& tx = *ptx;
165         TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
166             return notifier->NotifyTransaction(tx);
167         });
168     }
169 
170     // Next we notify BlockConnect listeners for *all* blocks
171     TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) {
172         return notifier->NotifyBlockConnect(pindexConnected);
173     });
174 }
175 
BlockDisconnected(const std::shared_ptr<const CBlock> & pblock,const CBlockIndex * pindexDisconnected)176 void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected)
177 {
178     for (const CTransactionRef& ptx : pblock->vtx) {
179         const CTransaction& tx = *ptx;
180         TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
181             return notifier->NotifyTransaction(tx);
182         });
183     }
184 
185     // Next we notify BlockDisconnect listeners for *all* blocks
186     TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) {
187         return notifier->NotifyBlockDisconnect(pindexDisconnected);
188     });
189 }
190 
191 CZMQNotificationInterface* g_zmq_notification_interface = nullptr;
192