1 /*
2  * This file is part of PowerDNS or dnsdist.
3  * Copyright -- PowerDNS.COM B.V. and its contributors
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of version 2 of the GNU General Public License as
7  * published by the Free Software Foundation.
8  *
9  * In addition, for the avoidance of any doubt, permission is granted to
10  * link this program with OpenSSL and to (re)distribute the binaries
11  * produced as the result of such linking.
12  *
13  * This program is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  * GNU General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License
19  * along with this program; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21  */
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25 #include "remotebackend.hh"
26 #ifdef REMOTEBACKEND_ZEROMQ
27 
ZeroMQConnector(std::map<std::string,std::string> options)28 ZeroMQConnector::ZeroMQConnector(std::map<std::string, std::string> options) :
29   d_ctx(std::unique_ptr<void, int (*)(void*)>(zmq_init(2), zmq_close)), d_sock(std::unique_ptr<void, int (*)(void*)>(zmq_socket(d_ctx.get(), ZMQ_REQ), zmq_close))
30 {
31   int opt = 0;
32 
33   // lookup timeout, target and stuff
34   if (options.count("endpoint") == 0) {
35     g_log << Logger::Error << "Cannot find 'endpoint' option in connection string" << endl;
36     throw PDNSException("Cannot find 'endpoint' option in connection string");
37   }
38   this->d_endpoint = options.find("endpoint")->second;
39   this->d_options = options;
40   this->d_timeout = 2000;
41 
42   if (options.find("timeout") != options.end()) {
43     this->d_timeout = std::stoi(options.find("timeout")->second);
44   }
45 
46   zmq_setsockopt(d_sock.get(), ZMQ_LINGER, &opt, sizeof(opt));
47 
48   if (zmq_connect(this->d_sock.get(), this->d_endpoint.c_str()) < 0) {
49     g_log << Logger::Error << "zmq_connect() failed" << zmq_strerror(errno) << std::endl;
50     ;
51     throw PDNSException("Cannot find 'endpoint' option in connection string");
52   }
53 
54   Json::array parameters;
55   Json msg = Json(Json::object{
56     {"method", "initialize"},
57     {"parameters", Json(options)},
58   });
59 
60   this->send(msg);
61   msg = nullptr;
62   if (this->recv(msg) == false) {
63     g_log << Logger::Error << "Failed to initialize zeromq" << std::endl;
64     throw PDNSException("Failed to initialize zeromq");
65   }
66 };
67 
~ZeroMQConnector()68 ZeroMQConnector::~ZeroMQConnector() {}
69 
send_message(const Json & input)70 int ZeroMQConnector::send_message(const Json& input)
71 {
72   auto line = input.dump();
73   zmq_msg_t message;
74 
75   zmq_msg_init_size(&message, line.size() + 1);
76   line.copy(reinterpret_cast<char*>(zmq_msg_data(&message)), line.size());
77   ((char*)zmq_msg_data(&message))[line.size()] = '\0';
78 
79   try {
80     zmq_pollitem_t item;
81     item.socket = d_sock.get();
82     item.events = ZMQ_POLLOUT;
83     // poll until it's sent or timeout is spent. try to leave
84     // leave few cycles for read. just in case.
85     for (d_timespent = 0; d_timespent < d_timeout - 5; d_timespent++) {
86       if (zmq_poll(&item, 1, 1) > 0) {
87         if (zmq_msg_send(&message, this->d_sock.get(), 0) == -1) {
88           // message was not sent
89           g_log << Logger::Error << "Cannot send to " << this->d_endpoint << ": " << zmq_strerror(errno) << std::endl;
90         }
91         else
92           return line.size();
93       }
94     }
95   }
96   catch (std::exception& ex) {
97     g_log << Logger::Error << "Cannot send to " << this->d_endpoint << ": " << ex.what() << std::endl;
98     throw PDNSException(ex.what());
99   }
100 
101   return 0;
102 }
103 
recv_message(Json & output)104 int ZeroMQConnector::recv_message(Json& output)
105 {
106   int rv = 0;
107   // try to receive message
108   zmq_pollitem_t item;
109   zmq_msg_t message;
110 
111   item.socket = d_sock.get();
112   item.events = ZMQ_POLLIN;
113 
114   try {
115     // do zmq::poll few times
116     // d_timespent should always be initialized by send_message, recv should never
117     // be called without send first.
118     for (; d_timespent < d_timeout; d_timespent++) {
119       if (zmq_poll(&item, 1, 1) > 0) {
120         // we have an event
121         if ((item.revents & ZMQ_POLLIN) == ZMQ_POLLIN) {
122           string data;
123           size_t msg_size;
124           zmq_msg_init(&message);
125           // read something
126           if (zmq_msg_recv(&message, this->d_sock.get(), ZMQ_NOBLOCK) > 0) {
127             string err;
128             msg_size = zmq_msg_size(&message);
129             data.assign(reinterpret_cast<const char*>(zmq_msg_data(&message)), msg_size);
130             zmq_msg_close(&message);
131             output = Json::parse(data, err);
132             if (output != nullptr)
133               rv = msg_size;
134             else
135               g_log << Logger::Error << "Cannot parse JSON reply from " << this->d_endpoint << ": " << err << endl;
136             break;
137           }
138           else if (errno == EAGAIN) {
139             continue; // try again }
140           }
141           else {
142             break;
143           }
144         }
145       }
146     }
147   }
148   catch (std::exception& ex) {
149     g_log << Logger::Error << "Cannot receive from " << this->d_endpoint << ": " << ex.what() << std::endl;
150     throw PDNSException(ex.what());
151   }
152 
153   return rv;
154 }
155 
156 #endif
157