1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25 #include "remotebackend.hh"
26 #ifdef REMOTEBACKEND_ZEROMQ
27
ZeroMQConnector(std::map<std::string,std::string> options)28 ZeroMQConnector::ZeroMQConnector(std::map<std::string, std::string> options) :
29 d_ctx(std::unique_ptr<void, int (*)(void*)>(zmq_init(2), zmq_close)), d_sock(std::unique_ptr<void, int (*)(void*)>(zmq_socket(d_ctx.get(), ZMQ_REQ), zmq_close))
30 {
31 int opt = 0;
32
33 // lookup timeout, target and stuff
34 if (options.count("endpoint") == 0) {
35 g_log << Logger::Error << "Cannot find 'endpoint' option in connection string" << endl;
36 throw PDNSException("Cannot find 'endpoint' option in connection string");
37 }
38 this->d_endpoint = options.find("endpoint")->second;
39 this->d_options = options;
40 this->d_timeout = 2000;
41
42 if (options.find("timeout") != options.end()) {
43 this->d_timeout = std::stoi(options.find("timeout")->second);
44 }
45
46 zmq_setsockopt(d_sock.get(), ZMQ_LINGER, &opt, sizeof(opt));
47
48 if (zmq_connect(this->d_sock.get(), this->d_endpoint.c_str()) < 0) {
49 g_log << Logger::Error << "zmq_connect() failed" << zmq_strerror(errno) << std::endl;
50 ;
51 throw PDNSException("Cannot find 'endpoint' option in connection string");
52 }
53
54 Json::array parameters;
55 Json msg = Json(Json::object{
56 {"method", "initialize"},
57 {"parameters", Json(options)},
58 });
59
60 this->send(msg);
61 msg = nullptr;
62 if (this->recv(msg) == false) {
63 g_log << Logger::Error << "Failed to initialize zeromq" << std::endl;
64 throw PDNSException("Failed to initialize zeromq");
65 }
66 };
67
~ZeroMQConnector()68 ZeroMQConnector::~ZeroMQConnector() {}
69
send_message(const Json & input)70 int ZeroMQConnector::send_message(const Json& input)
71 {
72 auto line = input.dump();
73 zmq_msg_t message;
74
75 zmq_msg_init_size(&message, line.size() + 1);
76 line.copy(reinterpret_cast<char*>(zmq_msg_data(&message)), line.size());
77 ((char*)zmq_msg_data(&message))[line.size()] = '\0';
78
79 try {
80 zmq_pollitem_t item;
81 item.socket = d_sock.get();
82 item.events = ZMQ_POLLOUT;
83 // poll until it's sent or timeout is spent. try to leave
84 // leave few cycles for read. just in case.
85 for (d_timespent = 0; d_timespent < d_timeout - 5; d_timespent++) {
86 if (zmq_poll(&item, 1, 1) > 0) {
87 if (zmq_msg_send(&message, this->d_sock.get(), 0) == -1) {
88 // message was not sent
89 g_log << Logger::Error << "Cannot send to " << this->d_endpoint << ": " << zmq_strerror(errno) << std::endl;
90 }
91 else
92 return line.size();
93 }
94 }
95 }
96 catch (std::exception& ex) {
97 g_log << Logger::Error << "Cannot send to " << this->d_endpoint << ": " << ex.what() << std::endl;
98 throw PDNSException(ex.what());
99 }
100
101 return 0;
102 }
103
recv_message(Json & output)104 int ZeroMQConnector::recv_message(Json& output)
105 {
106 int rv = 0;
107 // try to receive message
108 zmq_pollitem_t item;
109 zmq_msg_t message;
110
111 item.socket = d_sock.get();
112 item.events = ZMQ_POLLIN;
113
114 try {
115 // do zmq::poll few times
116 // d_timespent should always be initialized by send_message, recv should never
117 // be called without send first.
118 for (; d_timespent < d_timeout; d_timespent++) {
119 if (zmq_poll(&item, 1, 1) > 0) {
120 // we have an event
121 if ((item.revents & ZMQ_POLLIN) == ZMQ_POLLIN) {
122 string data;
123 size_t msg_size;
124 zmq_msg_init(&message);
125 // read something
126 if (zmq_msg_recv(&message, this->d_sock.get(), ZMQ_NOBLOCK) > 0) {
127 string err;
128 msg_size = zmq_msg_size(&message);
129 data.assign(reinterpret_cast<const char*>(zmq_msg_data(&message)), msg_size);
130 zmq_msg_close(&message);
131 output = Json::parse(data, err);
132 if (output != nullptr)
133 rv = msg_size;
134 else
135 g_log << Logger::Error << "Cannot parse JSON reply from " << this->d_endpoint << ": " << err << endl;
136 break;
137 }
138 else if (errno == EAGAIN) {
139 continue; // try again }
140 }
141 else {
142 break;
143 }
144 }
145 }
146 }
147 }
148 catch (std::exception& ex) {
149 g_log << Logger::Error << "Cannot receive from " << this->d_endpoint << ": " << ex.what() << std::endl;
150 throw PDNSException(ex.what());
151 }
152
153 return rv;
154 }
155
156 #endif
157