1 // Copyright (C) 2013-2020 Internet Systems Consortium, Inc. ("ISC")
2 //
3 // This Source Code Form is subject to the terms of the Mozilla Public
4 // License, v. 2.0. If a copy of the MPL was not distributed with this
5 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7 #include <config.h>
8 #include <asiolink/asio_wrapper.h>
9 #include <dhcp_ddns/dhcp_ddns_log.h>
10 #include <dhcp_ddns/ncr_io.h>
11 #include <util/multi_threading_mgr.h>
12
13 #include <boost/algorithm/string/predicate.hpp>
14
15 #include <mutex>
16
17 namespace isc {
18 namespace dhcp_ddns {
19
20 using namespace isc::util;
21 using namespace std;
22
stringToNcrProtocol(const std::string & protocol_str)23 NameChangeProtocol stringToNcrProtocol(const std::string& protocol_str) {
24 if (boost::iequals(protocol_str, "UDP")) {
25 return (NCR_UDP);
26 }
27
28 if (boost::iequals(protocol_str, "TCP")) {
29 return (NCR_TCP);
30 }
31
32 isc_throw(BadValue,
33 "Invalid NameChangeRequest protocol: " << protocol_str);
34 }
35
ncrProtocolToString(NameChangeProtocol protocol)36 std::string ncrProtocolToString(NameChangeProtocol protocol) {
37 switch (protocol) {
38 case NCR_UDP:
39 return ("UDP");
40 case NCR_TCP:
41 return ("TCP");
42 default:
43 break;
44 }
45
46 std::ostringstream stream;
47 stream << "UNKNOWN(" << protocol << ")";
48 return (stream.str());
49 }
50
51
52 //************************** NameChangeListener ***************************
53
NameChangeListener(RequestReceiveHandler & recv_handler)54 NameChangeListener::NameChangeListener(RequestReceiveHandler&
55 recv_handler)
56 : listening_(false), io_pending_(false), recv_handler_(recv_handler) {
57 };
58
59
60 void
startListening(isc::asiolink::IOService & io_service)61 NameChangeListener::startListening(isc::asiolink::IOService& io_service) {
62 if (amListening()) {
63 // This amounts to a programmatic error.
64 isc_throw(NcrListenerError, "NameChangeListener is already listening");
65 }
66
67 // Call implementation dependent open.
68 try {
69 open(io_service);
70 } catch (const isc::Exception& ex) {
71 stopListening();
72 isc_throw(NcrListenerOpenError, "Open failed: " << ex.what());
73 }
74
75 // Set our status to listening.
76 setListening(true);
77
78 // Start the first asynchronous receive.
79 try {
80 receiveNext();
81 } catch (const isc::Exception& ex) {
82 stopListening();
83 isc_throw(NcrListenerReceiveError, "doReceive failed: " << ex.what());
84 }
85 }
86
87 void
receiveNext()88 NameChangeListener::receiveNext() {
89 io_pending_ = true;
90 doReceive();
91 }
92
93 void
stopListening()94 NameChangeListener::stopListening() {
95 try {
96 // Call implementation dependent close.
97 close();
98 } catch (const isc::Exception &ex) {
99 // Swallow exceptions. If we have some sort of error we'll log
100 // it but we won't propagate the throw.
101 LOG_ERROR(dhcp_ddns_logger, DHCP_DDNS_NCR_LISTEN_CLOSE_ERROR)
102 .arg(ex.what());
103 }
104
105 // Set it false, no matter what. This allows us to at least try to
106 // re-open via startListening().
107 setListening(false);
108 }
109
110 void
invokeRecvHandler(const Result result,NameChangeRequestPtr & ncr)111 NameChangeListener::invokeRecvHandler(const Result result,
112 NameChangeRequestPtr& ncr) {
113 // Call the registered application layer handler.
114 // Surround the invocation with a try-catch. The invoked handler is
115 // not supposed to throw, but in the event it does we will at least
116 // report it.
117 try {
118 io_pending_ = false;
119 recv_handler_(result, ncr);
120 } catch (const std::exception& ex) {
121 LOG_ERROR(dhcp_ddns_logger, DHCP_DDNS_UNCAUGHT_NCR_RECV_HANDLER_ERROR)
122 .arg(ex.what());
123 }
124
125 // Start the next IO layer asynchronous receive.
126 // In the event the handler above intervened and decided to stop listening
127 // we need to check that first.
128 if (amListening()) {
129 try {
130 receiveNext();
131 } catch (const isc::Exception& ex) {
132 // It is possible though unlikely, for doReceive to fail without
133 // scheduling the read. While, unlikely, it does mean the callback
134 // will not get called with a failure. A throw here would surface
135 // at the IOService::run (or run variant) invocation. So we will
136 // close the window by invoking the application handler with
137 // a failed result, and let the application layer sort it out.
138 LOG_ERROR(dhcp_ddns_logger, DHCP_DDNS_NCR_RECV_NEXT_ERROR)
139 .arg(ex.what());
140
141 // Call the registered application layer handler.
142 // Surround the invocation with a try-catch. The invoked handler is
143 // not supposed to throw, but in the event it does we will at least
144 // report it.
145 NameChangeRequestPtr empty;
146 try {
147 io_pending_ = false;
148 recv_handler_(ERROR, empty);
149 } catch (const std::exception& ex) {
150 LOG_ERROR(dhcp_ddns_logger,
151 DHCP_DDNS_UNCAUGHT_NCR_RECV_HANDLER_ERROR)
152 .arg(ex.what());
153 }
154 }
155 }
156 }
157
158 //************************* NameChangeSender ******************************
159
NameChangeSender(RequestSendHandler & send_handler,size_t send_queue_max)160 NameChangeSender::NameChangeSender(RequestSendHandler& send_handler,
161 size_t send_queue_max)
162 : sending_(false), send_handler_(send_handler),
163 send_queue_max_(send_queue_max), io_service_(NULL), mutex_(new mutex) {
164
165 // Queue size must be big enough to hold at least 1 entry.
166 setQueueMaxSize(send_queue_max);
167 }
168
169 void
startSending(isc::asiolink::IOService & io_service)170 NameChangeSender::startSending(isc::asiolink::IOService& io_service) {
171 if (amSending()) {
172 // This amounts to a programmatic error.
173 isc_throw(NcrSenderError, "NameChangeSender is already sending");
174 }
175
176 // Call implementation dependent open.
177 try {
178 if (MultiThreadingMgr::instance().getMode()) {
179 lock_guard<mutex> lock(*mutex_);
180 startSendingInternal(io_service);
181 } else {
182 startSendingInternal(io_service);
183 }
184 } catch (const isc::Exception& ex) {
185 stopSending();
186 isc_throw(NcrSenderOpenError, "Open failed: " << ex.what());
187 }
188 }
189
190 void
startSendingInternal(isc::asiolink::IOService & io_service)191 NameChangeSender::startSendingInternal(isc::asiolink::IOService& io_service) {
192 // Clear send marker.
193 ncr_to_send_.reset();
194
195 // Remember io service we're given.
196 io_service_ = &io_service;
197 open(io_service);
198
199 // Set our status to sending.
200 setSending(true);
201
202 // If there's any queued already.. we'll start sending.
203 sendNext();
204 }
205
206 void
stopSending()207 NameChangeSender::stopSending() {
208 // Set it send indicator to false, no matter what. This allows us to at
209 // least try to re-open via startSending(). Also, setting it false now,
210 // allows us to break sendNext() chain in invokeSendHandler.
211 setSending(false);
212
213 // If there is an outstanding IO to complete, attempt to process it.
214 if (ioReady() && io_service_ != NULL) {
215 try {
216 runReadyIO();
217 } catch (const std::exception& ex) {
218 // Swallow exceptions. If we have some sort of error we'll log
219 // it but we won't propagate the throw.
220 LOG_ERROR(dhcp_ddns_logger,
221 DHCP_DDNS_NCR_FLUSH_IO_ERROR).arg(ex.what());
222 }
223 }
224
225 try {
226 // Call implementation dependent close.
227 close();
228 } catch (const isc::Exception &ex) {
229 // Swallow exceptions. If we have some sort of error we'll log
230 // it but we won't propagate the throw.
231 LOG_ERROR(dhcp_ddns_logger,
232 DHCP_DDNS_NCR_SEND_CLOSE_ERROR).arg(ex.what());
233 }
234
235 io_service_ = NULL;
236 }
237
238 void
sendRequest(NameChangeRequestPtr & ncr)239 NameChangeSender::sendRequest(NameChangeRequestPtr& ncr) {
240 if (!amSending()) {
241 isc_throw(NcrSenderError, "sender is not ready to send");
242 }
243
244 if (!ncr) {
245 isc_throw(NcrSenderError, "request to send is empty");
246 }
247
248 if (MultiThreadingMgr::instance().getMode()) {
249 lock_guard<mutex> lock(*mutex_);
250 sendRequestInternal(ncr);
251 } else {
252 sendRequestInternal(ncr);
253 }
254 }
255
256 void
sendRequestInternal(NameChangeRequestPtr & ncr)257 NameChangeSender::sendRequestInternal(NameChangeRequestPtr& ncr) {
258 if (send_queue_.size() >= send_queue_max_) {
259 isc_throw(NcrSenderQueueFull,
260 "send queue has reached maximum capacity: "
261 << send_queue_max_);
262 }
263
264 // Put it on the queue.
265 send_queue_.push_back(ncr);
266
267 // Call sendNext to schedule the next one to go.
268 sendNext();
269 }
270
271 void
sendNext()272 NameChangeSender::sendNext() {
273 if (ncr_to_send_) {
274 // @todo Not sure if there is any risk of getting stuck here but
275 // an interval timer to defend would be good.
276 // In reality, the derivation should ensure they timeout themselves
277 return;
278 }
279
280 // If queue isn't empty, then get one from the front. Note we leave
281 // it on the front of the queue until we successfully send it.
282 if (!send_queue_.empty()) {
283 ncr_to_send_ = send_queue_.front();
284
285 // @todo start defense timer
286 // If a send were to hang and we timed it out, then timeout
287 // handler need to cycle thru open/close ?
288
289 // Call implementation dependent send.
290 doSend(ncr_to_send_);
291 }
292 }
293
294 void
invokeSendHandler(const NameChangeSender::Result result)295 NameChangeSender::invokeSendHandler(const NameChangeSender::Result result) {
296 if (MultiThreadingMgr::instance().getMode()) {
297 lock_guard<mutex> lock(*mutex_);
298 invokeSendHandlerInternal(result);
299 } else {
300 invokeSendHandlerInternal(result);
301 }
302 }
303
304 void
invokeSendHandlerInternal(const NameChangeSender::Result result)305 NameChangeSender::invokeSendHandlerInternal(const NameChangeSender::Result result) {
306 // @todo reset defense timer
307 if (result == SUCCESS) {
308 // It shipped so pull it off the queue.
309 send_queue_.pop_front();
310 }
311
312 // Invoke the completion handler passing in the result and a pointer
313 // the request involved.
314 // Surround the invocation with a try-catch. The invoked handler is
315 // not supposed to throw, but in the event it does we will at least
316 // report it.
317 try {
318 send_handler_(result, ncr_to_send_);
319 } catch (const std::exception& ex) {
320 LOG_ERROR(dhcp_ddns_logger, DHCP_DDNS_UNCAUGHT_NCR_SEND_HANDLER_ERROR)
321 .arg(ex.what());
322 }
323
324 // Clear the pending ncr pointer.
325 ncr_to_send_.reset();
326
327 // Set up the next send
328 try {
329 if (amSending()) {
330 sendNext();
331 }
332 } catch (const isc::Exception& ex) {
333 // It is possible though unlikely, for sendNext to fail without
334 // scheduling the send. While, unlikely, it does mean the callback
335 // will not get called with a failure. A throw here would surface
336 // at the IOService::run (or run variant) invocation. So we will
337 // close the window by invoking the application handler with
338 // a failed result, and let the application layer sort it out.
339 LOG_ERROR(dhcp_ddns_logger, DHCP_DDNS_NCR_SEND_NEXT_ERROR)
340 .arg(ex.what());
341
342 // Invoke the completion handler passing in failed result.
343 // Surround the invocation with a try-catch. The invoked handler is
344 // not supposed to throw, but in the event it does we will at least
345 // report it.
346 try {
347 send_handler_(ERROR, ncr_to_send_);
348 } catch (const std::exception& ex) {
349 LOG_ERROR(dhcp_ddns_logger,
350 DHCP_DDNS_UNCAUGHT_NCR_SEND_HANDLER_ERROR).arg(ex.what());
351 }
352 }
353 }
354
355 void
skipNext()356 NameChangeSender::skipNext() {
357 if (MultiThreadingMgr::instance().getMode()) {
358 lock_guard<mutex> lock(*mutex_);
359 skipNextInternal();
360 } else {
361 skipNextInternal();
362 }
363 }
364
365 void
skipNextInternal()366 NameChangeSender::skipNextInternal() {
367 if (!send_queue_.empty()) {
368 // Discards the request at the front of the queue.
369 send_queue_.pop_front();
370 }
371 }
372
373 void
clearSendQueue()374 NameChangeSender::clearSendQueue() {
375 if (amSending()) {
376 isc_throw(NcrSenderError, "Cannot clear queue while sending");
377 }
378
379 if (MultiThreadingMgr::instance().getMode()) {
380 lock_guard<mutex> lock(*mutex_);
381 send_queue_.clear();
382 } else {
383 send_queue_.clear();
384 }
385 }
386
387 void
setQueueMaxSize(const size_t new_max)388 NameChangeSender::setQueueMaxSize(const size_t new_max) {
389 if (new_max == 0) {
390 isc_throw(NcrSenderError, "NameChangeSender:"
391 " queue size must be greater than zero");
392 }
393
394 send_queue_max_ = new_max;
395 }
396
397 size_t
getQueueSize() const398 NameChangeSender::getQueueSize() const {
399 if (MultiThreadingMgr::instance().getMode()) {
400 lock_guard<mutex> lock(*mutex_);
401 return (getQueueSizeInternal());
402 } else {
403 return (getQueueSizeInternal());
404 }
405 }
406
407 size_t
getQueueSizeInternal() const408 NameChangeSender::getQueueSizeInternal() const {
409 return (send_queue_.size());
410 }
411
412 const NameChangeRequestPtr&
peekAt(const size_t index) const413 NameChangeSender::peekAt(const size_t index) const {
414 if (MultiThreadingMgr::instance().getMode()) {
415 lock_guard<mutex> lock(*mutex_);
416 return (peekAtInternal(index));
417 } else {
418 return (peekAtInternal(index));
419 }
420 }
421
422 const NameChangeRequestPtr&
peekAtInternal(const size_t index) const423 NameChangeSender::peekAtInternal(const size_t index) const {
424 auto size = getQueueSizeInternal();
425 if (index >= size) {
426 isc_throw(NcrSenderError,
427 "NameChangeSender::peekAt peek beyond end of queue attempted"
428 << " index: " << index << " queue size: " << size);
429 }
430
431 return (send_queue_.at(index));
432 }
433
434 bool
isSendInProgress() const435 NameChangeSender::isSendInProgress() const {
436 if (MultiThreadingMgr::instance().getMode()) {
437 lock_guard<mutex> lock(*mutex_);
438 return ((ncr_to_send_) ? true : false);
439 } else {
440 return ((ncr_to_send_) ? true : false);
441 }
442 }
443
444 void
assumeQueue(NameChangeSender & source_sender)445 NameChangeSender::assumeQueue(NameChangeSender& source_sender) {
446 if (source_sender.amSending()) {
447 isc_throw(NcrSenderError, "Cannot assume queue:"
448 " source sender is actively sending");
449 }
450
451 if (amSending()) {
452 isc_throw(NcrSenderError, "Cannot assume queue:"
453 " target sender is actively sending");
454 }
455
456 if (getQueueMaxSize() < source_sender.getQueueSize()) {
457 isc_throw(NcrSenderError, "Cannot assume queue:"
458 " source queue count exceeds target queue max");
459 }
460
461 if (MultiThreadingMgr::instance().getMode()) {
462 lock_guard<mutex> lock(*mutex_);
463 assumeQueueInternal(source_sender);
464 } else {
465 assumeQueueInternal(source_sender);
466 }
467 }
468
469 void
assumeQueueInternal(NameChangeSender & source_sender)470 NameChangeSender::assumeQueueInternal(NameChangeSender& source_sender) {
471 if (!send_queue_.empty()) {
472 isc_throw(NcrSenderError, "Cannot assume queue:"
473 " target queue is not empty");
474 }
475
476 send_queue_.swap(source_sender.getSendQueue());
477 }
478
479 int
getSelectFd()480 NameChangeSender::getSelectFd() {
481 isc_throw(NotImplemented, "NameChangeSender::getSelectFd is not supported");
482 }
483
484 void
runReadyIO()485 NameChangeSender::runReadyIO() {
486 if (!io_service_) {
487 isc_throw(NcrSenderError, "NameChangeSender::runReadyIO"
488 " sender io service is null");
489 }
490
491 // We shouldn't be here if IO isn't ready to execute.
492 // By running poll we're guaranteed not to hang.
493 /// @todo Trac# 3325 requests that asiolink::IOService provide a
494 /// wrapper for poll().
495 io_service_->get_io_service().poll_one();
496 }
497
498 } // namespace dhcp_ddns
499 } // namespace isc
500