1 // Copyright (C) 2018-2021 Internet Systems Consortium, Inc. ("ISC")
2 //
3 // This Source Code Form is subject to the terms of the Mozilla Public
4 // License, v. 2.0. If a copy of the MPL was not distributed with this
5 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
6 
7 #include <config.h>
8 
9 #include <communication_state.h>
10 #include <ha_log.h>
11 #include <ha_service_states.h>
12 #include <cc/data.h>
13 #include <exceptions/exceptions.h>
14 #include <dhcp/dhcp4.h>
15 #include <dhcp/dhcp6.h>
16 #include <dhcp/option_int.h>
17 #include <dhcp/pkt4.h>
18 #include <dhcp/pkt6.h>
19 #include <http/date_time.h>
20 #include <util/boost_time_utils.h>
21 #include <util/multi_threading_mgr.h>
22 
23 #include <boost/pointer_cast.hpp>
24 
25 #include <functional>
26 #include <limits>
27 #include <sstream>
28 #include <utility>
29 
30 using namespace isc::asiolink;
31 using namespace isc::data;
32 using namespace isc::dhcp;
33 using namespace isc::http;
34 using namespace isc::log;
35 using namespace isc::util;
36 
37 using namespace boost::posix_time;
38 using namespace std;
39 
namespace {

/// @brief Clock skew, in seconds, above which a warning gets logged.
constexpr long WARN_CLOCK_SKEW = 30;

/// @brief Clock skew, in seconds, above which the HA service terminates.
constexpr long TERM_CLOCK_SKEW = 60;

/// @brief Number of seconds that must pass before another clock skew
/// warning may be logged.
constexpr long MIN_TIME_SINCE_CLOCK_SKEW_WARN = 60;

} // end of anonymous namespace
52 
53 namespace isc {
54 namespace ha {
55 
CommunicationState(const IOServicePtr & io_service,const HAConfigPtr & config)56 CommunicationState::CommunicationState(const IOServicePtr& io_service,
57                                        const HAConfigPtr& config)
58     : io_service_(io_service), config_(config), timer_(), interval_(0),
59       poke_time_(boost::posix_time::microsec_clock::universal_time()),
60       heartbeat_impl_(0), partner_state_(-1), partner_scopes_(),
61       clock_skew_(0, 0, 0, 0), last_clock_skew_warn_(),
62       my_time_at_skew_(), partner_time_at_skew_(),
63       analyzed_messages_count_(0), unsent_update_count_(0),
64       partner_unsent_update_count_{0, 0}, mutex_(new mutex()) {
65 }
66 
~CommunicationState()67 CommunicationState::~CommunicationState() {
68     stopHeartbeat();
69 }
70 
71 void
modifyPokeTime(const long secs)72 CommunicationState::modifyPokeTime(const long secs) {
73     if (MultiThreadingMgr::instance().getMode()) {
74         std::lock_guard<std::mutex> lk(*mutex_);
75         poke_time_ += boost::posix_time::seconds(secs);
76     } else {
77         poke_time_ += boost::posix_time::seconds(secs);
78     }
79 }
80 
81 int
getPartnerState() const82 CommunicationState::getPartnerState() const {
83     if (MultiThreadingMgr::instance().getMode()) {
84         std::lock_guard<std::mutex> lk(*mutex_);
85         return (partner_state_);
86     } else {
87         return (partner_state_);
88     }
89 }
90 
91 void
setPartnerState(const std::string & state)92 CommunicationState::setPartnerState(const std::string& state) {
93     if (MultiThreadingMgr::instance().getMode()) {
94         std::lock_guard<std::mutex> lk(*mutex_);
95         setPartnerStateInternal(state);
96     } else {
97         setPartnerStateInternal(state);
98     }
99 }
100 
101 void
setPartnerStateInternal(const std::string & state)102 CommunicationState::setPartnerStateInternal(const std::string& state) {
103     try {
104         partner_state_ = stringToState(state);
105     } catch (...) {
106         isc_throw(BadValue, "unsupported HA partner state returned "
107                   << state);
108     }
109 }
110 
111 std::set<std::string>
getPartnerScopes() const112 CommunicationState::getPartnerScopes() const {
113     if (MultiThreadingMgr::instance().getMode()) {
114         std::lock_guard<std::mutex> lk(*mutex_);
115         return (partner_scopes_);
116     } else {
117         return (partner_scopes_);
118     }
119 }
120 
121 void
setPartnerScopes(ConstElementPtr new_scopes)122 CommunicationState::setPartnerScopes(ConstElementPtr new_scopes) {
123     if (MultiThreadingMgr::instance().getMode()) {
124         std::lock_guard<std::mutex> lk(*mutex_);
125         setPartnerScopesInternal(new_scopes);
126     } else {
127         setPartnerScopesInternal(new_scopes);
128     }
129 }
130 
131 void
setPartnerScopesInternal(ConstElementPtr new_scopes)132 CommunicationState::setPartnerScopesInternal(ConstElementPtr new_scopes) {
133     if (!new_scopes || (new_scopes->getType() != Element::list)) {
134         isc_throw(BadValue, "unable to record partner's HA scopes because"
135                   " the received value is not a valid JSON list");
136     }
137 
138     std::set<std::string> partner_scopes;
139     for (auto i = 0; i < new_scopes->size(); ++i) {
140         auto scope = new_scopes->get(i);
141         if (scope->getType() != Element::string) {
142             isc_throw(BadValue, "unable to record partner's HA scopes because"
143                       " the received scope value is not a valid JSON string");
144         }
145         auto scope_str = scope->stringValue();
146         if (!scope_str.empty()) {
147             partner_scopes.insert(scope_str);
148         }
149     }
150     partner_scopes_ = partner_scopes;
151 }
152 
153 void
startHeartbeat(const long interval,const std::function<void ()> & heartbeat_impl)154 CommunicationState::startHeartbeat(const long interval,
155                                    const std::function<void()>& heartbeat_impl) {
156     if (MultiThreadingMgr::instance().getMode()) {
157         std::lock_guard<std::mutex> lk(*mutex_);
158         startHeartbeatInternal(interval, heartbeat_impl);
159     } else {
160         startHeartbeatInternal(interval, heartbeat_impl);
161     }
162 }
163 
164 void
startHeartbeatInternal(const long interval,const std::function<void ()> & heartbeat_impl)165 CommunicationState::startHeartbeatInternal(const long interval,
166                                            const std::function<void()>& heartbeat_impl) {
167     bool settings_modified = false;
168 
169     // If we're setting the heartbeat for the first time, it should
170     // be non-null.
171     if (heartbeat_impl) {
172         settings_modified = true;
173         heartbeat_impl_ = heartbeat_impl;
174 
175     } else if (!heartbeat_impl_) {
176         // The heartbeat is re-scheduled but we have no historic implementation
177         // pointer we could re-use. This is a programmatic issue.
178         isc_throw(BadValue, "unable to start heartbeat when pointer"
179                   " to the heartbeat implementation is not specified");
180     }
181 
182     // If we're setting the heartbeat for the first time, the interval
183     // should be greater than 0.
184     if (interval != 0) {
185         settings_modified |= (interval_ != interval);
186         interval_ = interval;
187 
188     } else if (interval_ <= 0) {
189         // The heartbeat is re-scheduled but we have no historic interval
190         // which we could re-use. This is a programmatic issue.
191         heartbeat_impl_ = 0;
192         isc_throw(BadValue, "unable to start heartbeat when interval"
193                   " for the heartbeat timer is not specified");
194     }
195 
196     if (!timer_) {
197         timer_.reset(new IntervalTimer(*io_service_));
198     }
199 
200     if (settings_modified) {
201         timer_->setup(heartbeat_impl_, interval_, IntervalTimer::ONE_SHOT);
202     }
203 }
204 
205 void
stopHeartbeat()206 CommunicationState::stopHeartbeat() {
207     if (MultiThreadingMgr::instance().getMode()) {
208         std::lock_guard<std::mutex> lk(*mutex_);
209         stopHeartbeatInternal();
210     } else {
211         stopHeartbeatInternal();
212     }
213 }
214 
215 void
stopHeartbeatInternal()216 CommunicationState::stopHeartbeatInternal() {
217     if (timer_) {
218         timer_->cancel();
219         timer_.reset();
220         interval_ = 0;
221         heartbeat_impl_ = 0;
222     }
223 }
224 
225 bool
isHeartbeatRunning() const226 CommunicationState::isHeartbeatRunning() const {
227     if (MultiThreadingMgr::instance().getMode()) {
228         std::lock_guard<std::mutex> lk(*mutex_);
229         return (static_cast<bool>(timer_));
230     } else {
231         return (static_cast<bool>(timer_));
232     }
233 }
234 
235 boost::posix_time::time_duration
updatePokeTime()236 CommunicationState::updatePokeTime() {
237     if (MultiThreadingMgr::instance().getMode()) {
238         std::lock_guard<std::mutex> lk(*mutex_);
239         return (updatePokeTimeInternal());
240     } else {
241         return (updatePokeTimeInternal());
242     }
243 }
244 
245 boost::posix_time::time_duration
updatePokeTimeInternal()246 CommunicationState::updatePokeTimeInternal() {
247     // Remember previous poke time.
248     boost::posix_time::ptime prev_poke_time = poke_time_;
249     // Set poke time to the current time.
250     poke_time_ = boost::posix_time::microsec_clock::universal_time();
251     return (poke_time_ - prev_poke_time);
252 }
253 
254 void
poke()255 CommunicationState::poke() {
256     if (MultiThreadingMgr::instance().getMode()) {
257         std::lock_guard<std::mutex> lk(*mutex_);
258         pokeInternal();
259     } else {
260         pokeInternal();
261     }
262 }
263 
264 void
pokeInternal()265 CommunicationState::pokeInternal() {
266     // Update poke time and compute duration.
267     boost::posix_time::time_duration duration_since_poke = updatePokeTimeInternal();
268 
269     // If we have been tracking the DHCP messages directed to the partner,
270     // we need to clear any gathered information because the connection
271     // seems to be (re)established.
272     clearConnectingClients();
273     analyzed_messages_count_ = 0;
274 
275     if (timer_) {
276         // Check the duration since last poke. If it is less than a second, we don't
277         // want to reschedule the timer. In order to avoid the overhead of
278         // re-scheduling the timer too frequently we reschedule it only if the
279         // duration is 1s or more. This matches the time resolution for heartbeats.
280         if (duration_since_poke.total_seconds() > 0) {
281             // A poke causes the timer to be re-scheduled to prevent it
282             // from triggering a heartbeat shortly after confirming the
283             // connection is ok.
284             startHeartbeatInternal();
285         }
286     }
287 }
288 
289 int64_t
getDurationInMillisecs() const290 CommunicationState::getDurationInMillisecs() const {
291     if (MultiThreadingMgr::instance().getMode()) {
292         std::lock_guard<std::mutex> lk(*mutex_);
293         return (getDurationInMillisecsInternal());
294     } else {
295         return (getDurationInMillisecsInternal());
296     }
297 }
298 
299 int64_t
getDurationInMillisecsInternal() const300 CommunicationState::getDurationInMillisecsInternal() const {
301     ptime now = boost::posix_time::microsec_clock::universal_time();
302     time_duration duration = now - poke_time_;
303     return (duration.total_milliseconds());
304 }
305 
306 bool
isCommunicationInterrupted() const307 CommunicationState::isCommunicationInterrupted() const {
308     return (getDurationInMillisecs() > config_->getMaxResponseDelay());
309 }
310 
311 size_t
getAnalyzedMessagesCount() const312 CommunicationState::getAnalyzedMessagesCount() const {
313     return (analyzed_messages_count_);
314 }
315 
316 bool
clockSkewShouldWarn()317 CommunicationState::clockSkewShouldWarn() {
318     if (MultiThreadingMgr::instance().getMode()) {
319         std::lock_guard<std::mutex> lk(*mutex_);
320         return (clockSkewShouldWarnInternal());
321     } else {
322         return (clockSkewShouldWarnInternal());
323     }
324 }
325 
326 bool
clockSkewShouldWarnInternal()327 CommunicationState::clockSkewShouldWarnInternal() {
328     // First check if the clock skew is beyond the threshold.
329     if (isClockSkewGreater(WARN_CLOCK_SKEW)) {
330 
331         // In order to prevent to frequent warnings we provide a gating mechanism
332         // which doesn't allow for issuing a warning earlier than 60 seconds after
333         // the previous one.
334 
335         // Find the current time and the duration since last warning.
336         ptime now = boost::posix_time::microsec_clock::universal_time();
337         time_duration since_warn_duration = now - last_clock_skew_warn_;
338 
339         // If the last warning was issued more than 60 seconds ago or it is a
340         // first warning, we need to update the last warning timestamp and return
341         // true to indicate that new warning should be issued.
342         if (last_clock_skew_warn_.is_not_a_date_time() ||
343             (since_warn_duration.total_seconds() > MIN_TIME_SINCE_CLOCK_SKEW_WARN)) {
344             last_clock_skew_warn_ = now;
345             LOG_WARN(ha_logger, HA_HIGH_CLOCK_SKEW)
346                      .arg(logFormatClockSkewInternal());
347             return (true);
348         }
349     }
350 
351     // The warning should not be issued.
352     return (false);
353 }
354 
355 bool
clockSkewShouldTerminate() const356 CommunicationState::clockSkewShouldTerminate() const {
357     if (MultiThreadingMgr::instance().getMode()) {
358         std::lock_guard<std::mutex> lk(*mutex_);
359         // Issue a warning if the clock skew is greater than 60s.
360         return (clockSkewShouldTerminateInternal());
361     } else {
362         return (clockSkewShouldTerminateInternal());
363     }
364 }
365 
366 bool
clockSkewShouldTerminateInternal() const367 CommunicationState::clockSkewShouldTerminateInternal() const {
368     if (isClockSkewGreater(TERM_CLOCK_SKEW)) {
369         LOG_ERROR(ha_logger, HA_HIGH_CLOCK_SKEW_CAUSES_TERMINATION)
370                   .arg(logFormatClockSkewInternal());
371         return (true);
372     }
373 
374     return (false);
375 }
376 
377 bool
isClockSkewGreater(const long seconds) const378 CommunicationState::isClockSkewGreater(const long seconds) const {
379     return ((clock_skew_.total_seconds() > seconds) ||
380             (clock_skew_.total_seconds() < -seconds));
381 }
382 
383 void
setPartnerTime(const std::string & time_text)384 CommunicationState::setPartnerTime(const std::string& time_text) {
385     if (MultiThreadingMgr::instance().getMode()) {
386         std::lock_guard<std::mutex> lk(*mutex_);
387         setPartnerTimeInternal(time_text);
388     } else {
389         setPartnerTimeInternal(time_text);
390     }
391 }
392 
393 void
setPartnerTimeInternal(const std::string & time_text)394 CommunicationState::setPartnerTimeInternal(const std::string& time_text) {
395     partner_time_at_skew_ = HttpDateTime().fromRfc1123(time_text).getPtime();
396     my_time_at_skew_ = HttpDateTime().getPtime();
397     clock_skew_ = partner_time_at_skew_ - my_time_at_skew_;
398 }
399 
400 std::string
logFormatClockSkew() const401 CommunicationState::logFormatClockSkew() const {
402     if (MultiThreadingMgr::instance().getMode()) {
403         std::lock_guard<std::mutex> lk(*mutex_);
404         return (logFormatClockSkewInternal());
405     } else {
406         return (logFormatClockSkewInternal());
407     }
408 }
409 
410 std::string
logFormatClockSkewInternal() const411 CommunicationState::logFormatClockSkewInternal() const {
412     std::ostringstream os;
413 
414     if ((my_time_at_skew_.is_not_a_date_time()) ||
415         (partner_time_at_skew_.is_not_a_date_time())) {
416         // Guard against being called before times have been set.
417         // Otherwise we'll get out-range exceptions.
418         return ("skew not initialized");
419     }
420 
421     // Note HttpTime resolution is only to seconds, so we use fractional
422     // precision of zero when logging.
423     os << "my time: " << util::ptimeToText(my_time_at_skew_, 0)
424        << ", partner's time: " << util::ptimeToText(partner_time_at_skew_, 0)
425        << ", partner's clock is ";
426 
427     // If negative clock skew, the partner's time is behind our time.
428     if (clock_skew_.is_negative()) {
429         os << clock_skew_.invert_sign().total_seconds() << "s behind";
430     } else {
431         // Partner's time is ahead of ours.
432         os << clock_skew_.total_seconds() << "s ahead";
433     }
434 
435     return (os.str());
436 }
437 
438 ElementPtr
getReport() const439 CommunicationState::getReport() const {
440     auto report = Element::createMap();
441 
442     auto in_touch = (getPartnerState() > 0);
443     report->set("in-touch", Element::create(in_touch));
444 
445     auto age = in_touch ? static_cast<long long int>(getDurationInMillisecs() / 1000) : 0;
446     report->set("age", Element::create(age));
447 
448     try {
449         report->set("last-state", Element::create(stateToString(getPartnerState())));
450 
451     } catch (...) {
452         report->set("last-state", Element::create(std::string()));
453     }
454 
455     auto list = Element::createList();
456     for (auto scope : getPartnerScopes()) {
457         list->add(Element::create(scope));
458     }
459     report->set("last-scopes", list);
460     report->set("communication-interrupted",
461                 Element::create(isCommunicationInterrupted()));
462     report->set("connecting-clients", Element::create(static_cast<long long>(getConnectingClientsCount())));
463     report->set("unacked-clients", Element::create(static_cast<long long>(getUnackedClientsCount())));
464 
465     long long unacked_clients_left = 0;
466     if (isCommunicationInterrupted() && (config_->getMaxUnackedClients() >= getUnackedClientsCount())) {
467         unacked_clients_left = static_cast<long long>(config_->getMaxUnackedClients() -
468                                                       getUnackedClientsCount() + 1);
469     }
470     report->set("unacked-clients-left", Element::create(unacked_clients_left));
471     report->set("analyzed-packets", Element::create(static_cast<long long>(getAnalyzedMessagesCount())));
472 
473     return (report);
474 }
475 
476 uint64_t
getUnsentUpdateCount() const477 CommunicationState::getUnsentUpdateCount() const {
478     if (MultiThreadingMgr::instance().getMode()) {
479         std::lock_guard<std::mutex> lk(*mutex_);
480         return (unsent_update_count_);
481     } else {
482         return (unsent_update_count_);
483     }
484 }
485 
486 void
increaseUnsentUpdateCount()487 CommunicationState::increaseUnsentUpdateCount() {
488     if (MultiThreadingMgr::instance().getMode()) {
489         std::lock_guard<std::mutex> lk(*mutex_);
490         increaseUnsentUpdateCountInternal();
491     } else {
492         increaseUnsentUpdateCountInternal();
493     }
494 }
495 
496 void
increaseUnsentUpdateCountInternal()497 CommunicationState::increaseUnsentUpdateCountInternal() {
498     // Protect against setting the incremented value to zero.
499     // The zero value is reserved for a server startup.
500     if (unsent_update_count_ < std::numeric_limits<uint64_t>::max()) {
501         ++unsent_update_count_;
502     } else {
503         unsent_update_count_ = 1;
504     }
505 }
506 
507 bool
hasPartnerNewUnsentUpdates() const508 CommunicationState::hasPartnerNewUnsentUpdates() const {
509     if (MultiThreadingMgr::instance().getMode()) {
510         std::lock_guard<std::mutex> lk(*mutex_);
511         return (hasPartnerNewUnsentUpdatesInternal());
512     } else {
513         return (hasPartnerNewUnsentUpdatesInternal());
514     }
515 }
516 
517 bool
hasPartnerNewUnsentUpdatesInternal() const518 CommunicationState::hasPartnerNewUnsentUpdatesInternal() const {
519     return (partner_unsent_update_count_.second > 0 &&
520             (partner_unsent_update_count_.first != partner_unsent_update_count_.second));
521 }
522 
523 void
setPartnerUnsentUpdateCount(uint64_t unsent_update_count)524 CommunicationState::setPartnerUnsentUpdateCount(uint64_t unsent_update_count) {
525     if (MultiThreadingMgr::instance().getMode()) {
526         std::lock_guard<std::mutex> lk(*mutex_);
527         setPartnerUnsentUpdateCountInternal(unsent_update_count);
528     } else {
529         setPartnerUnsentUpdateCountInternal(unsent_update_count);
530     }
531 }
532 
533 void
setPartnerUnsentUpdateCountInternal(uint64_t unsent_update_count)534 CommunicationState::setPartnerUnsentUpdateCountInternal(uint64_t unsent_update_count) {
535     partner_unsent_update_count_.first = partner_unsent_update_count_.second;
536     partner_unsent_update_count_.second = unsent_update_count;
537 }
538 
CommunicationState4(const IOServicePtr & io_service,const HAConfigPtr & config)539 CommunicationState4::CommunicationState4(const IOServicePtr& io_service,
540                                          const HAConfigPtr& config)
541     : CommunicationState(io_service, config), connecting_clients_() {
542 }
543 
544 void
analyzeMessage(const boost::shared_ptr<dhcp::Pkt> & message)545 CommunicationState4::analyzeMessage(const boost::shared_ptr<dhcp::Pkt>& message) {
546     if (MultiThreadingMgr::instance().getMode()) {
547         std::lock_guard<std::mutex> lk(*mutex_);
548         analyzeMessageInternal(message);
549     } else {
550         analyzeMessageInternal(message);
551     }
552 }
553 
554 void
analyzeMessageInternal(const boost::shared_ptr<dhcp::Pkt> & message)555 CommunicationState4::analyzeMessageInternal(const boost::shared_ptr<dhcp::Pkt>& message) {
556     // The DHCP message must successfully cast to a Pkt4 object.
557     Pkt4Ptr msg = boost::dynamic_pointer_cast<Pkt4>(message);
558     if (!msg) {
559         isc_throw(BadValue, "DHCP message to be analyzed is not a DHCPv4 message");
560     }
561 
562     ++analyzed_messages_count_;
563 
564     // Check value of the "secs" field by comparing it with the configured
565     // threshold.
566     uint16_t secs = msg->getSecs();
567 
568     // It was observed that some Windows clients may send swapped bytes in the
569     // "secs" field. When the second byte is 0 and the first byte is non-zero
570     // we consider bytes to be swapped and so we correct them.
571     if ((secs > 255) && ((secs & 0xFF) == 0)) {
572         secs = ((secs >> 8) | (secs << 8));
573     }
574 
575     // Check the value of the "secs" field. The "secs" field holds a value in
576     // seconds, hence we have to multiple by 1000 to get a value in milliseconds.
577     // If the secs value is above the threshold, it means that the current
578     // client should be considered unacked.
579     auto unacked = (secs * 1000 > config_->getMaxAckDelay());
580 
581     // Client identifier will be stored together with the hardware address. It
582     // may remain empty if the client hasn't specified it.
583     std::vector<uint8_t> client_id;
584     OptionPtr opt_client_id = msg->getOption(DHO_DHCP_CLIENT_IDENTIFIER);
585     if (opt_client_id) {
586         client_id = opt_client_id->getData();
587     }
588 
589     bool log_unacked = false;
590 
591     // Check if the given client was already recorded.
592     auto& idx = connecting_clients_.get<0>();
593     auto existing_request = idx.find(boost::make_tuple(msg->getHWAddr()->hwaddr_, client_id));
594     if (existing_request != idx.end()) {
595         // If the client was recorded and was not considered unacked
596         // but it should be considered unacked as a result of processing
597         // this packet, let's update the recorded request to mark the
598         // client unacked.
599         if (!existing_request->unacked_ && unacked) {
600             ConnectingClient4 connecting_client{ msg->getHWAddr()->hwaddr_, client_id, unacked };
601             idx.replace(existing_request, connecting_client);
602             log_unacked = true;
603         }
604 
605     } else {
606         // This is the first time we see the packet from this client. Let's
607         // record it.
608         ConnectingClient4 connecting_client{ msg->getHWAddr()->hwaddr_, client_id, unacked };
609         idx.insert(connecting_client);
610         log_unacked = unacked;
611 
612         if (!unacked) {
613             // This is the first time we see this client after getting into the
614             // communication interrupted state. But, this client hasn't been
615             // yet trying log enough to be considered unacked.
616             LOG_INFO(ha_logger, HA_COMMUNICATION_INTERRUPTED_CLIENT4)
617                 .arg(message->getLabel());
618         }
619     }
620 
621     // Only log the first time we detect a client is unacked.
622     if (log_unacked) {
623         unsigned unacked_left = 0;
624         unsigned unacked_total = connecting_clients_.get<1>().count(true);
625         if (config_->getMaxUnackedClients() >= unacked_total) {
626             unacked_left = config_->getMaxUnackedClients() - unacked_total + 1;
627         }
628         LOG_INFO(ha_logger, HA_COMMUNICATION_INTERRUPTED_CLIENT4_UNACKED)
629             .arg(message->getLabel())
630             .arg(unacked_total)
631             .arg(unacked_left);
632     }
633 }
634 
635 bool
failureDetected() const636 CommunicationState4::failureDetected() const {
637     if (MultiThreadingMgr::instance().getMode()) {
638         std::lock_guard<std::mutex> lk(*mutex_);
639         return (failureDetectedInternal());
640     } else {
641         return (failureDetectedInternal());
642     }
643 }
644 
645 bool
failureDetectedInternal() const646 CommunicationState4::failureDetectedInternal() const {
647     return ((config_->getMaxUnackedClients() == 0) ||
648             (connecting_clients_.get<1>().count(true) >
649              config_->getMaxUnackedClients()));
650 }
651 
652 size_t
getConnectingClientsCount() const653 CommunicationState4::getConnectingClientsCount() const {
654     if (MultiThreadingMgr::instance().getMode()) {
655         std::lock_guard<std::mutex> lk(*mutex_);
656         return (connecting_clients_.size());
657     } else {
658         return (connecting_clients_.size());
659     }
660 }
661 
662 size_t
getUnackedClientsCount() const663 CommunicationState4::getUnackedClientsCount() const {
664     if (MultiThreadingMgr::instance().getMode()) {
665         std::lock_guard<std::mutex> lk(*mutex_);
666         return (connecting_clients_.get<1>().count(true));
667     } else {
668         return (connecting_clients_.get<1>().count(true));
669     }
670 }
671 
672 void
clearConnectingClients()673 CommunicationState4::clearConnectingClients() {
674     connecting_clients_.clear();
675 }
676 
CommunicationState6(const IOServicePtr & io_service,const HAConfigPtr & config)677 CommunicationState6::CommunicationState6(const IOServicePtr& io_service,
678                                          const HAConfigPtr& config)
679     : CommunicationState(io_service, config), connecting_clients_() {
680 }
681 
682 void
analyzeMessage(const boost::shared_ptr<dhcp::Pkt> & message)683 CommunicationState6::analyzeMessage(const boost::shared_ptr<dhcp::Pkt>& message) {
684     if (MultiThreadingMgr::instance().getMode()) {
685         std::lock_guard<std::mutex> lk(*mutex_);
686         analyzeMessageInternal(message);
687     } else {
688         analyzeMessageInternal(message);
689     }
690 }
691 
692 void
analyzeMessageInternal(const boost::shared_ptr<dhcp::Pkt> & message)693 CommunicationState6::analyzeMessageInternal(const boost::shared_ptr<dhcp::Pkt>& message) {
694     // The DHCP message must successfully cast to a Pkt6 object.
695     Pkt6Ptr msg = boost::dynamic_pointer_cast<Pkt6>(message);
696     if (!msg) {
697         isc_throw(BadValue, "DHCP message to be analyzed is not a DHCPv6 message");
698     }
699 
700     ++analyzed_messages_count_;
701 
702     // Check the value of the "elapsed time" option. If it is below the threshold
703     // there is nothing to do. The "elapsed time" option holds the time in
704     // 1/100 of second, hence we have to multiply by 10 to get a value in milliseconds.
705     OptionUint16Ptr elapsed_time = boost::dynamic_pointer_cast<
706         OptionUint16>(msg->getOption(D6O_ELAPSED_TIME));
707     auto unacked = (elapsed_time && elapsed_time->getValue() * 10 > config_->getMaxAckDelay());
708 
709     // Get the DUID of the client to see if it hasn't been recorded already.
710     OptionPtr duid = msg->getOption(D6O_CLIENTID);
711     if (!duid) {
712         return;
713     }
714 
715     bool log_unacked = false;
716 
717     // Check if the given client was already recorded.
718     auto& idx = connecting_clients_.get<0>();
719     auto existing_request = idx.find(duid->getData());
720     if (existing_request != idx.end()) {
721         // If the client was recorded and was not considered unacked
722         // but it should be considered unacked as a result of processing
723         // this packet, let's update the recorded request to mark the
724         // client unacked.
725         if (!existing_request->unacked_ && unacked) {
726             ConnectingClient6 connecting_client{ duid->getData(), unacked };
727             idx.replace(existing_request, connecting_client);
728             log_unacked = true;
729         }
730 
731     } else {
732         // This is the first time we see the packet from this client. Let's
733         // record it.
734         ConnectingClient6 connecting_client{ duid->getData(), unacked };
735         idx.insert(connecting_client);
736         log_unacked = unacked;
737 
738         if (!unacked) {
739             // This is the first time we see this client after getting into the
740             // communication interrupted state. But, this client hasn't been
741             // yet trying log enough to be considered unacked.
742             LOG_INFO(ha_logger, HA_COMMUNICATION_INTERRUPTED_CLIENT6)
743                 .arg(message->getLabel());
744         }
745     }
746 
747     // Only log the first time we detect a client is unacked.
748     if (log_unacked) {
749         unsigned unacked_left = 0;
750         unsigned unacked_total = connecting_clients_.get<1>().count(true);
751         if (config_->getMaxUnackedClients() >= unacked_total) {
752             unacked_left = config_->getMaxUnackedClients() - unacked_total + 1;
753         }
754         LOG_INFO(ha_logger, HA_COMMUNICATION_INTERRUPTED_CLIENT6_UNACKED)
755             .arg(message->getLabel())
756             .arg(unacked_total)
757             .arg(unacked_left);
758     }
759 }
760 
761 bool
failureDetected() const762 CommunicationState6::failureDetected() const {
763     if (MultiThreadingMgr::instance().getMode()) {
764         std::lock_guard<std::mutex> lk(*mutex_);
765         return (failureDetectedInternal());
766     } else {
767         return (failureDetectedInternal());
768     }
769 }
770 
771 bool
failureDetectedInternal() const772 CommunicationState6::failureDetectedInternal() const {
773     return ((config_->getMaxUnackedClients() == 0) ||
774             (connecting_clients_.get<1>().count(true) >
775              config_->getMaxUnackedClients()));
776 }
777 
778 size_t
getConnectingClientsCount() const779 CommunicationState6::getConnectingClientsCount() const {
780     if (MultiThreadingMgr::instance().getMode()) {
781         std::lock_guard<std::mutex> lk(*mutex_);
782         return (connecting_clients_.size());
783     } else {
784         return (connecting_clients_.size());
785     }
786 }
787 
788 size_t
getUnackedClientsCount() const789 CommunicationState6::getUnackedClientsCount() const {
790     if (MultiThreadingMgr::instance().getMode()) {
791         std::lock_guard<std::mutex> lk(*mutex_);
792         return (connecting_clients_.get<1>().count(true));
793     } else {
794         return (connecting_clients_.get<1>().count(true));
795     }
796 }
797 
798 void
clearConnectingClients()799 CommunicationState6::clearConnectingClients() {
800     connecting_clients_.clear();
801 }
802 
803 } // end of namespace isc::ha
804 } // end of namespace isc
805