1 // Copyright (c) 2015 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include "zmqnotificationinterface.h"
6 #include "zmqpublishnotifier.h"
7 
8 #include "version.h"
9 #include "main.h"
10 #include "streams.h"
11 #include "util.h"
12 
zmqError(const char * str)13 void zmqError(const char *str)
14 {
15     LogPrint("zmq", "zmq: Error: %s, errno=%s\n", str, zmq_strerror(errno));
16 }
17 
CZMQNotificationInterface()18 CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(NULL)
19 {
20 }
21 
~CZMQNotificationInterface()22 CZMQNotificationInterface::~CZMQNotificationInterface()
23 {
24     Shutdown();
25 
26     for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
27     {
28         delete *i;
29     }
30 }
31 
CreateWithArguments(const std::map<std::string,std::string> & args)32 CZMQNotificationInterface* CZMQNotificationInterface::CreateWithArguments(const std::map<std::string, std::string> &args)
33 {
34     CZMQNotificationInterface* notificationInterface = NULL;
35     std::map<std::string, CZMQNotifierFactory> factories;
36     std::list<CZMQAbstractNotifier*> notifiers;
37 
38     factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
39     factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
40     factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
41     factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
42 
43     for (std::map<std::string, CZMQNotifierFactory>::const_iterator i=factories.begin(); i!=factories.end(); ++i)
44     {
45         std::map<std::string, std::string>::const_iterator j = args.find("-zmq" + i->first);
46         if (j!=args.end())
47         {
48             CZMQNotifierFactory factory = i->second;
49             std::string address = j->second;
50             CZMQAbstractNotifier *notifier = factory();
51             notifier->SetType(i->first);
52             notifier->SetAddress(address);
53             notifiers.push_back(notifier);
54         }
55     }
56 
57     if (!notifiers.empty())
58     {
59         notificationInterface = new CZMQNotificationInterface();
60         notificationInterface->notifiers = notifiers;
61 
62         if (!notificationInterface->Initialize())
63         {
64             delete notificationInterface;
65             notificationInterface = NULL;
66         }
67     }
68 
69     return notificationInterface;
70 }
71 
72 // Called at startup to conditionally set up ZMQ socket(s)
Initialize()73 bool CZMQNotificationInterface::Initialize()
74 {
75     LogPrint("zmq", "zmq: Initialize notification interface\n");
76     assert(!pcontext);
77 
78     pcontext = zmq_init(1);
79 
80     if (!pcontext)
81     {
82         zmqError("Unable to initialize context");
83         return false;
84     }
85 
86     std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin();
87     for (; i!=notifiers.end(); ++i)
88     {
89         CZMQAbstractNotifier *notifier = *i;
90         if (notifier->Initialize(pcontext))
91         {
92             LogPrint("zmq", "  Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
93         }
94         else
95         {
96             LogPrint("zmq", "  Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
97             break;
98         }
99     }
100 
101     if (i!=notifiers.end())
102     {
103         return false;
104     }
105 
106     return true;
107 }
108 
109 // Called during shutdown sequence
Shutdown()110 void CZMQNotificationInterface::Shutdown()
111 {
112     LogPrint("zmq", "zmq: Shutdown notification interface\n");
113     if (pcontext)
114     {
115         for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
116         {
117             CZMQAbstractNotifier *notifier = *i;
118             LogPrint("zmq", "   Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
119             notifier->Shutdown();
120         }
121         zmq_ctx_destroy(pcontext);
122 
123         pcontext = 0;
124     }
125 }
126 
UpdatedBlockTip(const CBlockIndex * pindex)127 void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindex)
128 {
129     for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
130     {
131         CZMQAbstractNotifier *notifier = *i;
132         if (notifier->NotifyBlock(pindex))
133         {
134             i++;
135         }
136         else
137         {
138             notifier->Shutdown();
139             i = notifiers.erase(i);
140         }
141     }
142 }
143 
SyncTransaction(const CTransaction & tx,const CBlockIndex * pindex,const CBlock * pblock)144 void CZMQNotificationInterface::SyncTransaction(const CTransaction& tx, const CBlockIndex* pindex, const CBlock* pblock)
145 {
146     for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
147     {
148         CZMQAbstractNotifier *notifier = *i;
149         if (notifier->NotifyTransaction(tx))
150         {
151             i++;
152         }
153         else
154         {
155             notifier->Shutdown();
156             i = notifiers.erase(i);
157         }
158     }
159 }
160