1 // Copyright (c) 2015 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5 #include "zmqnotificationinterface.h"
6 #include "zmqpublishnotifier.h"
7
8 #include "version.h"
9 #include "main.h"
10 #include "streams.h"
11 #include "util.h"
12
zmqError(const char * str)13 void zmqError(const char *str)
14 {
15 LogPrint("zmq", "zmq: Error: %s, errno=%s\n", str, zmq_strerror(errno));
16 }
17
CZMQNotificationInterface()18 CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(NULL)
19 {
20 }
21
~CZMQNotificationInterface()22 CZMQNotificationInterface::~CZMQNotificationInterface()
23 {
24 Shutdown();
25
26 for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
27 {
28 delete *i;
29 }
30 }
31
CreateWithArguments(const std::map<std::string,std::string> & args)32 CZMQNotificationInterface* CZMQNotificationInterface::CreateWithArguments(const std::map<std::string, std::string> &args)
33 {
34 CZMQNotificationInterface* notificationInterface = NULL;
35 std::map<std::string, CZMQNotifierFactory> factories;
36 std::list<CZMQAbstractNotifier*> notifiers;
37
38 factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
39 factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
40 factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
41 factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
42
43 for (std::map<std::string, CZMQNotifierFactory>::const_iterator i=factories.begin(); i!=factories.end(); ++i)
44 {
45 std::map<std::string, std::string>::const_iterator j = args.find("-zmq" + i->first);
46 if (j!=args.end())
47 {
48 CZMQNotifierFactory factory = i->second;
49 std::string address = j->second;
50 CZMQAbstractNotifier *notifier = factory();
51 notifier->SetType(i->first);
52 notifier->SetAddress(address);
53 notifiers.push_back(notifier);
54 }
55 }
56
57 if (!notifiers.empty())
58 {
59 notificationInterface = new CZMQNotificationInterface();
60 notificationInterface->notifiers = notifiers;
61
62 if (!notificationInterface->Initialize())
63 {
64 delete notificationInterface;
65 notificationInterface = NULL;
66 }
67 }
68
69 return notificationInterface;
70 }
71
72 // Called at startup to conditionally set up ZMQ socket(s)
Initialize()73 bool CZMQNotificationInterface::Initialize()
74 {
75 LogPrint("zmq", "zmq: Initialize notification interface\n");
76 assert(!pcontext);
77
78 pcontext = zmq_init(1);
79
80 if (!pcontext)
81 {
82 zmqError("Unable to initialize context");
83 return false;
84 }
85
86 std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin();
87 for (; i!=notifiers.end(); ++i)
88 {
89 CZMQAbstractNotifier *notifier = *i;
90 if (notifier->Initialize(pcontext))
91 {
92 LogPrint("zmq", " Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
93 }
94 else
95 {
96 LogPrint("zmq", " Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
97 break;
98 }
99 }
100
101 if (i!=notifiers.end())
102 {
103 return false;
104 }
105
106 return true;
107 }
108
109 // Called during shutdown sequence
Shutdown()110 void CZMQNotificationInterface::Shutdown()
111 {
112 LogPrint("zmq", "zmq: Shutdown notification interface\n");
113 if (pcontext)
114 {
115 for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
116 {
117 CZMQAbstractNotifier *notifier = *i;
118 LogPrint("zmq", " Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
119 notifier->Shutdown();
120 }
121 zmq_ctx_destroy(pcontext);
122
123 pcontext = 0;
124 }
125 }
126
UpdatedBlockTip(const CBlockIndex * pindex)127 void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindex)
128 {
129 for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
130 {
131 CZMQAbstractNotifier *notifier = *i;
132 if (notifier->NotifyBlock(pindex))
133 {
134 i++;
135 }
136 else
137 {
138 notifier->Shutdown();
139 i = notifiers.erase(i);
140 }
141 }
142 }
143
SyncTransaction(const CTransaction & tx,const CBlockIndex * pindex,const CBlock * pblock)144 void CZMQNotificationInterface::SyncTransaction(const CTransaction& tx, const CBlockIndex* pindex, const CBlock* pblock)
145 {
146 for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
147 {
148 CZMQAbstractNotifier *notifier = *i;
149 if (notifier->NotifyTransaction(tx))
150 {
151 i++;
152 }
153 else
154 {
155 notifier->Shutdown();
156 i = notifiers.erase(i);
157 }
158 }
159 }
160