1 // Copyright (C) 2013-2020 Internet Systems Consortium, Inc. ("ISC")
2 //
3 // This Source Code Form is subject to the terms of the Mozilla Public
4 // License, v. 2.0. If a copy of the MPL was not distributed with this
5 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
6 
7 #include <config.h>
8 #include <asiolink/asio_wrapper.h>
9 #include <dhcp_ddns/dhcp_ddns_log.h>
10 #include <dhcp_ddns/ncr_io.h>
11 #include <util/multi_threading_mgr.h>
12 
13 #include <boost/algorithm/string/predicate.hpp>
14 
15 #include <mutex>
16 
17 namespace isc {
18 namespace dhcp_ddns {
19 
20 using namespace isc::util;
21 using namespace std;
22 
stringToNcrProtocol(const std::string & protocol_str)23 NameChangeProtocol stringToNcrProtocol(const std::string& protocol_str) {
24     if (boost::iequals(protocol_str, "UDP")) {
25         return (NCR_UDP);
26     }
27 
28     if (boost::iequals(protocol_str, "TCP")) {
29         return (NCR_TCP);
30     }
31 
32     isc_throw(BadValue,
33               "Invalid NameChangeRequest protocol: " << protocol_str);
34 }
35 
ncrProtocolToString(NameChangeProtocol protocol)36 std::string ncrProtocolToString(NameChangeProtocol protocol) {
37     switch (protocol) {
38     case NCR_UDP:
39         return ("UDP");
40     case NCR_TCP:
41         return ("TCP");
42     default:
43         break;
44     }
45 
46     std::ostringstream stream;
47     stream  << "UNKNOWN(" << protocol << ")";
48     return (stream.str());
49 }
50 
51 
52 //************************** NameChangeListener ***************************
53 
NameChangeListener(RequestReceiveHandler & recv_handler)54 NameChangeListener::NameChangeListener(RequestReceiveHandler&
55                                        recv_handler)
56     : listening_(false), io_pending_(false), recv_handler_(recv_handler) {
57 };
58 
59 
60 void
startListening(isc::asiolink::IOService & io_service)61 NameChangeListener::startListening(isc::asiolink::IOService& io_service) {
62     if (amListening()) {
63         // This amounts to a programmatic error.
64         isc_throw(NcrListenerError, "NameChangeListener is already listening");
65     }
66 
67     // Call implementation dependent open.
68     try {
69         open(io_service);
70     } catch (const isc::Exception& ex) {
71         stopListening();
72         isc_throw(NcrListenerOpenError, "Open failed: " << ex.what());
73     }
74 
75     // Set our status to listening.
76     setListening(true);
77 
78     // Start the first asynchronous receive.
79     try {
80         receiveNext();
81     } catch (const isc::Exception& ex) {
82         stopListening();
83         isc_throw(NcrListenerReceiveError, "doReceive failed: " << ex.what());
84     }
85 }
86 
87 void
receiveNext()88 NameChangeListener::receiveNext() {
89     io_pending_ = true;
90     doReceive();
91 }
92 
93 void
stopListening()94 NameChangeListener::stopListening() {
95     try {
96         // Call implementation dependent close.
97         close();
98     } catch (const isc::Exception &ex) {
99         // Swallow exceptions. If we have some sort of error we'll log
100         // it but we won't propagate the throw.
101         LOG_ERROR(dhcp_ddns_logger, DHCP_DDNS_NCR_LISTEN_CLOSE_ERROR)
102                   .arg(ex.what());
103     }
104 
105     // Set it false, no matter what.  This allows us to at least try to
106     // re-open via startListening().
107     setListening(false);
108 }
109 
110 void
invokeRecvHandler(const Result result,NameChangeRequestPtr & ncr)111 NameChangeListener::invokeRecvHandler(const Result result,
112                                       NameChangeRequestPtr& ncr) {
113     // Call the registered application layer handler.
114     // Surround the invocation with a try-catch. The invoked handler is
115     // not supposed to throw, but in the event it does we will at least
116     // report it.
117     try {
118         io_pending_ = false;
119         recv_handler_(result, ncr);
120     } catch (const std::exception& ex) {
121         LOG_ERROR(dhcp_ddns_logger, DHCP_DDNS_UNCAUGHT_NCR_RECV_HANDLER_ERROR)
122                   .arg(ex.what());
123     }
124 
125     // Start the next IO layer asynchronous receive.
126     // In the event the handler above intervened and decided to stop listening
127     // we need to check that first.
128     if (amListening()) {
129         try {
130             receiveNext();
131         } catch (const isc::Exception& ex) {
132             // It is possible though unlikely, for doReceive to fail without
133             // scheduling the read. While, unlikely, it does mean the callback
134             // will not get called with a failure. A throw here would surface
135             // at the IOService::run (or run variant) invocation.  So we will
136             // close the window by invoking the application handler with
137             // a failed result, and let the application layer sort it out.
138             LOG_ERROR(dhcp_ddns_logger, DHCP_DDNS_NCR_RECV_NEXT_ERROR)
139                       .arg(ex.what());
140 
141             // Call the registered application layer handler.
142             // Surround the invocation with a try-catch. The invoked handler is
143             // not supposed to throw, but in the event it does we will at least
144             // report it.
145             NameChangeRequestPtr empty;
146             try {
147                 io_pending_ = false;
148                 recv_handler_(ERROR, empty);
149             } catch (const std::exception& ex) {
150                 LOG_ERROR(dhcp_ddns_logger,
151                           DHCP_DDNS_UNCAUGHT_NCR_RECV_HANDLER_ERROR)
152                           .arg(ex.what());
153             }
154         }
155     }
156 }
157 
158 //************************* NameChangeSender ******************************
159 
NameChangeSender(RequestSendHandler & send_handler,size_t send_queue_max)160 NameChangeSender::NameChangeSender(RequestSendHandler& send_handler,
161                                    size_t send_queue_max)
162     : sending_(false), send_handler_(send_handler),
163       send_queue_max_(send_queue_max), io_service_(NULL), mutex_(new mutex) {
164 
165     // Queue size must be big enough to hold at least 1 entry.
166     setQueueMaxSize(send_queue_max);
167 }
168 
169 void
startSending(isc::asiolink::IOService & io_service)170 NameChangeSender::startSending(isc::asiolink::IOService& io_service) {
171     if (amSending()) {
172         // This amounts to a programmatic error.
173         isc_throw(NcrSenderError, "NameChangeSender is already sending");
174     }
175 
176     // Call implementation dependent open.
177     try {
178         if (MultiThreadingMgr::instance().getMode()) {
179             lock_guard<mutex> lock(*mutex_);
180             startSendingInternal(io_service);
181         } else {
182             startSendingInternal(io_service);
183         }
184     } catch (const isc::Exception& ex) {
185         stopSending();
186         isc_throw(NcrSenderOpenError, "Open failed: " << ex.what());
187     }
188 }
189 
190 void
startSendingInternal(isc::asiolink::IOService & io_service)191 NameChangeSender::startSendingInternal(isc::asiolink::IOService& io_service) {
192     // Clear send marker.
193     ncr_to_send_.reset();
194 
195     // Remember io service we're given.
196     io_service_ = &io_service;
197     open(io_service);
198 
199     // Set our status to sending.
200     setSending(true);
201 
202     // If there's any queued already.. we'll start sending.
203     sendNext();
204 }
205 
206 void
stopSending()207 NameChangeSender::stopSending() {
208     // Set it send indicator to false, no matter what. This allows us to at
209     // least try to re-open via startSending(). Also, setting it false now,
210     // allows us to break sendNext() chain in invokeSendHandler.
211     setSending(false);
212 
213     // If there is an outstanding IO to complete, attempt to process it.
214     if (ioReady() && io_service_ != NULL) {
215         try {
216             runReadyIO();
217         } catch (const std::exception& ex) {
218             // Swallow exceptions. If we have some sort of error we'll log
219             // it but we won't propagate the throw.
220             LOG_ERROR(dhcp_ddns_logger,
221                   DHCP_DDNS_NCR_FLUSH_IO_ERROR).arg(ex.what());
222         }
223     }
224 
225     try {
226         // Call implementation dependent close.
227         close();
228     } catch (const isc::Exception &ex) {
229         // Swallow exceptions. If we have some sort of error we'll log
230         // it but we won't propagate the throw.
231         LOG_ERROR(dhcp_ddns_logger,
232                   DHCP_DDNS_NCR_SEND_CLOSE_ERROR).arg(ex.what());
233     }
234 
235     io_service_ = NULL;
236 }
237 
238 void
sendRequest(NameChangeRequestPtr & ncr)239 NameChangeSender::sendRequest(NameChangeRequestPtr& ncr) {
240     if (!amSending()) {
241         isc_throw(NcrSenderError, "sender is not ready to send");
242     }
243 
244     if (!ncr) {
245         isc_throw(NcrSenderError, "request to send is empty");
246     }
247 
248     if (MultiThreadingMgr::instance().getMode()) {
249         lock_guard<mutex> lock(*mutex_);
250         sendRequestInternal(ncr);
251     } else {
252         sendRequestInternal(ncr);
253     }
254 }
255 
256 void
sendRequestInternal(NameChangeRequestPtr & ncr)257 NameChangeSender::sendRequestInternal(NameChangeRequestPtr& ncr) {
258     if (send_queue_.size() >= send_queue_max_) {
259         isc_throw(NcrSenderQueueFull,
260                   "send queue has reached maximum capacity: "
261                   << send_queue_max_);
262     }
263 
264     // Put it on the queue.
265     send_queue_.push_back(ncr);
266 
267     // Call sendNext to schedule the next one to go.
268     sendNext();
269 }
270 
271 void
sendNext()272 NameChangeSender::sendNext() {
273     if (ncr_to_send_) {
274         // @todo Not sure if there is any risk of getting stuck here but
275         // an interval timer to defend would be good.
276         // In reality, the derivation should ensure they timeout themselves
277         return;
278     }
279 
280     // If queue isn't empty, then get one from the front. Note we leave
281     // it on the front of the queue until we successfully send it.
282     if (!send_queue_.empty()) {
283         ncr_to_send_ = send_queue_.front();
284 
285        // @todo start defense timer
286        // If a send were to hang and we timed it out, then timeout
287        // handler need to cycle thru open/close ?
288 
289        // Call implementation dependent send.
290        doSend(ncr_to_send_);
291     }
292 }
293 
294 void
invokeSendHandler(const NameChangeSender::Result result)295 NameChangeSender::invokeSendHandler(const NameChangeSender::Result result) {
296     if (MultiThreadingMgr::instance().getMode()) {
297         lock_guard<mutex> lock(*mutex_);
298         invokeSendHandlerInternal(result);
299     } else {
300         invokeSendHandlerInternal(result);
301     }
302 }
303 
304 void
invokeSendHandlerInternal(const NameChangeSender::Result result)305 NameChangeSender::invokeSendHandlerInternal(const NameChangeSender::Result result) {
306     // @todo reset defense timer
307     if (result == SUCCESS) {
308         // It shipped so pull it off the queue.
309         send_queue_.pop_front();
310     }
311 
312     // Invoke the completion handler passing in the result and a pointer
313     // the request involved.
314     // Surround the invocation with a try-catch. The invoked handler is
315     // not supposed to throw, but in the event it does we will at least
316     // report it.
317     try {
318         send_handler_(result, ncr_to_send_);
319     } catch (const std::exception& ex) {
320         LOG_ERROR(dhcp_ddns_logger, DHCP_DDNS_UNCAUGHT_NCR_SEND_HANDLER_ERROR)
321                   .arg(ex.what());
322     }
323 
324     // Clear the pending ncr pointer.
325     ncr_to_send_.reset();
326 
327     // Set up the next send
328     try {
329         if (amSending()) {
330             sendNext();
331         }
332     } catch (const isc::Exception& ex) {
333         // It is possible though unlikely, for sendNext to fail without
334         // scheduling the send. While, unlikely, it does mean the callback
335         // will not get called with a failure. A throw here would surface
336         // at the IOService::run (or run variant) invocation.  So we will
337         // close the window by invoking the application handler with
338         // a failed result, and let the application layer sort it out.
339         LOG_ERROR(dhcp_ddns_logger, DHCP_DDNS_NCR_SEND_NEXT_ERROR)
340                   .arg(ex.what());
341 
342         // Invoke the completion handler passing in failed result.
343         // Surround the invocation with a try-catch. The invoked handler is
344         // not supposed to throw, but in the event it does we will at least
345         // report it.
346         try {
347             send_handler_(ERROR, ncr_to_send_);
348         } catch (const std::exception& ex) {
349             LOG_ERROR(dhcp_ddns_logger,
350                       DHCP_DDNS_UNCAUGHT_NCR_SEND_HANDLER_ERROR).arg(ex.what());
351         }
352     }
353 }
354 
355 void
skipNext()356 NameChangeSender::skipNext() {
357     if (MultiThreadingMgr::instance().getMode()) {
358         lock_guard<mutex> lock(*mutex_);
359         skipNextInternal();
360     } else {
361         skipNextInternal();
362     }
363 }
364 
365 void
skipNextInternal()366 NameChangeSender::skipNextInternal() {
367     if (!send_queue_.empty()) {
368         // Discards the request at the front of the queue.
369         send_queue_.pop_front();
370     }
371 }
372 
373 void
clearSendQueue()374 NameChangeSender::clearSendQueue() {
375     if (amSending()) {
376         isc_throw(NcrSenderError, "Cannot clear queue while sending");
377     }
378 
379     if (MultiThreadingMgr::instance().getMode()) {
380         lock_guard<mutex> lock(*mutex_);
381         send_queue_.clear();
382     } else {
383         send_queue_.clear();
384     }
385 }
386 
387 void
setQueueMaxSize(const size_t new_max)388 NameChangeSender::setQueueMaxSize(const size_t new_max) {
389     if (new_max == 0) {
390         isc_throw(NcrSenderError, "NameChangeSender:"
391                   " queue size must be greater than zero");
392     }
393 
394     send_queue_max_ = new_max;
395 }
396 
397 size_t
getQueueSize() const398 NameChangeSender::getQueueSize() const {
399     if (MultiThreadingMgr::instance().getMode()) {
400         lock_guard<mutex> lock(*mutex_);
401         return (getQueueSizeInternal());
402     } else {
403         return (getQueueSizeInternal());
404     }
405 }
406 
407 size_t
getQueueSizeInternal() const408 NameChangeSender::getQueueSizeInternal() const {
409     return (send_queue_.size());
410 }
411 
412 const NameChangeRequestPtr&
peekAt(const size_t index) const413 NameChangeSender::peekAt(const size_t index) const {
414     if (MultiThreadingMgr::instance().getMode()) {
415         lock_guard<mutex> lock(*mutex_);
416         return (peekAtInternal(index));
417     } else {
418         return (peekAtInternal(index));
419     }
420 }
421 
422 const NameChangeRequestPtr&
peekAtInternal(const size_t index) const423 NameChangeSender::peekAtInternal(const size_t index) const {
424     auto size = getQueueSizeInternal();
425     if (index >= size) {
426         isc_throw(NcrSenderError,
427                   "NameChangeSender::peekAt peek beyond end of queue attempted"
428                   << " index: " << index << " queue size: " << size);
429     }
430 
431     return (send_queue_.at(index));
432 }
433 
434 bool
isSendInProgress() const435 NameChangeSender::isSendInProgress() const {
436     if (MultiThreadingMgr::instance().getMode()) {
437         lock_guard<mutex> lock(*mutex_);
438         return ((ncr_to_send_) ? true : false);
439     } else {
440         return ((ncr_to_send_) ? true : false);
441     }
442 }
443 
444 void
assumeQueue(NameChangeSender & source_sender)445 NameChangeSender::assumeQueue(NameChangeSender& source_sender) {
446     if (source_sender.amSending()) {
447         isc_throw(NcrSenderError, "Cannot assume queue:"
448                   " source sender is actively sending");
449     }
450 
451     if (amSending()) {
452         isc_throw(NcrSenderError, "Cannot assume queue:"
453                   " target sender is actively sending");
454     }
455 
456     if (getQueueMaxSize() < source_sender.getQueueSize()) {
457         isc_throw(NcrSenderError, "Cannot assume queue:"
458                   " source queue count exceeds target queue max");
459     }
460 
461     if (MultiThreadingMgr::instance().getMode()) {
462         lock_guard<mutex> lock(*mutex_);
463         assumeQueueInternal(source_sender);
464     } else {
465         assumeQueueInternal(source_sender);
466     }
467 }
468 
469 void
assumeQueueInternal(NameChangeSender & source_sender)470 NameChangeSender::assumeQueueInternal(NameChangeSender& source_sender) {
471     if (!send_queue_.empty()) {
472         isc_throw(NcrSenderError, "Cannot assume queue:"
473                   " target queue is not empty");
474     }
475 
476     send_queue_.swap(source_sender.getSendQueue());
477 }
478 
479 int
getSelectFd()480 NameChangeSender::getSelectFd() {
481     isc_throw(NotImplemented, "NameChangeSender::getSelectFd is not supported");
482 }
483 
484 void
runReadyIO()485 NameChangeSender::runReadyIO() {
486     if (!io_service_) {
487         isc_throw(NcrSenderError, "NameChangeSender::runReadyIO"
488                   " sender io service is null");
489     }
490 
491     // We shouldn't be here if IO isn't ready to execute.
492     // By running poll we're guaranteed not to hang.
493     /// @todo Trac# 3325 requests that asiolink::IOService provide a
494     /// wrapper for poll().
495     io_service_->get_io_service().poll_one();
496 }
497 
498 }  // namespace dhcp_ddns
499 }  // namespace isc
500