1 // Copyright (C) 2018-2021 Internet Systems Consortium, Inc. ("ISC")
2 //
3 // This Source Code Form is subject to the terms of the Mozilla Public
4 // License, v. 2.0. If a copy of the MPL was not distributed with this
5 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7 #include <config.h>
8
9 #include <communication_state.h>
10 #include <ha_log.h>
11 #include <ha_service_states.h>
12 #include <cc/data.h>
13 #include <exceptions/exceptions.h>
14 #include <dhcp/dhcp4.h>
15 #include <dhcp/dhcp6.h>
16 #include <dhcp/option_int.h>
17 #include <dhcp/pkt4.h>
18 #include <dhcp/pkt6.h>
19 #include <http/date_time.h>
20 #include <util/boost_time_utils.h>
21 #include <util/multi_threading_mgr.h>
22
23 #include <boost/pointer_cast.hpp>
24
25 #include <functional>
26 #include <limits>
27 #include <sstream>
28 #include <utility>
29
30 using namespace isc::asiolink;
31 using namespace isc::data;
32 using namespace isc::dhcp;
33 using namespace isc::http;
34 using namespace isc::log;
35 using namespace isc::util;
36
37 using namespace boost::posix_time;
38 using namespace std;
39
namespace {

/// @brief Minimum number of seconds that must pass between two consecutive
/// clock skew warnings.
constexpr long MIN_TIME_SINCE_CLOCK_SKEW_WARN{60};

/// @brief Clock skew (in seconds, either direction) above which a warning
/// is issued.
constexpr long WARN_CLOCK_SKEW{30};

/// @brief Clock skew (in seconds, either direction) above which the HA
/// service terminates.
constexpr long TERM_CLOCK_SKEW{60};

} // end of anonymous namespace
52
53 namespace isc {
54 namespace ha {
55
CommunicationState(const IOServicePtr & io_service,const HAConfigPtr & config)56 CommunicationState::CommunicationState(const IOServicePtr& io_service,
57 const HAConfigPtr& config)
58 : io_service_(io_service), config_(config), timer_(), interval_(0),
59 poke_time_(boost::posix_time::microsec_clock::universal_time()),
60 heartbeat_impl_(0), partner_state_(-1), partner_scopes_(),
61 clock_skew_(0, 0, 0, 0), last_clock_skew_warn_(),
62 my_time_at_skew_(), partner_time_at_skew_(),
63 analyzed_messages_count_(0), unsent_update_count_(0),
64 partner_unsent_update_count_{0, 0}, mutex_(new mutex()) {
65 }
66
~CommunicationState()67 CommunicationState::~CommunicationState() {
68 stopHeartbeat();
69 }
70
71 void
modifyPokeTime(const long secs)72 CommunicationState::modifyPokeTime(const long secs) {
73 if (MultiThreadingMgr::instance().getMode()) {
74 std::lock_guard<std::mutex> lk(*mutex_);
75 poke_time_ += boost::posix_time::seconds(secs);
76 } else {
77 poke_time_ += boost::posix_time::seconds(secs);
78 }
79 }
80
81 int
getPartnerState() const82 CommunicationState::getPartnerState() const {
83 if (MultiThreadingMgr::instance().getMode()) {
84 std::lock_guard<std::mutex> lk(*mutex_);
85 return (partner_state_);
86 } else {
87 return (partner_state_);
88 }
89 }
90
91 void
setPartnerState(const std::string & state)92 CommunicationState::setPartnerState(const std::string& state) {
93 if (MultiThreadingMgr::instance().getMode()) {
94 std::lock_guard<std::mutex> lk(*mutex_);
95 setPartnerStateInternal(state);
96 } else {
97 setPartnerStateInternal(state);
98 }
99 }
100
101 void
setPartnerStateInternal(const std::string & state)102 CommunicationState::setPartnerStateInternal(const std::string& state) {
103 try {
104 partner_state_ = stringToState(state);
105 } catch (...) {
106 isc_throw(BadValue, "unsupported HA partner state returned "
107 << state);
108 }
109 }
110
111 std::set<std::string>
getPartnerScopes() const112 CommunicationState::getPartnerScopes() const {
113 if (MultiThreadingMgr::instance().getMode()) {
114 std::lock_guard<std::mutex> lk(*mutex_);
115 return (partner_scopes_);
116 } else {
117 return (partner_scopes_);
118 }
119 }
120
121 void
setPartnerScopes(ConstElementPtr new_scopes)122 CommunicationState::setPartnerScopes(ConstElementPtr new_scopes) {
123 if (MultiThreadingMgr::instance().getMode()) {
124 std::lock_guard<std::mutex> lk(*mutex_);
125 setPartnerScopesInternal(new_scopes);
126 } else {
127 setPartnerScopesInternal(new_scopes);
128 }
129 }
130
131 void
setPartnerScopesInternal(ConstElementPtr new_scopes)132 CommunicationState::setPartnerScopesInternal(ConstElementPtr new_scopes) {
133 if (!new_scopes || (new_scopes->getType() != Element::list)) {
134 isc_throw(BadValue, "unable to record partner's HA scopes because"
135 " the received value is not a valid JSON list");
136 }
137
138 std::set<std::string> partner_scopes;
139 for (auto i = 0; i < new_scopes->size(); ++i) {
140 auto scope = new_scopes->get(i);
141 if (scope->getType() != Element::string) {
142 isc_throw(BadValue, "unable to record partner's HA scopes because"
143 " the received scope value is not a valid JSON string");
144 }
145 auto scope_str = scope->stringValue();
146 if (!scope_str.empty()) {
147 partner_scopes.insert(scope_str);
148 }
149 }
150 partner_scopes_ = partner_scopes;
151 }
152
153 void
startHeartbeat(const long interval,const std::function<void ()> & heartbeat_impl)154 CommunicationState::startHeartbeat(const long interval,
155 const std::function<void()>& heartbeat_impl) {
156 if (MultiThreadingMgr::instance().getMode()) {
157 std::lock_guard<std::mutex> lk(*mutex_);
158 startHeartbeatInternal(interval, heartbeat_impl);
159 } else {
160 startHeartbeatInternal(interval, heartbeat_impl);
161 }
162 }
163
// Configures the one-shot heartbeat timer and, if its settings changed,
// (re)arms it.
//
// @param interval Interval passed to IntervalTimer::setup(). A value of 0
// means "reuse the previously stored interval".
// @param heartbeat_impl Callback run when the timer fires. An empty function
// means "reuse the previously stored callback".
// @throw BadValue if a value is to be reused but none has been stored yet.
// When the interval cannot be resolved, the stored callback is cleared
// before throwing, including a callback supplied by this very call.
//
// NOTE(review): setup() is only called when a new callback is supplied or the
// interval changes. A call that supplies neither (pokeInternal() calls this
// with no arguments, presumably using defaults of 0 and an empty callback)
// leaves settings_modified false. Such a call does not re-arm an existing
// timer; it only creates the timer object if it is missing. Confirm this
// matches the "re-schedule" intent stated in pokeInternal().
void
CommunicationState::startHeartbeatInternal(const long interval,
                                           const std::function<void()>& heartbeat_impl) {
    bool settings_modified = false;

    // If we're setting the heartbeat for the first time, it should
    // be non-null.
    if (heartbeat_impl) {
        // Any non-empty callback counts as a modification, even if it is
        // equivalent to the stored one (std::function is not comparable).
        settings_modified = true;
        heartbeat_impl_ = heartbeat_impl;

    } else if (!heartbeat_impl_) {
        // The heartbeat is re-scheduled but we have no historic implementation
        // pointer we could re-use. This is a programmatic issue.
        isc_throw(BadValue, "unable to start heartbeat when pointer"
                  " to the heartbeat implementation is not specified");
    }

    // If we're setting the heartbeat for the first time, the interval
    // should be greater than 0.
    if (interval != 0) {
        settings_modified |= (interval_ != interval);
        interval_ = interval;

    } else if (interval_ <= 0) {
        // The heartbeat is re-scheduled but we have no historic interval
        // which we could re-use. This is a programmatic issue.
        heartbeat_impl_ = 0;
        isc_throw(BadValue, "unable to start heartbeat when interval"
                  " for the heartbeat timer is not specified");
    }

    // The timer object is created lazily on the first successful call.
    if (!timer_) {
        timer_.reset(new IntervalTimer(*io_service_));
    }

    // Only (re)arm the timer when the callback or interval changed.
    if (settings_modified) {
        timer_->setup(heartbeat_impl_, interval_, IntervalTimer::ONE_SHOT);
    }
}
204
205 void
stopHeartbeat()206 CommunicationState::stopHeartbeat() {
207 if (MultiThreadingMgr::instance().getMode()) {
208 std::lock_guard<std::mutex> lk(*mutex_);
209 stopHeartbeatInternal();
210 } else {
211 stopHeartbeatInternal();
212 }
213 }
214
215 void
stopHeartbeatInternal()216 CommunicationState::stopHeartbeatInternal() {
217 if (timer_) {
218 timer_->cancel();
219 timer_.reset();
220 interval_ = 0;
221 heartbeat_impl_ = 0;
222 }
223 }
224
225 bool
isHeartbeatRunning() const226 CommunicationState::isHeartbeatRunning() const {
227 if (MultiThreadingMgr::instance().getMode()) {
228 std::lock_guard<std::mutex> lk(*mutex_);
229 return (static_cast<bool>(timer_));
230 } else {
231 return (static_cast<bool>(timer_));
232 }
233 }
234
235 boost::posix_time::time_duration
updatePokeTime()236 CommunicationState::updatePokeTime() {
237 if (MultiThreadingMgr::instance().getMode()) {
238 std::lock_guard<std::mutex> lk(*mutex_);
239 return (updatePokeTimeInternal());
240 } else {
241 return (updatePokeTimeInternal());
242 }
243 }
244
245 boost::posix_time::time_duration
updatePokeTimeInternal()246 CommunicationState::updatePokeTimeInternal() {
247 // Remember previous poke time.
248 boost::posix_time::ptime prev_poke_time = poke_time_;
249 // Set poke time to the current time.
250 poke_time_ = boost::posix_time::microsec_clock::universal_time();
251 return (poke_time_ - prev_poke_time);
252 }
253
254 void
poke()255 CommunicationState::poke() {
256 if (MultiThreadingMgr::instance().getMode()) {
257 std::lock_guard<std::mutex> lk(*mutex_);
258 pokeInternal();
259 } else {
260 pokeInternal();
261 }
262 }
263
264 void
pokeInternal()265 CommunicationState::pokeInternal() {
266 // Update poke time and compute duration.
267 boost::posix_time::time_duration duration_since_poke = updatePokeTimeInternal();
268
269 // If we have been tracking the DHCP messages directed to the partner,
270 // we need to clear any gathered information because the connection
271 // seems to be (re)established.
272 clearConnectingClients();
273 analyzed_messages_count_ = 0;
274
275 if (timer_) {
276 // Check the duration since last poke. If it is less than a second, we don't
277 // want to reschedule the timer. In order to avoid the overhead of
278 // re-scheduling the timer too frequently we reschedule it only if the
279 // duration is 1s or more. This matches the time resolution for heartbeats.
280 if (duration_since_poke.total_seconds() > 0) {
281 // A poke causes the timer to be re-scheduled to prevent it
282 // from triggering a heartbeat shortly after confirming the
283 // connection is ok.
284 startHeartbeatInternal();
285 }
286 }
287 }
288
289 int64_t
getDurationInMillisecs() const290 CommunicationState::getDurationInMillisecs() const {
291 if (MultiThreadingMgr::instance().getMode()) {
292 std::lock_guard<std::mutex> lk(*mutex_);
293 return (getDurationInMillisecsInternal());
294 } else {
295 return (getDurationInMillisecsInternal());
296 }
297 }
298
299 int64_t
getDurationInMillisecsInternal() const300 CommunicationState::getDurationInMillisecsInternal() const {
301 ptime now = boost::posix_time::microsec_clock::universal_time();
302 time_duration duration = now - poke_time_;
303 return (duration.total_milliseconds());
304 }
305
306 bool
isCommunicationInterrupted() const307 CommunicationState::isCommunicationInterrupted() const {
308 return (getDurationInMillisecs() > config_->getMaxResponseDelay());
309 }
310
311 size_t
getAnalyzedMessagesCount() const312 CommunicationState::getAnalyzedMessagesCount() const {
313 return (analyzed_messages_count_);
314 }
315
316 bool
clockSkewShouldWarn()317 CommunicationState::clockSkewShouldWarn() {
318 if (MultiThreadingMgr::instance().getMode()) {
319 std::lock_guard<std::mutex> lk(*mutex_);
320 return (clockSkewShouldWarnInternal());
321 } else {
322 return (clockSkewShouldWarnInternal());
323 }
324 }
325
326 bool
clockSkewShouldWarnInternal()327 CommunicationState::clockSkewShouldWarnInternal() {
328 // First check if the clock skew is beyond the threshold.
329 if (isClockSkewGreater(WARN_CLOCK_SKEW)) {
330
331 // In order to prevent to frequent warnings we provide a gating mechanism
332 // which doesn't allow for issuing a warning earlier than 60 seconds after
333 // the previous one.
334
335 // Find the current time and the duration since last warning.
336 ptime now = boost::posix_time::microsec_clock::universal_time();
337 time_duration since_warn_duration = now - last_clock_skew_warn_;
338
339 // If the last warning was issued more than 60 seconds ago or it is a
340 // first warning, we need to update the last warning timestamp and return
341 // true to indicate that new warning should be issued.
342 if (last_clock_skew_warn_.is_not_a_date_time() ||
343 (since_warn_duration.total_seconds() > MIN_TIME_SINCE_CLOCK_SKEW_WARN)) {
344 last_clock_skew_warn_ = now;
345 LOG_WARN(ha_logger, HA_HIGH_CLOCK_SKEW)
346 .arg(logFormatClockSkewInternal());
347 return (true);
348 }
349 }
350
351 // The warning should not be issued.
352 return (false);
353 }
354
355 bool
clockSkewShouldTerminate() const356 CommunicationState::clockSkewShouldTerminate() const {
357 if (MultiThreadingMgr::instance().getMode()) {
358 std::lock_guard<std::mutex> lk(*mutex_);
359 // Issue a warning if the clock skew is greater than 60s.
360 return (clockSkewShouldTerminateInternal());
361 } else {
362 return (clockSkewShouldTerminateInternal());
363 }
364 }
365
366 bool
clockSkewShouldTerminateInternal() const367 CommunicationState::clockSkewShouldTerminateInternal() const {
368 if (isClockSkewGreater(TERM_CLOCK_SKEW)) {
369 LOG_ERROR(ha_logger, HA_HIGH_CLOCK_SKEW_CAUSES_TERMINATION)
370 .arg(logFormatClockSkewInternal());
371 return (true);
372 }
373
374 return (false);
375 }
376
377 bool
isClockSkewGreater(const long seconds) const378 CommunicationState::isClockSkewGreater(const long seconds) const {
379 return ((clock_skew_.total_seconds() > seconds) ||
380 (clock_skew_.total_seconds() < -seconds));
381 }
382
383 void
setPartnerTime(const std::string & time_text)384 CommunicationState::setPartnerTime(const std::string& time_text) {
385 if (MultiThreadingMgr::instance().getMode()) {
386 std::lock_guard<std::mutex> lk(*mutex_);
387 setPartnerTimeInternal(time_text);
388 } else {
389 setPartnerTimeInternal(time_text);
390 }
391 }
392
393 void
setPartnerTimeInternal(const std::string & time_text)394 CommunicationState::setPartnerTimeInternal(const std::string& time_text) {
395 partner_time_at_skew_ = HttpDateTime().fromRfc1123(time_text).getPtime();
396 my_time_at_skew_ = HttpDateTime().getPtime();
397 clock_skew_ = partner_time_at_skew_ - my_time_at_skew_;
398 }
399
400 std::string
logFormatClockSkew() const401 CommunicationState::logFormatClockSkew() const {
402 if (MultiThreadingMgr::instance().getMode()) {
403 std::lock_guard<std::mutex> lk(*mutex_);
404 return (logFormatClockSkewInternal());
405 } else {
406 return (logFormatClockSkewInternal());
407 }
408 }
409
410 std::string
logFormatClockSkewInternal() const411 CommunicationState::logFormatClockSkewInternal() const {
412 std::ostringstream os;
413
414 if ((my_time_at_skew_.is_not_a_date_time()) ||
415 (partner_time_at_skew_.is_not_a_date_time())) {
416 // Guard against being called before times have been set.
417 // Otherwise we'll get out-range exceptions.
418 return ("skew not initialized");
419 }
420
421 // Note HttpTime resolution is only to seconds, so we use fractional
422 // precision of zero when logging.
423 os << "my time: " << util::ptimeToText(my_time_at_skew_, 0)
424 << ", partner's time: " << util::ptimeToText(partner_time_at_skew_, 0)
425 << ", partner's clock is ";
426
427 // If negative clock skew, the partner's time is behind our time.
428 if (clock_skew_.is_negative()) {
429 os << clock_skew_.invert_sign().total_seconds() << "s behind";
430 } else {
431 // Partner's time is ahead of ours.
432 os << clock_skew_.total_seconds() << "s ahead";
433 }
434
435 return (os.str());
436 }
437
438 ElementPtr
getReport() const439 CommunicationState::getReport() const {
440 auto report = Element::createMap();
441
442 auto in_touch = (getPartnerState() > 0);
443 report->set("in-touch", Element::create(in_touch));
444
445 auto age = in_touch ? static_cast<long long int>(getDurationInMillisecs() / 1000) : 0;
446 report->set("age", Element::create(age));
447
448 try {
449 report->set("last-state", Element::create(stateToString(getPartnerState())));
450
451 } catch (...) {
452 report->set("last-state", Element::create(std::string()));
453 }
454
455 auto list = Element::createList();
456 for (auto scope : getPartnerScopes()) {
457 list->add(Element::create(scope));
458 }
459 report->set("last-scopes", list);
460 report->set("communication-interrupted",
461 Element::create(isCommunicationInterrupted()));
462 report->set("connecting-clients", Element::create(static_cast<long long>(getConnectingClientsCount())));
463 report->set("unacked-clients", Element::create(static_cast<long long>(getUnackedClientsCount())));
464
465 long long unacked_clients_left = 0;
466 if (isCommunicationInterrupted() && (config_->getMaxUnackedClients() >= getUnackedClientsCount())) {
467 unacked_clients_left = static_cast<long long>(config_->getMaxUnackedClients() -
468 getUnackedClientsCount() + 1);
469 }
470 report->set("unacked-clients-left", Element::create(unacked_clients_left));
471 report->set("analyzed-packets", Element::create(static_cast<long long>(getAnalyzedMessagesCount())));
472
473 return (report);
474 }
475
476 uint64_t
getUnsentUpdateCount() const477 CommunicationState::getUnsentUpdateCount() const {
478 if (MultiThreadingMgr::instance().getMode()) {
479 std::lock_guard<std::mutex> lk(*mutex_);
480 return (unsent_update_count_);
481 } else {
482 return (unsent_update_count_);
483 }
484 }
485
486 void
increaseUnsentUpdateCount()487 CommunicationState::increaseUnsentUpdateCount() {
488 if (MultiThreadingMgr::instance().getMode()) {
489 std::lock_guard<std::mutex> lk(*mutex_);
490 increaseUnsentUpdateCountInternal();
491 } else {
492 increaseUnsentUpdateCountInternal();
493 }
494 }
495
496 void
increaseUnsentUpdateCountInternal()497 CommunicationState::increaseUnsentUpdateCountInternal() {
498 // Protect against setting the incremented value to zero.
499 // The zero value is reserved for a server startup.
500 if (unsent_update_count_ < std::numeric_limits<uint64_t>::max()) {
501 ++unsent_update_count_;
502 } else {
503 unsent_update_count_ = 1;
504 }
505 }
506
507 bool
hasPartnerNewUnsentUpdates() const508 CommunicationState::hasPartnerNewUnsentUpdates() const {
509 if (MultiThreadingMgr::instance().getMode()) {
510 std::lock_guard<std::mutex> lk(*mutex_);
511 return (hasPartnerNewUnsentUpdatesInternal());
512 } else {
513 return (hasPartnerNewUnsentUpdatesInternal());
514 }
515 }
516
517 bool
hasPartnerNewUnsentUpdatesInternal() const518 CommunicationState::hasPartnerNewUnsentUpdatesInternal() const {
519 return (partner_unsent_update_count_.second > 0 &&
520 (partner_unsent_update_count_.first != partner_unsent_update_count_.second));
521 }
522
523 void
setPartnerUnsentUpdateCount(uint64_t unsent_update_count)524 CommunicationState::setPartnerUnsentUpdateCount(uint64_t unsent_update_count) {
525 if (MultiThreadingMgr::instance().getMode()) {
526 std::lock_guard<std::mutex> lk(*mutex_);
527 setPartnerUnsentUpdateCountInternal(unsent_update_count);
528 } else {
529 setPartnerUnsentUpdateCountInternal(unsent_update_count);
530 }
531 }
532
533 void
setPartnerUnsentUpdateCountInternal(uint64_t unsent_update_count)534 CommunicationState::setPartnerUnsentUpdateCountInternal(uint64_t unsent_update_count) {
535 partner_unsent_update_count_.first = partner_unsent_update_count_.second;
536 partner_unsent_update_count_.second = unsent_update_count;
537 }
538
CommunicationState4(const IOServicePtr & io_service,const HAConfigPtr & config)539 CommunicationState4::CommunicationState4(const IOServicePtr& io_service,
540 const HAConfigPtr& config)
541 : CommunicationState(io_service, config), connecting_clients_() {
542 }
543
544 void
analyzeMessage(const boost::shared_ptr<dhcp::Pkt> & message)545 CommunicationState4::analyzeMessage(const boost::shared_ptr<dhcp::Pkt>& message) {
546 if (MultiThreadingMgr::instance().getMode()) {
547 std::lock_guard<std::mutex> lk(*mutex_);
548 analyzeMessageInternal(message);
549 } else {
550 analyzeMessageInternal(message);
551 }
552 }
553
// Records the client that sent a DHCPv4 message and determines whether that
// client should be counted as "unacked". A client is unacked when its "secs"
// value, converted to milliseconds, exceeds the configured max-ack-delay.
//
// Clients are keyed by (hardware address, client identifier). An already
// recorded client can be upgraded from acked to unacked, but never the other
// way round. The unacked status is logged only when first detected.
//
// @param message DHCP message to analyze; must be a Pkt4.
// @throw BadValue if the message is not a DHCPv4 message.
void
CommunicationState4::analyzeMessageInternal(const boost::shared_ptr<dhcp::Pkt>& message) {
    // The DHCP message must successfully cast to a Pkt4 object.
    Pkt4Ptr msg = boost::dynamic_pointer_cast<Pkt4>(message);
    if (!msg) {
        isc_throw(BadValue, "DHCP message to be analyzed is not a DHCPv4 message");
    }

    ++analyzed_messages_count_;

    // Check value of the "secs" field by comparing it with the configured
    // threshold.
    uint16_t secs = msg->getSecs();

    // It was observed that some Windows clients may send swapped bytes in the
    // "secs" field. When the second byte is 0 and the first byte is non-zero
    // we consider bytes to be swapped and so we correct them.
    if ((secs > 255) && ((secs & 0xFF) == 0)) {
        secs = ((secs >> 8) | (secs << 8));
    }

    // Check the value of the "secs" field. The "secs" field holds a value in
    // seconds, hence we have to multiply by 1000 to get a value in milliseconds.
    // If the secs value is above the threshold, it means that the current
    // client should be considered unacked.
    auto unacked = (secs * 1000 > config_->getMaxAckDelay());

    // Client identifier will be stored together with the hardware address. It
    // may remain empty if the client hasn't specified it.
    std::vector<uint8_t> client_id;
    OptionPtr opt_client_id = msg->getOption(DHO_DHCP_CLIENT_IDENTIFIER);
    if (opt_client_id) {
        client_id = opt_client_id->getData();
    }

    bool log_unacked = false;

    // Check if the given client was already recorded.
    auto& idx = connecting_clients_.get<0>();
    auto existing_request = idx.find(boost::make_tuple(msg->getHWAddr()->hwaddr_, client_id));
    if (existing_request != idx.end()) {
        // If the client was recorded and was not considered unacked
        // but it should be considered unacked as a result of processing
        // this packet, let's update the recorded request to mark the
        // client unacked.
        if (!existing_request->unacked_ && unacked) {
            ConnectingClient4 connecting_client{ msg->getHWAddr()->hwaddr_, client_id, unacked };
            idx.replace(existing_request, connecting_client);
            log_unacked = true;
        }

    } else {
        // This is the first time we see the packet from this client. Let's
        // record it.
        ConnectingClient4 connecting_client{ msg->getHWAddr()->hwaddr_, client_id, unacked };
        idx.insert(connecting_client);
        log_unacked = unacked;

        if (!unacked) {
            // This is the first time we see this client after getting into the
            // communication interrupted state. But this client hasn't been
            // trying long enough yet to be considered unacked.
            LOG_INFO(ha_logger, HA_COMMUNICATION_INTERRUPTED_CLIENT4)
                .arg(message->getLabel());
        }
    }

    // Only log the first time we detect a client is unacked.
    if (log_unacked) {
        // Index 1 is queried by the unacked flag to count unacked clients.
        // "Left" counts how many more may appear before the total exceeds
        // the configured maximum.
        unsigned unacked_left = 0;
        unsigned unacked_total = connecting_clients_.get<1>().count(true);
        if (config_->getMaxUnackedClients() >= unacked_total) {
            unacked_left = config_->getMaxUnackedClients() - unacked_total + 1;
        }
        LOG_INFO(ha_logger, HA_COMMUNICATION_INTERRUPTED_CLIENT4_UNACKED)
            .arg(message->getLabel())
            .arg(unacked_total)
            .arg(unacked_left);
    }
}
634
635 bool
failureDetected() const636 CommunicationState4::failureDetected() const {
637 if (MultiThreadingMgr::instance().getMode()) {
638 std::lock_guard<std::mutex> lk(*mutex_);
639 return (failureDetectedInternal());
640 } else {
641 return (failureDetectedInternal());
642 }
643 }
644
645 bool
failureDetectedInternal() const646 CommunicationState4::failureDetectedInternal() const {
647 return ((config_->getMaxUnackedClients() == 0) ||
648 (connecting_clients_.get<1>().count(true) >
649 config_->getMaxUnackedClients()));
650 }
651
652 size_t
getConnectingClientsCount() const653 CommunicationState4::getConnectingClientsCount() const {
654 if (MultiThreadingMgr::instance().getMode()) {
655 std::lock_guard<std::mutex> lk(*mutex_);
656 return (connecting_clients_.size());
657 } else {
658 return (connecting_clients_.size());
659 }
660 }
661
662 size_t
getUnackedClientsCount() const663 CommunicationState4::getUnackedClientsCount() const {
664 if (MultiThreadingMgr::instance().getMode()) {
665 std::lock_guard<std::mutex> lk(*mutex_);
666 return (connecting_clients_.get<1>().count(true));
667 } else {
668 return (connecting_clients_.get<1>().count(true));
669 }
670 }
671
672 void
clearConnectingClients()673 CommunicationState4::clearConnectingClients() {
674 connecting_clients_.clear();
675 }
676
CommunicationState6(const IOServicePtr & io_service,const HAConfigPtr & config)677 CommunicationState6::CommunicationState6(const IOServicePtr& io_service,
678 const HAConfigPtr& config)
679 : CommunicationState(io_service, config), connecting_clients_() {
680 }
681
682 void
analyzeMessage(const boost::shared_ptr<dhcp::Pkt> & message)683 CommunicationState6::analyzeMessage(const boost::shared_ptr<dhcp::Pkt>& message) {
684 if (MultiThreadingMgr::instance().getMode()) {
685 std::lock_guard<std::mutex> lk(*mutex_);
686 analyzeMessageInternal(message);
687 } else {
688 analyzeMessageInternal(message);
689 }
690 }
691
// Records the client that sent a DHCPv6 message and determines whether that
// client should be counted as "unacked". A client is unacked when its Elapsed
// Time option, converted to milliseconds, exceeds the configured
// max-ack-delay.
//
// Clients are keyed by the DUID from the Client Identifier option. Messages
// without that option are counted as analyzed but otherwise ignored. An
// already recorded client can be upgraded from acked to unacked, but never
// the other way round. The unacked status is logged only when first
// detected.
//
// @param message DHCP message to analyze; must be a Pkt6.
// @throw BadValue if the message is not a DHCPv6 message.
void
CommunicationState6::analyzeMessageInternal(const boost::shared_ptr<dhcp::Pkt>& message) {
    // The DHCP message must successfully cast to a Pkt6 object.
    Pkt6Ptr msg = boost::dynamic_pointer_cast<Pkt6>(message);
    if (!msg) {
        isc_throw(BadValue, "DHCP message to be analyzed is not a DHCPv6 message");
    }

    ++analyzed_messages_count_;

    // Check the value of the "elapsed time" option. If it is below the threshold
    // there is nothing to do. The "elapsed time" option holds the time in
    // 1/100 of second, hence we have to multiply by 10 to get a value in milliseconds.
    // A missing option leaves the client acked.
    OptionUint16Ptr elapsed_time = boost::dynamic_pointer_cast<
        OptionUint16>(msg->getOption(D6O_ELAPSED_TIME));
    auto unacked = (elapsed_time && elapsed_time->getValue() * 10 > config_->getMaxAckDelay());

    // Get the DUID of the client to see if it hasn't been recorded already.
    OptionPtr duid = msg->getOption(D6O_CLIENTID);
    if (!duid) {
        return;
    }

    bool log_unacked = false;

    // Check if the given client was already recorded.
    auto& idx = connecting_clients_.get<0>();
    auto existing_request = idx.find(duid->getData());
    if (existing_request != idx.end()) {
        // If the client was recorded and was not considered unacked
        // but it should be considered unacked as a result of processing
        // this packet, let's update the recorded request to mark the
        // client unacked.
        if (!existing_request->unacked_ && unacked) {
            ConnectingClient6 connecting_client{ duid->getData(), unacked };
            idx.replace(existing_request, connecting_client);
            log_unacked = true;
        }

    } else {
        // This is the first time we see the packet from this client. Let's
        // record it.
        ConnectingClient6 connecting_client{ duid->getData(), unacked };
        idx.insert(connecting_client);
        log_unacked = unacked;

        if (!unacked) {
            // This is the first time we see this client after getting into the
            // communication interrupted state. But this client hasn't been
            // trying long enough yet to be considered unacked.
            LOG_INFO(ha_logger, HA_COMMUNICATION_INTERRUPTED_CLIENT6)
                .arg(message->getLabel());
        }
    }

    // Only log the first time we detect a client is unacked.
    if (log_unacked) {
        // Index 1 is queried by the unacked flag to count unacked clients.
        // "Left" counts how many more may appear before the total exceeds
        // the configured maximum.
        unsigned unacked_left = 0;
        unsigned unacked_total = connecting_clients_.get<1>().count(true);
        if (config_->getMaxUnackedClients() >= unacked_total) {
            unacked_left = config_->getMaxUnackedClients() - unacked_total + 1;
        }
        LOG_INFO(ha_logger, HA_COMMUNICATION_INTERRUPTED_CLIENT6_UNACKED)
            .arg(message->getLabel())
            .arg(unacked_total)
            .arg(unacked_left);
    }
}
760
761 bool
failureDetected() const762 CommunicationState6::failureDetected() const {
763 if (MultiThreadingMgr::instance().getMode()) {
764 std::lock_guard<std::mutex> lk(*mutex_);
765 return (failureDetectedInternal());
766 } else {
767 return (failureDetectedInternal());
768 }
769 }
770
771 bool
failureDetectedInternal() const772 CommunicationState6::failureDetectedInternal() const {
773 return ((config_->getMaxUnackedClients() == 0) ||
774 (connecting_clients_.get<1>().count(true) >
775 config_->getMaxUnackedClients()));
776 }
777
778 size_t
getConnectingClientsCount() const779 CommunicationState6::getConnectingClientsCount() const {
780 if (MultiThreadingMgr::instance().getMode()) {
781 std::lock_guard<std::mutex> lk(*mutex_);
782 return (connecting_clients_.size());
783 } else {
784 return (connecting_clients_.size());
785 }
786 }
787
788 size_t
getUnackedClientsCount() const789 CommunicationState6::getUnackedClientsCount() const {
790 if (MultiThreadingMgr::instance().getMode()) {
791 std::lock_guard<std::mutex> lk(*mutex_);
792 return (connecting_clients_.get<1>().count(true));
793 } else {
794 return (connecting_clients_.get<1>().count(true));
795 }
796 }
797
798 void
clearConnectingClients()799 CommunicationState6::clearConnectingClients() {
800 connecting_clients_.clear();
801 }
802
803 } // end of namespace isc::ha
804 } // end of namespace isc
805