1 /*
2 * Copyright (C) 2009-2019 Codership Oy <info@codership.com>
3 */
4
5 #include "evs_proto.hpp"
6 #include "evs_message2.hpp"
7 #include "evs_input_map2.hpp"
8
9 #include "gcomm/transport.hpp"
10 #include "gcomm/conf.hpp"
11 #include "gcomm/util.hpp"
12
13 #include "defaults.hpp"
14
15 #include <cmath>
16
17 #include <stdexcept>
18 #include <algorithm>
19 #include <numeric>
20 #include <iterator>
21 #include <set>
22
23 using namespace std::rel_ops;
24
// Mask-gated logging helpers.
//
// Each macro expands to an if/else statement whose else branch is the log
// stream, so the caller continues it with `<< ...`; the streamed expression
// is skipped entirely when none of the bits in __mask__ is set in the
// corresponding mask. Because the macro's `if` already owns an `else`, an
// `else` written by the caller after the macro cannot bind to it.
// Both macros expect debug_mask_ / info_mask_ and self_string() to be
// reachable at the point of use.
#define evs_log_debug(__mask__)                         \
    if (!(debug_mask_ & (__mask__))) { }                \
    else log_debug << self_string() << ": "

#define evs_log_info(__mask__)                          \
    if (!(info_mask_ & (__mask__))) { }                 \
    else log_info << self_string() << ": "
33
34
Proto(gu::Config & conf,const UUID & my_uuid,SegmentId segment,const gu::URI & uri,const size_t mtu,const View * rst_view)35 gcomm::evs::Proto::Proto(gu::Config& conf,
36 const UUID& my_uuid,
37 SegmentId segment,
38 const gu::URI& uri,
39 const size_t mtu,
40 const View* rst_view)
41 :
42 Protolay(conf),
43 timers_(),
44 version_(check_range(Conf::EvsVersion,
45 param<int>(conf, uri, Conf::EvsVersion, Defaults::EvsVersion),
46 0, GCOMM_PROTOCOL_MAX_VERSION + 1)),
47 debug_mask_(param<int>(conf, uri, Conf::EvsDebugLogMask, "0x1", std::hex)),
48 info_mask_(param<int>(conf, uri, Conf::EvsInfoLogMask, "0x0", std::hex)),
49 last_stats_report_(gu::datetime::Date::monotonic()),
50 collect_stats_(true),
51 hs_agreed_("0.0,0.0001,0.00031623,0.001,0.0031623,0.01,0.031623,0.1,0.31623,1.,3.1623,10.,31.623"),
52 hs_safe_("0.0,0.0001,0.00031623,0.001,0.0031623,0.01,0.031623,0.1,0.31623,1.,3.1623,10.,31.623"),
53 hs_local_causal_("0.0,0.0001,0.00031623,0.001,0.0031623,0.01,0.031623,0.1,0.31623,1.,3.1623,10.,31.623"),
54 safe_deliv_latency_(),
55 send_queue_s_(0),
56 n_send_queue_s_(0),
57 sent_msgs_(Message::num_message_types, 0),
58 retrans_msgs_(0),
59 recovered_msgs_(0),
60 recvd_msgs_(Message::num_message_types, 0),
61 delivered_msgs_(O_LOCAL_CAUSAL + 1),
62 delivering_(false),
63 my_uuid_(my_uuid),
64 segment_(segment),
65 known_(),
66 self_i_(),
67 view_forget_timeout_(
68 check_range(Conf::EvsViewForgetTimeout,
69 param<gu::datetime::Period>(
70 conf, uri, Conf::EvsViewForgetTimeout,
71 Defaults::EvsViewForgetTimeout),
72 gu::from_string<gu::datetime::Period>(
73 Defaults::EvsViewForgetTimeoutMin),
74 gu::datetime::Period::max())),
75 inactive_timeout_(
76 check_range(Conf::EvsInactiveTimeout,
77 param<gu::datetime::Period>(
78 conf, uri, Conf::EvsInactiveTimeout,
79 Defaults::EvsInactiveTimeout),
80 gu::from_string<gu::datetime::Period>(
81 Defaults::EvsInactiveTimeoutMin),
82 gu::datetime::Period::max())),
83 suspect_timeout_(
84 check_range(Conf::EvsSuspectTimeout,
85 param<gu::datetime::Period>(
86 conf, uri, Conf::EvsSuspectTimeout,
87 Defaults::EvsSuspectTimeout),
88 gu::from_string<gu::datetime::Period>(
89 Defaults::EvsSuspectTimeoutMin),
90 gu::datetime::Period::max())),
91 inactive_check_period_(
92 check_range(Conf::EvsInactiveCheckPeriod,
93 param<gu::datetime::Period>(
94 conf, uri, Conf::EvsInactiveCheckPeriod,
95 Defaults::EvsInactiveCheckPeriod),
96 gu::datetime::Period::min(),
97 suspect_timeout_/2 + 1)),
98 retrans_period_(
99 check_range(Conf::EvsKeepalivePeriod,
100 param<gu::datetime::Period>(
101 conf, uri, Conf::EvsKeepalivePeriod,
102 Defaults::EvsRetransPeriod),
103 gu::from_string<gu::datetime::Period>(
104 Defaults::EvsRetransPeriodMin),
105 suspect_timeout_/3 + 1)),
106 install_timeout_(
107 check_range(Conf::EvsInstallTimeout,
108 param<gu::datetime::Period>(
109 conf, uri, Conf::EvsInstallTimeout,
110 gu::to_string(inactive_timeout_/2)),
111 retrans_period_, inactive_timeout_ + 1)),
112 join_retrans_period_(
113 check_range(Conf::EvsJoinRetransPeriod,
114 param<gu::datetime::Period>(
115 conf, uri, Conf::EvsJoinRetransPeriod,
116 Defaults::EvsJoinRetransPeriod),
117 gu::from_string<gu::datetime::Period>(
118 Defaults::EvsRetransPeriodMin),
119 gu::datetime::Period::max())),
120 stats_report_period_(
121 check_range(Conf::EvsStatsReportPeriod,
122 param<gu::datetime::Period>(
123 conf, uri, Conf::EvsStatsReportPeriod,
124 Defaults::EvsStatsReportPeriod),
125 gu::from_string<gu::datetime::Period>(
126 Defaults::EvsStatsReportPeriodMin),
127 gu::datetime::Period::max())),
128 causal_keepalive_period_(retrans_period_),
129 delay_margin_(param<gu::datetime::Period>(
130 conf, uri, Conf::EvsDelayMargin,
131 Defaults::EvsDelayMargin)),
132 delayed_keep_period_(param<gu::datetime::Period>(
133 conf, uri, Conf::EvsDelayedKeepPeriod,
134 Defaults::EvsDelayedKeepPeriod)),
135 last_inactive_check_ (gu::datetime::Date::monotonic()),
136 last_causal_keepalive_ (gu::datetime::Date::monotonic()),
137 current_view_(0, ViewId(V_TRANS, my_uuid,
138 rst_view ? rst_view -> id().seq() + 1 : 0)),
139 previous_view_(),
140 previous_views_(),
141 gather_views_(),
142 input_map_(new InputMap()),
143 causal_queue_(),
144 consensus_(*this, known_, *input_map_, current_view_),
145 last_sent_join_tstamp_(),
146 install_message_(0),
147 max_view_id_seq_(0),
148 attempt_seq_(1),
149 new_view_logged_(false),
150 max_install_timeouts_(
151 check_range(Conf::EvsMaxInstallTimeouts,
152 param<int>(conf, uri, Conf::EvsMaxInstallTimeouts,
153 Defaults::EvsMaxInstallTimeouts),
154 0, std::numeric_limits<int>::max())),
155 install_timeout_count_(0),
156 fifo_seq_(-1),
157 last_sent_(-1),
158 send_window_(
159 check_range(Conf::EvsSendWindow,
160 param<seqno_t>(conf, uri, Conf::EvsSendWindow,
161 Defaults::EvsSendWindow),
162 gu::from_string<seqno_t>(Defaults::EvsSendWindowMin),
163 std::numeric_limits<seqno_t>::max())),
164 user_send_window_(
165 check_range(Conf::EvsUserSendWindow,
166 param<seqno_t>(conf, uri, Conf::EvsUserSendWindow,
167 Defaults::EvsUserSendWindow),
168 gu::from_string<seqno_t>(Defaults::EvsUserSendWindowMin),
169 send_window_ + 1)),
170 bytes_since_request_user_msg_feedback_(),
171 output_(),
172 send_buf_(),
173 max_output_size_(128),
174 mtu_(mtu),
175 use_aggregate_(param<bool>(conf, uri, Conf::EvsUseAggregate, "true")),
176 self_loopback_(false),
177 state_(S_CLOSED),
178 shift_to_rfcnt_(0),
179 pending_leave_(false),
180 isolation_end_(gu::datetime::Date::zero()),
181 delayed_list_(),
182 auto_evict_(param<size_t>(conf, uri, Conf::EvsAutoEvict,
183 Defaults::EvsAutoEvict))
184 {
185 log_info << "EVS version " << version_;
186
187 conf.set(Conf::EvsVersion, gu::to_string(version_));
188 conf.set(Conf::EvsViewForgetTimeout, gu::to_string(view_forget_timeout_));
189 conf.set(Conf::EvsSuspectTimeout, gu::to_string(suspect_timeout_));
190 conf.set(Conf::EvsInactiveTimeout, gu::to_string(inactive_timeout_));
191 conf.set(Conf::EvsKeepalivePeriod, gu::to_string(retrans_period_));
192 conf.set(Conf::EvsInactiveCheckPeriod,
193 gu::to_string(inactive_check_period_));
194 conf.set(Conf::EvsJoinRetransPeriod, gu::to_string(join_retrans_period_));
195 conf.set(Conf::EvsInstallTimeout, gu::to_string(install_timeout_));
196 conf.set(Conf::EvsStatsReportPeriod, gu::to_string(stats_report_period_));
197 conf.set(Conf::EvsCausalKeepalivePeriod,
198 gu::to_string(causal_keepalive_period_));
199 conf.set(Conf::EvsSendWindow, gu::to_string(send_window_));
200 conf.set(Conf::EvsUserSendWindow, gu::to_string(user_send_window_));
201 conf.set(Conf::EvsUseAggregate, gu::to_string(use_aggregate_));
202 conf.set(Conf::EvsDebugLogMask, gu::to_string(debug_mask_, std::hex));
203 conf.set(Conf::EvsInfoLogMask, gu::to_string(info_mask_, std::hex));
204 conf.set(Conf::EvsMaxInstallTimeouts, gu::to_string(max_install_timeouts_));
205 conf.set(Conf::EvsDelayMargin, gu::to_string(delay_margin_));
206 conf.set(Conf::EvsDelayedKeepPeriod, gu::to_string(delayed_keep_period_));
207 conf.set(Conf::EvsAutoEvict, gu::to_string(auto_evict_));
208 //
209
210 known_.insert_unique(
211 std::make_pair(my_uuid_, Node(*this)));
212 self_i_ = known_.begin();
213 assert(NodeMap::value(self_i_).operational() == true);
214
215 NodeMap::value(self_i_).set_index(0);
216 input_map_->reset(1);
217 current_view_.add_member(my_uuid_, segment_);
218 // we don't need to store previous views, do we ?
219 if (rst_view) {
220 previous_view_ = *rst_view;
221 previous_views_.insert(
222 std::make_pair(rst_view -> id(), gu::datetime::Date::monotonic()));
223 }
224 if (mtu_ != std::numeric_limits<size_t>::max())
225 {
226 send_buf_.reserve(mtu_);
227 }
228 }
229
230
~Proto()231 gcomm::evs::Proto::~Proto()
232 {
233 output_.clear();
234 delete install_message_;
235 delete input_map_;
236 }
237
238
239 bool
set_param(const std::string & key,const std::string & val,Protolay::sync_param_cb_t & sync_param_cb)240 gcomm::evs::Proto::set_param(const std::string& key, const std::string& val,
241 Protolay::sync_param_cb_t& sync_param_cb)
242 {
243 if (key == gcomm::Conf::EvsVersion)
244 {
245 version_ = check_range(Conf::EvsVersion,
246 gu::from_string<int>(val),
247 0, GCOMM_PROTOCOL_MAX_VERSION + 1);
248 conf_.set(Conf::EvsVersion, gu::to_string(version_));
249 // trigger configuration change to propagate version
250 shift_to(S_GATHER, true);
251 return true;
252 }
253 else if (key == gcomm::Conf::EvsSendWindow)
254 {
255 send_window_ = check_range(Conf::EvsSendWindow,
256 gu::from_string<seqno_t>(val),
257 user_send_window_,
258 std::numeric_limits<seqno_t>::max());
259 conf_.set(Conf::EvsSendWindow, gu::to_string(send_window_));
260 return true;
261 }
262 else if (key == gcomm::Conf::EvsUserSendWindow)
263 {
264 user_send_window_ = check_range(
265 Conf::EvsUserSendWindow,
266 gu::from_string<seqno_t>(val),
267 gu::from_string<seqno_t>(Defaults::EvsUserSendWindowMin),
268 send_window_ + 1);
269 conf_.set(Conf::EvsUserSendWindow, gu::to_string(user_send_window_));
270 return true;
271 }
272 else if (key == gcomm::Conf::EvsMaxInstallTimeouts)
273 {
274 max_install_timeouts_ = check_range(
275 Conf::EvsMaxInstallTimeouts,
276 gu::from_string<int>(val),
277 0, std::numeric_limits<int>::max());
278 conf_.set(Conf::EvsMaxInstallTimeouts, gu::to_string(max_install_timeouts_));
279 return true;
280 }
281 else if (key == Conf::EvsStatsReportPeriod)
282 {
283 stats_report_period_ = check_range(
284 Conf::EvsStatsReportPeriod,
285 gu::from_string<gu::datetime::Period>(val),
286 gu::from_string<gu::datetime::Period>(Defaults::EvsStatsReportPeriodMin),
287 gu::datetime::Period::max());
288 conf_.set(Conf::EvsStatsReportPeriod, gu::to_string(stats_report_period_));
289 reset_timer(T_STATS);
290 return true;
291 }
292 else if (key == Conf::EvsInfoLogMask)
293 {
294 info_mask_ = gu::from_string<int>(val, std::hex);
295 conf_.set(Conf::EvsInfoLogMask, gu::to_string<int>(info_mask_, std::hex));
296 return true;
297 }
298 else if (key == Conf::EvsDebugLogMask)
299 {
300 debug_mask_ = gu::from_string<int>(val, std::hex);
301 conf_.set(Conf::EvsDebugLogMask, gu::to_string<int>(debug_mask_, std::hex));
302 return true;
303 }
304 else if (key == Conf::EvsSuspectTimeout)
305 {
306 suspect_timeout_ = check_range(
307 Conf::EvsSuspectTimeout,
308 gu::from_string<gu::datetime::Period>(val),
309 gu::from_string<gu::datetime::Period>(Defaults::EvsSuspectTimeoutMin),
310 gu::datetime::Period::max());
311 conf_.set(Conf::EvsSuspectTimeout, gu::to_string(suspect_timeout_));
312 reset_timer(T_INACTIVITY);
313 return true;
314 }
315 else if (key == Conf::EvsInactiveTimeout)
316 {
317 inactive_timeout_ = check_range(
318 Conf::EvsInactiveTimeout,
319 gu::from_string<gu::datetime::Period>(val),
320 gu::from_string<gu::datetime::Period>(Defaults::EvsInactiveTimeoutMin),
321 gu::datetime::Period::max());
322 conf_.set(Conf::EvsInactiveTimeout, gu::to_string(inactive_timeout_));
323 reset_timer(T_INACTIVITY);
324 return true;
325 }
326 else if (key == Conf::EvsKeepalivePeriod)
327 {
328 retrans_period_ = check_range(
329 Conf::EvsKeepalivePeriod,
330 gu::from_string<gu::datetime::Period>(val),
331 gu::from_string<gu::datetime::Period>(Defaults::EvsRetransPeriodMin),
332 gu::datetime::Period::max());
333 conf_.set(Conf::EvsKeepalivePeriod, gu::to_string(retrans_period_));
334 reset_timer(T_RETRANS);
335 return true;
336 }
337 else if (key == Conf::EvsCausalKeepalivePeriod)
338 {
339 causal_keepalive_period_ = check_range(
340 Conf::EvsCausalKeepalivePeriod,
341 gu::from_string<gu::datetime::Period>(val),
342 gu::datetime::Period(0),
343 gu::datetime::Period::max());
344 conf_.set(Conf::EvsCausalKeepalivePeriod,
345 gu::to_string(causal_keepalive_period_));
346 // no timer reset here, causal keepalives don't rely on timer
347 return true;
348 }
349 else if (key == Conf::EvsJoinRetransPeriod)
350 {
351 join_retrans_period_ = check_range(
352 Conf::EvsJoinRetransPeriod,
353 gu::from_string<gu::datetime::Period>(val),
354 gu::from_string<gu::datetime::Period>(Defaults::EvsRetransPeriodMin),
355 gu::datetime::Period::max());
356 conf_.set(Conf::EvsJoinRetransPeriod, gu::to_string(join_retrans_period_));
357 reset_timer(T_RETRANS);
358 return true;
359 }
360 else if (key == Conf::EvsInstallTimeout)
361 {
362 install_timeout_ = check_range(
363 Conf::EvsInstallTimeout,
364 gu::from_string<gu::datetime::Period>(val),
365 retrans_period_*2, inactive_timeout_ + 1);
366 conf_.set(Conf::EvsInstallTimeout, gu::to_string(install_timeout_));
367 reset_timer(T_INSTALL);
368 return true;
369 }
370 else if (key == Conf::EvsUseAggregate)
371 {
372 use_aggregate_ = gu::from_string<bool>(val);
373 conf_.set(Conf::EvsUseAggregate, gu::to_string(use_aggregate_));
374 return true;
375 }
376 else if (key == Conf::EvsDelayMargin)
377 {
378 delay_margin_ = gu::from_string<gu::datetime::Period>(val);
379 conf_.set(Conf::EvsDelayMargin, gu::to_string(delay_margin_));
380 return true;
381 }
382 else if (key == Conf::EvsDelayedKeepPeriod)
383 {
384 delayed_keep_period_ = gu::from_string<gu::datetime::Period>(val);
385 conf_.set(Conf::EvsDelayedKeepPeriod,
386 gu::to_string(delayed_keep_period_));
387 return true;
388 }
389 else if (key == Conf::EvsEvict)
390 {
391 if (val.size())
392 {
393 UUID uuid;
394 std::istringstream is(val);
395 is >> uuid;
396 log_info << "Evicting node " << uuid << " permanently from cluster";
397 evict(uuid);
398 if (state() == S_OPERATIONAL && current_view_.is_member(uuid) == true)
399 {
400 shift_to(S_GATHER, true);
401 }
402 }
403 else
404 {
405 Protolay::EvictList::const_iterator i, i_next;
406 for (i = evict_list().begin(); i != evict_list().end(); i = i_next)
407 {
408 i_next = i, ++i_next;
409 log_info << "unevicting " << Protolay::EvictList::key(i);
410 unevict(Protolay::EvictList::key(i));
411 }
412 }
413 return true;
414 }
415 else if (key == Conf::EvsAutoEvict)
416 {
417 auto_evict_ = gu::from_string<size_t>(val);
418 conf_.set(Conf::EvsAutoEvict, gu::to_string(auto_evict_));
419 return true;
420 }
421 else if (key == Conf::EvsViewForgetTimeout ||
422 key == Conf::EvsInactiveCheckPeriod)
423 {
424 gu_throw_error(EPERM) << "can't change value for '"
425 << key << "' during runtime";
426 }
427 return false;
428 }
429
430
handle_get_status(gu::Status & status) const431 void gcomm::evs::Proto::handle_get_status(gu::Status& status) const
432 {
433 status.insert("evs_state", to_string(state_));
434 status.insert("evs_repl_latency", safe_deliv_latency_.to_string());
435 std::string delayed_list_str;
436 for (DelayedList::const_iterator i(delayed_list_.begin());
437 i != delayed_list_.end(); ++i)
438 {
439 if (is_evicted(i->first) == false ||
440 current_view_.is_member(i->first) == true)
441 {
442 delayed_list_str += i->first.full_str()
443 + ":"
444 + i->second.addr()
445 + ":"
446 + gu::to_string(i->second.state_change_cnt());
447 delayed_list_str += ",";
448 }
449 }
450 // Strip trailing comma
451 if (delayed_list_str.empty() == false)
452 {
453 delayed_list_str.resize(delayed_list_str.size() - 1);
454 }
455 status.insert("evs_delayed", delayed_list_str);
456
457 std::string evict_list_str;
458 for (Protolay::EvictList::const_iterator i(evict_list().begin());
459 i != evict_list().end(); )
460 {
461 evict_list_str += EvictList::key(i).full_str();
462 if (++i != evict_list().end()) evict_list_str += ",";
463 }
464 status.insert("evs_evict_list", evict_list_str);
465
466 if (info_mask_ & I_STATISTICS)
467 {
468 status.insert("evs_safe_hs", hs_safe_.to_string());
469 status.insert("evs_causal_hs", hs_local_causal_.to_string());
470 status.insert("evs_outq_avg",
471 gu::to_string(std::fabs(double(send_queue_s_)/
472 double(n_send_queue_s_))));
473 status.insert("evs_sent_user",
474 gu::to_string(sent_msgs_[Message::EVS_T_USER]));
475 status.insert("evs_sent_delegate",
476 gu::to_string(sent_msgs_[Message::EVS_T_DELEGATE]));
477 status.insert("evs_sent_gap",
478 gu::to_string(sent_msgs_[Message::EVS_T_GAP]));
479 status.insert("evs_sent_join",
480 gu::to_string(sent_msgs_[Message::EVS_T_JOIN]));
481 status.insert("evs_sent_install",
482 gu::to_string(sent_msgs_[Message::EVS_T_INSTALL]));
483 status.insert("evs_sent_leave",
484 gu::to_string(sent_msgs_[Message::EVS_T_LEAVE]));
485 status.insert("evs_retransmitted", gu::to_string(retrans_msgs_));
486 status.insert("evs_recovered", gu::to_string(recovered_msgs_));
487 status.insert("evs_deliv_safe",
488 gu::to_string(delivered_msgs_[O_SAFE]));
489 }
490 }
491
492
operator <<(std::ostream & os,const Proto & p)493 std::ostream& gcomm::evs::operator<<(std::ostream& os, const Proto& p)
494 {
495 os << "evs::proto("
496 << p.self_string() << ", "
497 << p.to_string(p.state()) << ") {\n";
498 os << "current_view=" << p.current_view_ << ",\n";
499 os << "input_map=" << *p.input_map_ << ",\n";
500 os << "fifo_seq=" << p.fifo_seq_ << ",\n";
501 os << "last_sent=" << p.last_sent_ << ",\n";
502 os << "known:\n";
503 for (NodeMap::const_iterator i(p.known_.begin()); i != p.known_.end(); ++i)
504 {
505 os << NodeMap::key(i) << " at "
506 << p.get_address(NodeMap::key(i)) << "\n";
507 os << NodeMap::value(i) << "\n";
508 }
509 if (p.install_message_ != 0)
510 os << "install msg=" << *p.install_message_ << "\n";
511 os << " }";
512 return os;
513 }
514
stats() const515 std::string gcomm::evs::Proto::stats() const
516 {
517 std::ostringstream os;
518 os << "\n\tnodes " << current_view_.members().size();
519 os << "\n\tagreed deliv hist {" << hs_agreed_ << "} ";
520 os << "\n\tsafe deliv hist {" << hs_safe_ << "} ";
521 os << "\n\tcaus deliv hist {" << hs_local_causal_ << "} ";
522 os << "\n\toutq avg " << double(send_queue_s_)/double(n_send_queue_s_);
523 os << "\n\tsent {";
524 std::copy(sent_msgs_.begin(), sent_msgs_.end(),
525 std::ostream_iterator<long long int>(os, ","));
526 os << "}\n\tsent per sec {";
527 const double norm(double(gu::datetime::Date::monotonic().get_utc()
528 - last_stats_report_.get_utc())/gu::datetime::Sec);
529 std::vector<double> result(7, norm);
530 std::transform(sent_msgs_.begin(), sent_msgs_.end(),
531 result.begin(), result.begin(), std::divides<double>());
532 std::copy(result.begin(), result.end(),
533 std::ostream_iterator<double>(os, ","));
534 os << "}\n\trecvd { ";
535 std::copy(recvd_msgs_.begin(), recvd_msgs_.end(),
536 std::ostream_iterator<long long int>(os, ","));
537 os << "}\n\trecvd per sec {";
538 std::fill(result.begin(), result.end(), norm);
539 std::transform(recvd_msgs_.begin(), recvd_msgs_.end(),
540 result.begin(), result.begin(), std::divides<double>());
541 std::copy(result.begin(), result.end(),
542 std::ostream_iterator<double>(os, ","));
543 os << "}\n\tretransmitted " << retrans_msgs_ << " ";
544 os << "\n\trecovered " << recovered_msgs_;
545 os << "\n\tdelivered {";
546 std::copy(delivered_msgs_.begin(), delivered_msgs_.end(),
547 std::ostream_iterator<long long int>(os, ", "));
548 os << "}\n\teff(delivered/sent) " <<
549 double(accumulate(delivered_msgs_.begin() + 1,
550 delivered_msgs_.begin() + O_SAFE + 1, 0))
551 /double(accumulate(sent_msgs_.begin(), sent_msgs_.end(), 0));
552 return os.str();
553 }
554
reset_stats()555 void gcomm::evs::Proto::reset_stats()
556 {
557 hs_agreed_.clear();
558 hs_safe_.clear();
559 hs_local_causal_.clear();
560 safe_deliv_latency_.clear();
561 send_queue_s_ = 0;
562 n_send_queue_s_ = 0;
563 last_stats_report_ = gu::datetime::Date::monotonic();
564 }
565
566
is_msg_from_previous_view(const Message & msg)567 bool gcomm::evs::Proto::is_msg_from_previous_view(const Message& msg)
568 {
569 ViewList::const_iterator i;
570 if ((i = previous_views_.find(msg.source_view_id()))
571 != previous_views_.end())
572 {
573 evs_log_debug(D_FOREIGN_MSGS) << " message " << msg
574 << " from previous view " << i->first;
575 return true;
576 }
577
578 // If node is in current view, check message source view seq, if it is
579 // smaller than current view seq then the message is also from some
580 // previous (but unknown to us) view
581 NodeList::const_iterator ni(current_view_.members().find(msg.source()));
582 if (ni != current_view_.members().end())
583 {
584 if (msg.source_view_id().seq() <
585 current_view_.id().seq())
586 {
587 log_warn << "stale message from unknown origin " << msg;
588 return true;
589 }
590 }
591
592 return false;
593 }
594
595
handle_inactivity_timer()596 void gcomm::evs::Proto::handle_inactivity_timer()
597 {
598 gu_trace(check_inactive());
599 gu_trace(cleanup_views());
600 gu_trace(cleanup_evicted());
601 }
602
603
handle_retrans_timer()604 void gcomm::evs::Proto::handle_retrans_timer()
605 {
606 evs_log_debug(D_TIMERS) << "retrans timer";
607 if (state() == S_GATHER)
608 {
609 if (install_message_ != 0)
610 {
611 // Retransmit install message if representative and all commit
612 // gaps have not been received yet.
613 if (is_all_committed() == false &&
614 install_message_->source() == uuid())
615 {
616 evs_log_debug(D_INSTALL_MSGS) << "retrans install";
617 gu::Buffer buf;
618 install_message_->set_flags(
619 install_message_->flags() | Message::F_RETRANS);
620 (void)serialize(*install_message_, buf);
621 Datagram dg(buf);
622 // Must not be sent as delegate, newly joining node
623 // will filter them out in handle_msg().
624 gu_trace(send_down(dg, ProtoDownMeta()));
625 }
626 evs_log_debug(D_GAP_MSGS) << "resend commit gap";
627 // Resend commit gap
628 gu_trace(send_gap(EVS_CALLER, UUID::nil(),
629 install_message_->install_view_id(),
630 Range(), true));
631 }
632 else
633 {
634 evs_log_debug(D_JOIN_MSGS) << "retrans join";
635 gu_trace(send_join(true));
636 }
637 }
638 else if (state() == S_INSTALL)
639 {
640 gcomm_assert(install_message_ != 0);
641 gu_trace(send_gap(EVS_CALLER, UUID::nil(),
642 install_message_->install_view_id(),
643 Range(), true));
644 gu_trace(send_gap(EVS_CALLER, UUID::nil(),
645 install_message_->install_view_id(),
646 Range()));
647 }
648 else if (state() == S_OPERATIONAL)
649 {
650 const seqno_t prev_last_sent(last_sent_);
651 evs_log_debug(D_TIMERS) << "sending keepalive, last_sent=" << last_sent_;
652 Datagram dg;
653 gu_trace((void)send_user(dg, 0xff, O_DROP, -1, -1));
654 if (prev_last_sent == last_sent_)
655 {
656 log_warn << "could not send keepalive";
657 }
658 }
659 else if (state() == S_LEAVING)
660 {
661 evs_log_debug(D_TIMERS) << "send leave timer";
662 send_leave(false);
663 retrans_missing();
664 }
665 }
666
isolate(gu::datetime::Period period)667 void gcomm::evs::Proto::isolate(gu::datetime::Period period)
668 {
669 isolation_end_ = gu::datetime::Date::monotonic() + period;
670 }
671
672
handle_install_timer()673 void gcomm::evs::Proto::handle_install_timer()
674 {
675 gcomm_assert(state() == S_GATHER || state() == S_INSTALL);
676 log_warn << self_string() << " install timer expired";
677
678 bool is_cons(consensus_.is_consensus());
679 bool is_repr(is_representative(uuid()));
680 evs_log_info(I_STATE) << "before inspection:";
681 evs_log_info(I_STATE) << "consensus: " << is_cons;
682 evs_log_info(I_STATE) << "repr : " << is_repr;
683 evs_log_info(I_STATE) << "state dump for diagnosis:";
684 std::cerr << *this << std::endl;
685
686 if (install_timeout_count_ < max_install_timeouts_ )
687 {
688 // before reaching max_install_timeouts, declare only inconsistent
689 // nodes as inactive
690 for (NodeMap::iterator i = known_.begin(); i != known_.end(); ++i)
691 {
692 const UUID& node_uuid(NodeMap::key(i));
693 const Node& node(NodeMap::value(i));
694 if (node_uuid != uuid() &&
695 (node.join_message() == 0 ||
696 consensus_.is_consistent(*node.join_message()) == false))
697 {
698 evs_log_info(I_STATE)
699 << " setting source " << NodeMap::key(i)
700 << " as inactive due to expired install timer";
701 set_inactive(NodeMap::key(i));
702 }
703 }
704 }
705 else if (install_timeout_count_ == max_install_timeouts_)
706 {
707 // max install timeouts reached, declare all other nodes
708 // as inactive
709 for (NodeMap::iterator i = known_.begin(); i != known_.end(); ++i)
710 {
711 if (NodeMap::key(i) != uuid())
712 {
713 evs_log_info(I_STATE)
714 << " setting source " << NodeMap::key(i)
715 << " as inactive due to expired install timer";
716 set_inactive(NodeMap::key(i));
717 }
718 }
719 log_info << "max install timeouts reached, will isolate node "
720 << "for " << suspect_timeout_ + inactive_timeout_;
721 isolate(suspect_timeout_ + inactive_timeout_);
722 }
723 else if (install_timeout_count_ > max_install_timeouts_)
724 {
725 log_info << "going to give up, state dump for diagnosis:";
726 std::cerr << *this << std::endl;
727 gu_throw_fatal << self_string()
728 << " failed to form singleton view after exceeding "
729 << "max_install_timeouts " << max_install_timeouts_
730 << ", giving up";
731 }
732
733
734 if (install_message_ != 0)
735 {
736 for (NodeMap::iterator i = known_.begin(); i != known_.end(); ++i)
737 {
738 if (NodeMap::value(i).committed() == false)
739 {
740 log_info << self_string() << " node " << NodeMap::key(i)
741 << " failed to commit for install message, "
742 << "declaring inactive";
743 if (NodeMap::key(i) != uuid())
744 {
745 set_inactive(NodeMap::key(i));
746 }
747 }
748 }
749 }
750 else
751 {
752 log_info << "no install message received";
753 }
754
755 shift_to(S_GATHER, true);
756 is_cons = consensus_.is_consensus();
757 is_repr = is_representative(uuid());
758 evs_log_info(I_STATE) << "after inspection:";
759 evs_log_info(I_STATE) << "consensus: " << is_cons;
760 evs_log_info(I_STATE) << "repr : " << is_repr;
761 if (is_cons == true && is_repr == true)
762 {
763 send_install(EVS_CALLER);
764 }
765 install_timeout_count_++;
766 }
767
handle_stats_timer()768 void gcomm::evs::Proto::handle_stats_timer()
769 {
770 reset_stats();
771 }
772
773
774
775 class TimerSelectOp
776 {
777 public:
TimerSelectOp(const gcomm::evs::Proto::Timer t_)778 TimerSelectOp(const gcomm::evs::Proto::Timer t_) : t(t_) { }
operator ()(const gcomm::evs::Proto::TimerList::value_type & vt) const779 bool operator()(const gcomm::evs::Proto::TimerList::value_type& vt) const
780 {
781 return (gcomm::evs::Proto::TimerList::value(vt) == t);
782 }
783 private:
784 gcomm::evs::Proto::Timer const t;
785 };
786
787
next_expiration(const Timer t) const788 gu::datetime::Date gcomm::evs::Proto::next_expiration(const Timer t) const
789 {
790 gcomm_assert(state() != S_CLOSED);
791 gu::datetime::Date now(gu::datetime::Date::monotonic());
792 switch (t)
793 {
794 case T_INACTIVITY:
795 return (now + inactive_check_period_);
796 case T_RETRANS:
797 switch (state())
798 {
799 case S_OPERATIONAL:
800 case S_LEAVING:
801 return (now + retrans_period_);
802 case S_GATHER:
803 case S_INSTALL:
804 return (now + join_retrans_period_);
805 default:
806 gu_throw_fatal;
807 }
808 case T_INSTALL:
809
810 switch (state())
811 {
812 case S_GATHER:
813 case S_INSTALL:
814 return (now + install_timeout_);
815 default:
816 return gu::datetime::Date::max();
817 }
818 case T_STATS:
819 return (now + stats_report_period_);
820 }
821 gu_throw_fatal;
822 }
823
824
timer_list_erase_by_type(gcomm::evs::Proto::TimerList & timer_list,gcomm::evs::Proto::Timer timer)825 void timer_list_erase_by_type(gcomm::evs::Proto::TimerList& timer_list,
826 gcomm::evs::Proto::Timer timer)
827 {
828 gcomm::evs::Proto::TimerList::iterator i, i_next;
829 for (i = timer_list.begin(); i != timer_list.end(); i = i_next)
830 {
831 i_next = i, ++i_next;
832 if (gcomm::evs::Proto::TimerList::value(i) == timer)
833 {
834 timer_list.erase(i);
835 }
836 }
837 }
838
reset_timer(Timer t)839 void gcomm::evs::Proto::reset_timer(Timer t)
840 {
841 timer_list_erase_by_type(timers_, t);
842 timers_.insert(std::make_pair(next_expiration(t), t));
843 }
844
cancel_timer(Timer t)845 void gcomm::evs::Proto::cancel_timer(Timer t)
846 {
847 timer_list_erase_by_type(timers_, t);
848 }
849
handle_timers()850 gu::datetime::Date gcomm::evs::Proto::handle_timers()
851 {
852 gu::datetime::Date now(gu::datetime::Date::monotonic());
853
854 while (timers_.empty() == false &&
855 TimerList::key(timers_.begin()) <= now)
856 {
857 Timer t(TimerList::value(timers_.begin()));
858 timers_.erase(timers_.begin());
859 switch (t)
860 {
861 case T_INACTIVITY:
862 handle_inactivity_timer();
863 break;
864 case T_RETRANS:
865 handle_retrans_timer();
866 break;
867 case T_INSTALL:
868 handle_install_timer();
869 break;
870 case T_STATS:
871 handle_stats_timer();
872 break;
873 }
874 if (state() == S_CLOSED)
875 {
876 return gu::datetime::Date::max();
877 }
878 reset_timer(t);
879 }
880
881 if (timers_.empty() == true)
882 {
883 evs_log_debug(D_TIMERS) << "no timers set";
884 return gu::datetime::Date::max();
885 }
886 return TimerList::key(timers_.begin());
887 }
888
889
check_inactive()890 void gcomm::evs::Proto::check_inactive()
891 {
892 const gu::datetime::Date now(gu::datetime::Date::monotonic());
893 if (last_inactive_check_ + inactive_check_period_*3 < now)
894 {
895 log_warn << "last inactive check more than " << inactive_check_period_*3
896 << " ago (" << (now - last_inactive_check_)
897 << "), skipping check";
898 last_inactive_check_ = now;
899 return;
900 }
901
902 NodeMap::value(self_i_).set_tstamp(gu::datetime::Date::monotonic());
903 std::for_each(known_.begin(), known_.end(), InspectNode());
904
905 bool has_inactive(false);
906 size_t n_suspected(0);
907 bool do_send_delayed_list(false);
908
909 // Iterate over known nodes and check inactive/suspected/delayed status
910 for (NodeMap::iterator i(known_.begin()); i != known_.end(); ++i)
911 {
912 if (i == self_i_) continue; // No need to check self
913
914 const UUID& node_uuid(NodeMap::key(i));
915 Node& node(NodeMap::value(i));
916 if (node_uuid != uuid() &&
917 (node.is_inactive() == true ||
918 node.is_suspected() == true ))
919 {
920 if (node.operational() == true && node.is_inactive() == true)
921 {
922 log_info << self_string() << " detected inactive node: " << node_uuid;
923 }
924 else if (node.is_suspected() == true && node.is_inactive() == false)
925 {
926 log_info << self_string() << " suspecting node: " << node_uuid;
927 }
928 if (node.is_inactive() == true)
929 {
930 set_inactive(node_uuid);
931 }
932 if (node.is_suspected() == true && node.operational() == true)
933 {
934 ++n_suspected;
935 if (node.join_message() == 0)
936 {
937 log_info << self_string()
938 << " suspected node without join message, declaring inactive";
939 set_inactive(node_uuid);
940 }
941 }
942 has_inactive = true;
943 }
944
945 DelayedList::iterator dli(delayed_list_.find(node_uuid));
946 if (auto_evict_ &&
947 node.seen_tstamp() + retrans_period_ + delay_margin_ <= now)
948 {
949 if (node.index() != Node::invalid_index)
950 {
951 // Delayed node in group, check input map state and request
952 // message recovery if necessary
953 Range range(input_map_->range(node.index()));
954 log_info << "delayed node: "
955 << node_uuid << ", requesting range "
956 << Range(range.lu(), last_sent_);
957 if (last_sent_ >= range.lu())
958 {
959 // Request missing message range from delayed node.
960 request_retrans(node_uuid, node_uuid,
961 Range(range.lu(), last_sent_));
962 }
963 }
964
965 if (dli == delayed_list_.end())
966 {
967 delayed_list_.insert(
968 std::make_pair(node_uuid,
969 DelayedEntry(get_address(node_uuid))));
970 }
971 else
972 {
973 dli->second.set_tstamp(now);
974 dli->second.set_state(DelayedEntry::S_DELAYED,
975 delayed_keep_period_, now);
976 evs_log_debug(D_STATE) << "set '" << dli->first <<
977 "' delayed state to S_DELAYED , cnt = " <<
978 dli->second.state_change_cnt();
979 // todo(dirlt): make threshold as a configurable variable ?
980 if (dli->second.state_change_cnt() > 0)
981 {
982 do_send_delayed_list = true;
983 }
984 }
985 }
986 else if (dli != delayed_list_.end())
987 {
988 const size_t prev_cnt(dli->second.state_change_cnt());
989 dli->second.set_state(DelayedEntry::S_OK,
990 delayed_keep_period_, now);
991 if (prev_cnt != dli->second.state_change_cnt())
992 {
993 dli->second.set_tstamp(now);
994 }
995 evs_log_debug(D_STATE) << "set '" << dli->first <<
996 "' delayed state to S_OK. prev_cnt = " << prev_cnt <<
997 ", cur_cnt = " << dli->second.state_change_cnt();
998 if (dli->second.state_change_cnt() > 0)
999 {
1000 do_send_delayed_list = true;
1001 }
1002 }
1003 }
1004
1005 // Clean up delayed list and evict list messages
1006 {
1007 DelayedList::iterator i, i_next;
1008 for (i = delayed_list_.begin(); i != delayed_list_.end(); i = i_next)
1009 {
1010 i_next = i, ++i_next;
1011 // State change count has decayed back to zero
1012 // or node is already evicted and not in the current view
1013 // anymore.
1014 if ((i->second.state_change_cnt() == 0 &&
1015 i->second.state() == DelayedEntry::S_OK) ||
1016 (is_evicted(i->first) == true &&
1017 current_view_.is_member(i->first) == false))
1018 {
1019 log_debug << "remove '" << i->first << "' from delayed_list";
1020 delayed_list_.erase(i);
1021 }
1022 }
1023 for (NodeMap::iterator i(known_.begin()); i != known_.end(); ++i)
1024 {
1025 Node& node(NodeMap::value(i));
1026 const DelayedListMessage* const elm(node.delayed_list_message());
1027 if (elm != 0 && elm->tstamp() + delayed_keep_period_ < now)
1028 {
1029 log_debug << "discarding expired elm from " << elm->source();
1030 node.set_delayed_list_message(0);
1031 }
1032 }
1033 }
1034
1035 if (current_view_.version() > 0 &&
1036 do_send_delayed_list == true && auto_evict_ > 0)
1037 {
1038 send_delayed_list();
1039 }
1040
1041 // All other nodes are under suspicion, set all others as inactive.
1042 // This will speed up recovery when this node has been isolated from
1043 // other group. Note that this should be done only if known size is
1044 // greater than 2 in order to avoid immediate split brain.
1045 if (known_.size() > 2 && n_suspected + 1 == known_.size())
1046 {
1047 for (NodeMap::iterator i = known_.begin(); i != known_.end(); ++i)
1048 {
1049 if (NodeMap::key(i) != uuid())
1050 {
1051 evs_log_info(I_STATE)
1052 << " setting source " << NodeMap::key(i)
1053 << " inactive (other nodes under suspicion)";
1054 set_inactive(NodeMap::key(i));
1055 }
1056 }
1057 }
1058
1059 if (has_inactive == true && state() == S_OPERATIONAL)
1060 {
1061 gu_trace(shift_to(S_GATHER, true));
1062 }
1063 else if (has_inactive == true &&
1064 state() == S_LEAVING &&
1065 n_operational() == 1)
1066 {
1067 gu_trace(shift_to(S_CLOSED));
1068 }
1069
1070 last_inactive_check_ = now;
1071
1072
1073 // Check if isolation period has ended
1074 if (isolation_end_ != gu::datetime::Date::zero() &&
1075 isolation_end_ <= now)
1076 {
1077 log_info << "ending isolation";
1078 isolation_end_ = gu::datetime::Date::zero();
1079 }
1080 }
1081
1082
set_inactive(const UUID & node_uuid)1083 void gcomm::evs::Proto::set_inactive(const UUID& node_uuid)
1084 {
1085 NodeMap::iterator i;
1086 gcomm_assert(node_uuid != uuid());
1087 gu_trace(i = known_.find_checked(node_uuid));
1088 evs_log_debug(D_STATE) << "setting " << node_uuid << " inactive";
1089 Node& node(NodeMap::value(i));
1090 node.set_tstamp(gu::datetime::Date::zero());
1091 node.set_join_message(0);
1092 // node.set_leave_message(0);
1093 node.set_operational(false);
1094 }
1095
1096
is_inactive(const UUID & uuid) const1097 bool gcomm::evs::Proto::is_inactive(const UUID& uuid) const
1098 {
1099 NodeMap::const_iterator i;
1100 gu_trace(i = known_.find_checked(uuid));
1101 const Node& node(NodeMap::value(i));
1102 return (node.operational() == false);
1103 }
1104
cleanup_foreign(const InstallMessage & im)1105 void gcomm::evs::Proto::cleanup_foreign(const InstallMessage& im)
1106 {
1107 NodeMap::iterator i, i_next;
1108 for (i = known_.begin(); i != known_.end(); i = i_next)
1109 {
1110 const UUID& uuid(NodeMap::key(i));
1111 i_next = i, ++i_next;
1112 const MessageNodeList::const_iterator mni(im.node_list().find(uuid));
1113 if (mni == im.node_list().end() ||
1114 MessageNodeList::value(mni).operational() == false)
1115 {
1116 known_.erase(i);
1117 }
1118 }
1119 }
1120
cleanup_views()1121 void gcomm::evs::Proto::cleanup_views()
1122 {
1123 gu::datetime::Date now(gu::datetime::Date::monotonic());
1124
1125 ViewList::iterator i, i_next;
1126 for (i = previous_views_.begin(); i != previous_views_.end(); i = i_next)
1127 {
1128 i_next = i, ++i_next;
1129 if (i->second + view_forget_timeout_ <= now)
1130 {
1131 evs_log_debug(D_STATE) << " erasing view: " << i->first;
1132 previous_views_.erase(i);
1133 }
1134 }
1135 }
1136
cleanup_evicted()1137 void gcomm::evs::Proto::cleanup_evicted()
1138 {
1139 gu::datetime::Date now(gu::datetime::Date::monotonic());
1140 Protolay::EvictList::const_iterator i, i_next;
1141 for (i = evict_list().begin(); i != evict_list().end(); i = i_next)
1142 {
1143 i_next = i, ++i_next;
1144 if (Protolay::EvictList::value(i) + view_forget_timeout_ <= now)
1145 {
1146 log_info << "unevicting " << Protolay::EvictList::key(i);
1147 unevict(Protolay::EvictList::key(i));
1148 }
1149 }
1150 }
1151
n_operational() const1152 size_t gcomm::evs::Proto::n_operational() const
1153 {
1154 NodeMap::const_iterator i;
1155 size_t ret = 0;
1156 for (i = known_.begin(); i != known_.end(); ++i) {
1157 if (i->second.operational() == true)
1158 ret++;
1159 }
1160 return ret;
1161 }
1162
deliver_reg_view(const InstallMessage & im,const View & prev_view)1163 void gcomm::evs::Proto::deliver_reg_view(const InstallMessage& im,
1164 const View& prev_view)
1165 {
1166
1167 View view(im.version(), im.install_view_id());
1168 for (MessageNodeList::const_iterator i(im.node_list().begin());
1169 i != im.node_list().end(); ++i)
1170 {
1171 const UUID& uuid(MessageNodeList::key(i));
1172 const MessageNode& mn(MessageNodeList::value(i));
1173
1174 // 1) Operational nodes will be members of new view
1175 // 2) Operational nodes that were not present in previous
1176 // view are going also to joined set
1177 // 3) Leaving nodes go to left set
1178 // 4) All other nodes present in previous view but not in
1179 // member of left set are considered partitioned
1180 if (mn.operational() == true)
1181 {
1182 view.add_member(uuid, mn.segment());
1183 if (prev_view.is_member(uuid) == false)
1184 {
1185 view.add_joined(uuid, mn.segment());
1186 }
1187 }
1188 else if (mn.leaving() == true)
1189 {
1190 view.add_left(uuid, mn.segment());
1191 }
1192 else
1193 {
1194 // Partitioned set is constructed after this loop
1195 }
1196
1197 // If node has been evicted, it should have been added to
1198 // evicted list via JOIN messages.
1199 assert(mn.evicted() == false || is_evicted(uuid) == true);
1200 }
1201
1202 // Loop over previous view and add each node not in new view
1203 // member of left set as partitioned.
1204 for (NodeList::const_iterator i(prev_view.members().begin());
1205 i != prev_view.members().end(); ++i)
1206 {
1207 const UUID& uuid(NodeList::key(i));
1208 const gcomm::Node& mn(NodeList::value(i));
1209 if (view.is_member(uuid) == false &&
1210 view.is_leaving(uuid) == false)
1211 {
1212 view.add_partitioned(uuid, mn.segment());
1213 }
1214 }
1215
1216 evs_log_info(I_VIEWS) << "delivering view " << view;
1217
1218 // This node must be a member of the view it delivers and
1219 // view id UUID must be of one of the members.
1220 gcomm_assert(view.is_member(uuid()) == true);
1221 gcomm_assert(view.is_member(view.id().uuid()) == true)
1222 << "view id UUID " << view.id().uuid()
1223 << " not found from reg view members "
1224 << view.members()
1225 << " must abort to avoid possibility of two groups "
1226 << "with the same view id";
1227
1228 set_stable_view(view);
1229 ProtoUpMeta up_meta(UUID::nil(), ViewId(), &view);
1230 send_up(Datagram(), up_meta);
1231 }
1232
deliver_trans_view(const InstallMessage & im,const View & curr_view)1233 void gcomm::evs::Proto::deliver_trans_view(const InstallMessage& im,
1234 const View& curr_view)
1235 {
1236
1237 // Trans view is intersection of members in curr_view
1238 // and members going to be in the next view that come from
1239 // curr_view according to install message
1240
1241 View view(current_view_.version(),
1242 ViewId(V_TRANS,
1243 curr_view.id().uuid(),
1244 curr_view.id().seq()));
1245
1246 for (MessageNodeList::const_iterator i(im.node_list().begin());
1247 i != im.node_list().end(); ++i)
1248 {
1249 const UUID& uuid(MessageNodeList::key(i));
1250 const MessageNode& mn(MessageNodeList::value(i));
1251
1252 if (curr_view.id() == mn.view_id() &&
1253 curr_view.is_member(uuid) == true)
1254 {
1255 // 1) Operational nodes go to next view
1256 // 2) Leaving nodes go to left set
1257 // 3) All other nodes present in previous view but not in
1258 // member of left set are considered partitioned
1259 if (mn.operational() == true)
1260 {
1261 view.add_member(uuid, mn.segment());
1262 }
1263 else if (mn.leaving() == true)
1264 {
1265 view.add_left(uuid, mn.segment());
1266 }
1267 else
1268 {
1269 // Partitioned set is constructed after this loop
1270 }
1271 }
1272 }
1273
1274 // Loop over current view and add each node not in new view
1275 // member of left set as partitioned.
1276 for (NodeList::const_iterator i(curr_view.members().begin());
1277 i != curr_view.members().end(); ++i)
1278 {
1279 const UUID& uuid(NodeList::key(i));
1280 const gcomm::Node& mn(NodeList::value(i));
1281
1282 if (view.is_member(uuid) == false &&
1283 view.is_leaving(uuid) == false)
1284 {
1285 view.add_partitioned(uuid, mn.segment());
1286 }
1287 }
1288
1289 // This node must be a member of the view it delivers and
1290 // if the view is the last transitional, view must have
1291 // exactly one member and no-one in left set.
1292 gcomm_assert(view.is_member(uuid()) == true);
1293
1294 evs_log_info(I_VIEWS) << " delivering view " << view;
1295
1296 ProtoUpMeta up_meta(UUID::nil(), ViewId(), &view);
1297 gu_trace(send_up(Datagram(), up_meta));
1298 }
1299
1300
deliver_empty_view()1301 void gcomm::evs::Proto::deliver_empty_view()
1302 {
1303 View view(0, V_REG);
1304
1305 evs_log_info(I_VIEWS) << "delivering view " << view;
1306
1307 ProtoUpMeta up_meta(UUID::nil(), ViewId(), &view);
1308 send_up(Datagram(), up_meta);
1309 }
1310
1311
setall_committed(bool val)1312 void gcomm::evs::Proto::setall_committed(bool val)
1313 {
1314 for (NodeMap::iterator i = known_.begin(); i != known_.end(); ++i)
1315 {
1316 NodeMap::value(i).set_committed(val);
1317 }
1318 }
1319
1320 // Check if commit gaps from all known nodes found from install message have
1321 // been seen.
is_all_committed() const1322 bool gcomm::evs::Proto::is_all_committed() const
1323 {
1324 gcomm_assert(install_message_ != 0);
1325 for (NodeMap::const_iterator i(known_.begin()); i != known_.end(); ++i)
1326 {
1327 const UUID& uuid(NodeMap::key(i));
1328 const Node& inst(NodeMap::value(i));
1329 if (install_message_->node_list().find(uuid) !=
1330 install_message_->node_list().end() &&
1331 inst.operational() == true &&
1332 inst.committed() == false)
1333 {
1334 return false;
1335 }
1336 }
1337 return true;
1338 }
1339
setall_installed(bool val)1340 void gcomm::evs::Proto::setall_installed(bool val)
1341 {
1342 for (NodeMap::iterator i = known_.begin(); i != known_.end(); ++i)
1343 {
1344 NodeMap::value(i).set_installed(val);
1345 }
1346 }
1347
1348 // Check if gaps from new view from all known nodes found from install
1349 // message have been seen.
is_all_installed() const1350 bool gcomm::evs::Proto::is_all_installed() const
1351 {
1352 gcomm_assert(install_message_ != 0);
1353 for (NodeMap::const_iterator i = known_.begin(); i != known_.end(); ++i)
1354 {
1355 const UUID& uuid(NodeMap::key(i));
1356 const Node& inst(NodeMap::value(i));
1357 if (install_message_->node_list().find(uuid) !=
1358 install_message_->node_list().end() &&
1359 inst.operational() == true &&
1360 inst.installed() == false)
1361 {
1362 return false;
1363 }
1364 }
1365 return true;
1366 }
1367
cleanup_joins()1368 void gcomm::evs::Proto::cleanup_joins()
1369 {
1370 for (NodeMap::iterator i = known_.begin(); i != known_.end(); ++i)
1371 {
1372 NodeMap::value(i).set_join_message(0);
1373 }
1374 }
1375
is_representative(const UUID & uuid) const1376 bool gcomm::evs::Proto::is_representative(const UUID& uuid) const
1377 {
1378 for (NodeMap::const_iterator i = known_.begin(); i != known_.end(); ++i)
1379 {
1380 if (NodeMap::value(i).operational() == true &&
1381 NodeMap::value(i).is_inactive() == false)
1382 {
1383 assert(NodeMap::value(i).leave_message() == 0);
1384 if (NodeMap::value(i).leave_message() != 0)
1385 {
1386 log_warn << "operational node " << NodeMap::key(i)
1387 << " with leave message: " << NodeMap::value(i);
1388 continue;
1389 }
1390 return (uuid == NodeMap::key(i));
1391 }
1392 }
1393
1394 return false;
1395 }
1396
is_all_suspected(const UUID & uuid) const1397 bool gcomm::evs::Proto::is_all_suspected(const UUID& uuid) const
1398 {
1399 for (NodeMap::const_iterator i = known_.begin(); i != known_.end(); ++i)
1400 {
1401 const Node& node(NodeMap::value(i));
1402 if (node.operational() == true) {
1403 const JoinMessage* jm(node.join_message());
1404 if (!jm) return false;
1405 const MessageNodeList::const_iterator j(jm->node_list().find(uuid));
1406 if (!(j != jm->node_list().end() &&
1407 MessageNodeList::value(j).suspected()))
1408 return false;
1409 }
1410 }
1411 return true;
1412 }
1413
1414
1415
1416 /////////////////////////////////////////////////////////////////////////////
1417 // Message sending
1418 /////////////////////////////////////////////////////////////////////////////
1419
1420
1421
1422
is_flow_control(const seqno_t seq,const seqno_t win) const1423 bool gcomm::evs::Proto::is_flow_control(const seqno_t seq, const seqno_t win) const
1424 {
1425 gcomm_assert(seq != -1 && win != -1);
1426
1427 const seqno_t base(input_map_->safe_seq());
1428 if (seq > base + win)
1429 {
1430 return true;
1431 }
1432 return false;
1433 }
1434
request_user_msg_feedback(const gcomm::Datagram & dg) const1435 bool gcomm::evs::Proto::request_user_msg_feedback(const gcomm::Datagram& dg)
1436 const
1437 {
1438 // Request feedback from peers at least once per 128kB chunk. This will
1439 // force the nodes to complete their seqnos.
1440 if (bytes_since_request_user_msg_feedback_ + dg.len() >= (size_t(1) << 17))
1441 {
1442 evs_log_debug(D_USER_MSGS) << "bytes since request user msg feedback: "
1443 << bytes_since_request_user_msg_feedback_
1444 << " dg len: " << dg.len();
1445 return true;
1446 }
1447 return false;
1448 }
1449
// Send one user message (or an aggregate, or a seqno-completing message).
//
// Parameters:
//   dg           - datagram to send; its offset must be 0. The message
//                  header is pushed before send_down() and popped after.
//   user_type    - user type stored in the UserMessage.
//   order        - delivery order stored in the UserMessage.
//   win          - flow control window, or -1 to bypass flow control.
//   up_to_seqno  - if not -1, the message covers the seqno range up to
//                  this value (limited to 0xff); win must then be -1.
//   n_aggregated - number of output queue entries contained in dg.
//
// Returns EAGAIN if flow control blocks the message. Otherwise returns 0,
// also when send_down() fails: the failure is only logged at debug level
// (the message has already been inserted into the input map by then).
int gcomm::evs::Proto::send_user(Datagram& dg,
                                 uint8_t const user_type,
                                 Order const order,
                                 seqno_t const win,
                                 seqno_t const up_to_seqno,
                                 size_t const n_aggregated)
{
    assert(state() == S_LEAVING ||
           state() == S_GATHER ||
           state() == S_OPERATIONAL);
    assert(dg.offset() == 0);
    assert(n_aggregated == 1 || output_.size() >= n_aggregated);

    gcomm_assert(up_to_seqno == -1 || up_to_seqno >= last_sent_);
    gcomm_assert(up_to_seqno == -1 || win == -1);

    int ret;
    // Seqno of the message about to be sent.
    const seqno_t seq(last_sent_ + 1);

    if (win != -1 &&
        is_flow_control(seq, win) == true)
    {
        return EAGAIN;
    }

    // seq_range max 0xff because of Message seq_range_ field limitation
    seqno_t seq_range(
        std::min(up_to_seqno == -1 ? 0 : up_to_seqno - seq,
                 evs::seqno_t(0xff)));
    seqno_t last_msg_seq(seq + seq_range);
    uint8_t flags;

    // Don't set F_MSG_MORE if the output queue won't contain messages
    // after this batch, if up_to_seqno is given (msg completion), or if
    // flow control would kick in at the next batch. F_MSG_MORE is also
    // left unset if the number of bytes in the send pipeline exceeds the
    // predefined value as reported by request_user_msg_feedback().
    if (output_.size() <= n_aggregated ||
        up_to_seqno != -1 ||
        (win != -1 && (is_flow_control(last_msg_seq + 1, win) ||
                       request_user_msg_feedback(dg))))
    {
        flags = 0;
        // The byte counter restarts whenever F_MSG_MORE is not set.
        bytes_since_request_user_msg_feedback_ = 0;
    }
    else
    {
        flags = Message::F_MSG_MORE;
        bytes_since_request_user_msg_feedback_ += dg.len();
    }
    if (n_aggregated > 1)
    {
        flags |= Message::F_AGGREGATE;
    }

    // Maximize seq range in the case next message batch won't be sent
    // immediately.
    if ((flags & Message::F_MSG_MORE) == 0 && up_to_seqno == -1)
    {
        // Clamp (max_hs - seq) into [0, 0xff].
        seq_range = input_map_->max_hs() - seq;
        seq_range = std::max(static_cast<seqno_t>(0), seq_range);
        seq_range = std::min(static_cast<seqno_t>(0xff), seq_range);
        if (seq_range != 0)
        {
            log_debug << "adjusted seq range to: " << seq_range;
            last_msg_seq = seq + seq_range;
        }
    }

    gcomm_assert(last_msg_seq >= seq && last_msg_seq - seq <= 0xff);
    gcomm_assert(seq_range >= 0 && seq_range <= 0xff);

    UserMessage msg(version_,
                    uuid(),
                    current_view_.id(),
                    seq,
                    input_map_->aru_seq(),
                    seq_range,
                    order,
                    ++fifo_seq_,
                    user_type,
                    flags);

    // Insert first to input map to determine correct aru seq
    Range range;
    gu_trace(range = input_map_->insert(NodeMap::value(self_i_).index(),
                                        msg, dg));

    gcomm_assert(range.hs() == last_msg_seq)
        << msg << " " << *input_map_ << " " << *this;

    last_sent_ = last_msg_seq;
    assert(range.hs() == last_sent_);

    update_im_safe_seq(NodeMap::value(self_i_).index(),
                       input_map_->aru_seq());

    // Refresh the aru seq in the outgoing message now that the message
    // itself has been inserted into the input map.
    msg.set_aru_seq(input_map_->aru_seq());
    evs_log_debug(D_USER_MSGS) << " sending " << msg;
    gu_trace(push_header(msg, dg));
    if ((ret = send_down(dg, ProtoDownMeta())) != 0)
    {
        log_debug << "send failed: " << strerror(ret);
    }
    gu_trace(pop_header(msg, dg));
    sent_msgs_[Message::EVS_T_USER]++;

    // Run delivery only when delivering_ is not set.
    if (delivering_ == false)
    {
        gu_trace(deliver());
        gu_trace(deliver_local());
    }

    return 0;
}
1565
aggregate_len() const1566 size_t gcomm::evs::Proto::aggregate_len() const
1567 {
1568 bool is_aggregate(false);
1569 size_t ret(0);
1570 AggregateMessage am;
1571 out_queue::const_iterator i(output_.begin());
1572 const Order ord(i->second.order());
1573 ret += i->first.len() + am.serial_size();
1574 for (++i; i != output_.end() && i->second.order() == ord; ++i)
1575 {
1576 if (ret + i->first.len() + am.serial_size() <= mtu())
1577 {
1578 ret += i->first.len() + am.serial_size();
1579 is_aggregate = true;
1580 }
1581 else
1582 {
1583 break;
1584 }
1585 }
1586 evs_log_debug(D_USER_MSGS) << "is aggregate " << is_aggregate << " ret " << ret;
1587 return (is_aggregate == true ? ret : 0);
1588 }
1589
send_user(const seqno_t win)1590 int gcomm::evs::Proto::send_user(const seqno_t win)
1591 {
1592 gcomm_assert(output_.empty() == false);
1593 gcomm_assert(state() == S_OPERATIONAL);
1594 gcomm_assert(win <= send_window_);
1595 int ret;
1596 size_t alen;
1597 if (use_aggregate_ == true && (alen = aggregate_len()) > 0)
1598 {
1599 // Messages can be aggregated into single message
1600 send_buf_.resize(alen);
1601 size_t offset(0);
1602 size_t n(0);
1603
1604 out_queue::const_iterator i(output_.begin());
1605 Order ord(i->second.order());
1606 while ((alen > 0 && i != output_.end()))
1607 {
1608 const Datagram& dg(i->first);
1609 const ProtoDownMeta dm(i->second);
1610 AggregateMessage am(0, dg.len(), dm.user_type());
1611 gcomm_assert(alen >= dg.len() + am.serial_size());
1612
1613 gu_trace(offset = am.serialize(&send_buf_[0],
1614 send_buf_.size(), offset));
1615 std::copy(dg.header() + dg.header_offset(),
1616 dg.header() + dg.header_size(),
1617 &send_buf_[0] + offset);
1618 offset += (dg.header_len());
1619 std::copy(dg.payload().begin(), dg.payload().end(),
1620 &send_buf_[0] + offset);
1621 offset += dg.payload().size();
1622 alen -= dg.len() + am.serial_size();
1623 ++n;
1624 ++i;
1625 }
1626 Datagram dg(gu::SharedBuffer(new gu::Buffer(send_buf_.begin(),
1627 send_buf_.end())));
1628 if ((ret = send_user(dg, 0xff, ord, win, -1, n)) == 0)
1629 {
1630 while (n-- > 0)
1631 {
1632 output_.pop_front();
1633 }
1634 }
1635 }
1636 else
1637 {
1638 std::pair<Datagram, ProtoDownMeta> wb(output_.front());
1639 if ((ret = send_user(wb.first,
1640 wb.second.user_type(),
1641 wb.second.order(),
1642 win,
1643 -1)) == 0)
1644 {
1645 output_.pop_front();
1646 }
1647 }
1648 return ret;
1649 }
1650
1651
complete_user(const seqno_t high_seq)1652 void gcomm::evs::Proto::complete_user(const seqno_t high_seq)
1653 {
1654 gcomm_assert(state() == S_OPERATIONAL || state() == S_GATHER);
1655
1656 evs_log_debug(D_USER_MSGS) << "completing seqno to " << high_seq;;
1657
1658 Datagram wb;
1659 int err;
1660 err = send_user(wb, 0xff, O_DROP, -1, high_seq);
1661 if (err != 0)
1662 {
1663 log_debug << "failed to send completing msg " << strerror(err)
1664 << " seq=" << high_seq << " send_window=" << send_window_
1665 << " last_sent=" << last_sent_;
1666 }
1667
1668 }
1669
send_delegate(Datagram & wb,const UUID & target)1670 int gcomm::evs::Proto::send_delegate(Datagram& wb, const UUID& target)
1671 {
1672 DelegateMessage dm(version_, uuid(), current_view_.id(),
1673 ++fifo_seq_);
1674 push_header(dm, wb);
1675 int ret = send_down(wb, ProtoDownMeta(target));
1676 pop_header(dm, wb);
1677 sent_msgs_[Message::EVS_T_DELEGATE]++;
1678 return ret;
1679 }
1680
gap_rate_limit(const UUID & target,const Range & range) const1681 bool gcomm::evs::Proto::gap_rate_limit(const UUID& target, const Range& range)
1682 const
1683 {
1684 NodeMap::const_iterator target_i(known_.find(target));
1685 // Sanity check: The target should always be in the set
1686 // of known nodes. If it is not, skip sending the gap message
1687 // in production.
1688 assert(target_i != known_.end());
1689 if (target_i == known_.end())
1690 {
1691 return true;
1692 }
1693 const Node& target_node(target_i->second);
1694 // Limit requesting ranges with the same highest seen within
1695 // 50msec period.
1696 gu::datetime::Date now(gu::datetime::Date::monotonic());
1697 if (now < target_node.last_requested_range_tstamp() + gu::datetime::MSec*100)
1698 {
1699 evs_log_debug(D_GAP_MSGS) << "Rate limiting gap: now " << now
1700 << " requested range tstamp: "
1701 << target_node.last_requested_range_tstamp()
1702 << " requested range: "
1703 << target_node.last_requested_range();
1704 return true;
1705 }
1706 return false;
1707 }
1708
send_gap(EVS_CALLER_ARG,const UUID & range_uuid,const ViewId & source_view_id,const Range range,const bool commit)1709 void gcomm::evs::Proto::send_gap(EVS_CALLER_ARG,
1710 const UUID& range_uuid,
1711 const ViewId& source_view_id,
1712 const Range range,
1713 const bool commit)
1714 {
1715 assert(range_uuid == UUID::nil());
1716 assert(range.is_empty());
1717 gcomm_assert((commit == false && source_view_id == current_view_.id())
1718 || install_message_ != 0);
1719 uint8_t flags(0);
1720 if (commit == true) flags |= Message::F_COMMIT;
1721
1722 GapMessage gm(version_,
1723 uuid(),
1724 source_view_id,
1725 (source_view_id == current_view_.id() ? last_sent_ :
1726 (commit == true ? install_message_->fifo_seq() : -1)),
1727 (source_view_id == current_view_.id() ?
1728 input_map_->aru_seq() : -1),
1729 ++fifo_seq_,
1730 range_uuid,
1731 range,
1732 flags);
1733
1734 evs_log_debug(D_GAP_MSGS) << EVS_LOG_METHOD << gm;
1735 gu::Buffer buf;
1736 serialize(gm, buf);
1737 Datagram dg(buf);
1738 int err = send_down(dg, ProtoDownMeta(range_uuid));
1739 if (err != 0)
1740 {
1741 log_debug << "send failed: " << strerror(err);
1742 }
1743 sent_msgs_[Message::EVS_T_GAP]++;
1744 gu_trace(handle_gap(gm, self_i_));
1745 }
1746
1747
populate_node_list(MessageNodeList * node_list) const1748 void gcomm::evs::Proto::populate_node_list(MessageNodeList* node_list) const
1749 {
1750 for (NodeMap::const_iterator i = known_.begin(); i != known_.end(); ++i)
1751 {
1752 const UUID& node_uuid(NodeMap::key(i));
1753 const Node& node(NodeMap::value(i));
1754 MessageNode mnode(node.operational(), node.suspected(),
1755 is_evicted(node_uuid));
1756 if (node_uuid != uuid())
1757 {
1758 const JoinMessage* jm(node.join_message());
1759 const LeaveMessage* lm(node.leave_message());
1760
1761 //
1762 if (jm != 0)
1763 {
1764 const ViewId& nsv(jm->source_view_id());
1765 const MessageNode& mn(MessageNodeList::value(jm->node_list().find_checked(node_uuid)));
1766 mnode = MessageNode(node.operational(),
1767 node.is_suspected(),
1768 node.segment(),
1769 is_evicted(node_uuid),
1770 -1,
1771 jm->source_view_id(),
1772 (nsv == current_view_.id() ?
1773 input_map_->safe_seq(node.index()) :
1774 mn.safe_seq()),
1775 (nsv == current_view_.id() ?
1776 input_map_->range(node.index()) :
1777 mn.im_range()));
1778 }
1779 else if (lm != 0)
1780 {
1781 const ViewId& nsv(lm->source_view_id());
1782 mnode = MessageNode(node.operational(),
1783 node.is_suspected(),
1784 node.segment(),
1785 is_evicted(node_uuid),
1786 lm->seq(),
1787 nsv,
1788 (nsv == current_view_.id() ?
1789 input_map_->safe_seq(node.index()) :
1790 -1),
1791 (nsv == current_view_.id() ?
1792 input_map_->range(node.index()) :
1793 Range()));
1794 }
1795 else if (current_view_.is_member(node_uuid) == true)
1796 {
1797 mnode = MessageNode(node.operational(),
1798 node.is_suspected(),
1799 node.segment(),
1800 is_evicted(node_uuid),
1801 -1,
1802 current_view_.id(),
1803 input_map_->safe_seq(node.index()),
1804 input_map_->range(node.index()));
1805 }
1806 }
1807 else
1808 {
1809 mnode = MessageNode(true,
1810 false,
1811 node.segment(),
1812 is_evicted(node_uuid),
1813 -1,
1814 current_view_.id(),
1815 input_map_->safe_seq(node.index()),
1816 input_map_->range(node.index()));
1817 }
1818 gu_trace((void)node_list->insert_unique(std::make_pair(node_uuid, mnode)));
1819 }
1820
1821 // Iterate over evicted_list and add evicted nodes not yet in node list.
1822 for (Protolay::EvictList::const_iterator i(evict_list().begin());
1823 i != evict_list().end(); ++i)
1824 {
1825 if (node_list->find(Protolay::EvictList::key(i)) == node_list->end())
1826 {
1827 // default arguments are evil.
1828 MessageNode mnode(false, false, 0, true);
1829 gu_trace((void)node_list->insert_unique(
1830 std::make_pair(Protolay::EvictList::key(i), mnode)));
1831 }
1832 }
1833
1834 evs_log_debug(D_CONSENSUS) << "populate node list:\n" << *node_list;
1835 }
1836
create_join()1837 const gcomm::evs::JoinMessage& gcomm::evs::Proto::create_join()
1838 {
1839
1840 MessageNodeList node_list;
1841
1842 gu_trace(populate_node_list(&node_list));
1843 JoinMessage jm(version_,
1844 uuid(),
1845 current_view_.id(),
1846 input_map_->safe_seq(),
1847 input_map_->aru_seq(),
1848 ++fifo_seq_,
1849 node_list);
1850 NodeMap::value(self_i_).set_join_message(&jm);
1851
1852 evs_log_debug(D_JOIN_MSGS) << " created join message " << jm;
1853
1854 return *NodeMap::value(self_i_).join_message();
1855 }
1856
1857
set_join(const JoinMessage & jm,const UUID & source)1858 void gcomm::evs::Proto::set_join(const JoinMessage& jm, const UUID& source)
1859 {
1860 NodeMap::iterator i;
1861 gu_trace(i = known_.find_checked(source));
1862 NodeMap::value(i).set_join_message(&jm);;
1863 }
1864
1865
set_leave(const LeaveMessage & lm,const UUID & source)1866 void gcomm::evs::Proto::set_leave(const LeaveMessage& lm, const UUID& source)
1867 {
1868 NodeMap::iterator i;
1869 gu_trace(i = known_.find_checked(source));
1870 Node& inst(NodeMap::value(i));
1871
1872 if (inst.leave_message())
1873 {
1874 evs_log_debug(D_LEAVE_MSGS) << "Duplicate leave:\told: "
1875 << *inst.leave_message()
1876 << "\tnew: " << lm;
1877 }
1878 else
1879 {
1880 inst.set_leave_message(&lm);
1881 }
1882 }
1883
1884
send_join(bool handle)1885 void gcomm::evs::Proto::send_join(bool handle)
1886 {
1887 assert(output_.empty() == true);
1888
1889 JoinMessage jm(create_join());
1890
1891 gu::Buffer buf;
1892 serialize(jm, buf);
1893 Datagram dg(buf);
1894 int err = send_down(dg, ProtoDownMeta());
1895
1896 if (err != 0)
1897 {
1898 log_debug << "send failed: " << strerror(err);
1899 }
1900 else
1901 {
1902 last_sent_join_tstamp_ = gu::datetime::Date::monotonic();
1903 }
1904 sent_msgs_[Message::EVS_T_JOIN]++;
1905 if (handle == true)
1906 {
1907 handle_join(jm, self_i_);
1908 }
1909 }
1910
1911
send_leave(bool handle)1912 void gcomm::evs::Proto::send_leave(bool handle)
1913 {
1914 gcomm_assert(state() == S_LEAVING);
1915
1916 // If no messages have been sent, generate one dummy to
1917 // trigger message acknowledgement mechanism
1918 if (last_sent_ == -1 && output_.empty() == true)
1919 {
1920 Datagram wb;
1921 gu_trace(send_user(wb, 0xff, O_DROP, -1, -1));
1922 }
1923
1924 /* Move all pending messages from output to input map */
1925 while (output_.empty() == false)
1926 {
1927 std::pair<Datagram, ProtoDownMeta> wb = output_.front();
1928 if (send_user(wb.first,
1929 wb.second.user_type(),
1930 wb.second.order(),
1931 -1, -1) != 0)
1932 {
1933 gu_throw_fatal << "send_user() failed";
1934 }
1935 output_.pop_front();
1936 }
1937
1938
1939 LeaveMessage lm(version_,
1940 uuid(),
1941 current_view_.id(),
1942 last_sent_,
1943 input_map_->aru_seq(),
1944 ++fifo_seq_);
1945
1946 evs_log_debug(D_LEAVE_MSGS) << "sending leave msg " << lm;
1947
1948 gu::Buffer buf;
1949 serialize(lm, buf);
1950 Datagram dg(buf);
1951 int err = send_down(dg, ProtoDownMeta());
1952 if (err != 0)
1953 {
1954 log_debug << "send failed " << strerror(err);
1955 }
1956
1957 sent_msgs_[Message::EVS_T_LEAVE]++;
1958
1959 if (handle == true)
1960 {
1961 handle_leave(lm, self_i_);
1962 }
1963 }
1964
1965
1966 struct ViewIdCmp
1967 {
operator ()ViewIdCmp1968 bool operator()(const gcomm::evs::NodeMap::value_type& a,
1969 const gcomm::evs::NodeMap::value_type& b) const
1970 {
1971 using gcomm::evs::NodeMap;
1972 gcomm_assert(NodeMap::value(a).join_message() != 0 &&
1973 NodeMap::value(b).join_message() != 0);
1974 return (NodeMap::value(a).join_message()->source_view_id().seq() <
1975 NodeMap::value(b).join_message()->source_view_id().seq());
1976
1977 }
1978 };
1979
1980
1981 struct ProtoVerCmp
1982 {
operator ()ProtoVerCmp1983 bool operator()(const gcomm::evs::NodeMap::value_type& a,
1984 const gcomm::evs::NodeMap::value_type& b) const
1985 {
1986 using gcomm::evs::NodeMap;
1987 gcomm_assert(NodeMap::value(a).join_message() != 0 &&
1988 NodeMap::value(b).join_message() != 0);
1989 return (NodeMap::value(a).join_message()->version() <
1990 NodeMap::value(b).join_message()->version());
1991
1992 }
1993 };
1994
// Build and send an install message proposing a new regular view.
//
// Preconditions (asserted): consensus has been reached and this node is
// the representative.
//
// The proposed view id sequence number is
// max_view_id_seq_ + attempt_seq_, where max_view_id_seq_ is first raised
// to the highest source view id seq found in the join messages of the
// operational nodes. attempt_seq_ is incremented on every call.
// The message version is the lowest version advertised in the join
// messages of the operational nodes.
//
// A failure to send the message down is only logged; the message is
// always counted as sent and handed to handle_install() for local
// processing.
void gcomm::evs::Proto::send_install(EVS_CALLER_ARG)
{
    gcomm_assert(consensus_.is_consensus() == true &&
                 is_representative(uuid()) == true) << *this;

    // Select list of operational nodes from known
    NodeMap oper_list;
    for_each(known_.begin(), known_.end(), OperationalSelect(oper_list));
    NodeMap::const_iterator max_node =
        max_element(oper_list.begin(), oper_list.end(), ViewIdCmp());

    // Compute maximum known view id seq
    max_view_id_seq_ =
        std::max(max_view_id_seq_,
                 NodeMap::value(max_node).join_message()->source_view_id().seq());

    // Compute highest commonly supported protocol version.
    // Oper_list is non-empty, join message existence is asserted.
    // (The minimum over all advertised versions is the highest version
    // every operational node can speak.)
    const int version(
        NodeMap::value(
            std::min_element(oper_list.begin(), oper_list.end(),
                             ProtoVerCmp())).join_message()->version());

    MessageNodeList node_list;
    populate_node_list(&node_list);

    InstallMessage imsg(version,
                        uuid(),
                        current_view_.id(),
                        ViewId(V_REG, uuid(), max_view_id_seq_ + attempt_seq_),
                        input_map_->safe_seq(),
                        input_map_->aru_seq(),
                        ++fifo_seq_,
                        node_list);
    // Make sure that the next attempt proposes a different view id
    ++attempt_seq_;
    evs_log_debug(D_INSTALL_MSGS) << EVS_LOG_METHOD << imsg;
    evs_log_info(I_STATE) << "sending install message" << imsg;
    gcomm_assert(consensus_.is_consistent(imsg));

    gu::Buffer buf;
    serialize(imsg, buf);
    Datagram dg(buf);
    int err = send_down(dg, ProtoDownMeta());
    if (err != 0)
    {
        // Send failure is not fatal here, only logged
        log_debug << "send failed: " << strerror(err);
    }

    sent_msgs_[Message::EVS_T_INSTALL]++;
    // Process own install message locally
    handle_install(imsg, self_i_);
}
2046
2047
send_delayed_list()2048 void gcomm::evs::Proto::send_delayed_list()
2049 {
2050 DelayedListMessage elm(version_, uuid(), current_view_.id(), ++fifo_seq_);
2051 for (DelayedList::const_iterator i(delayed_list_.begin());
2052 i != delayed_list_.end(); ++i)
2053 {
2054 elm.add(i->first, i->second.state_change_cnt());
2055 }
2056 gu::Buffer buf;
2057 serialize(elm, buf);
2058 Datagram dg(buf);
2059 (void)send_down(dg, ProtoDownMeta());
2060 handle_delayed_list(elm, self_i_);
2061 }
2062
resend(const UUID & gap_source,const Range range)2063 void gcomm::evs::Proto::resend(const UUID& gap_source, const Range range)
2064 {
2065 gcomm_assert(gap_source != uuid());
2066 gcomm_assert(range.lu() <= range.hs()) <<
2067 "lu (" << range.lu() << ") > hs(" << range.hs() << ")";
2068
2069 if (range.lu() <= input_map_->safe_seq())
2070 {
2071 evs_log_debug(D_RETRANS) << self_string() << "lu (" << range.lu()
2072 << ") <= safe_seq("
2073 << input_map_->safe_seq()
2074 << "), can't recover message";
2075 return;
2076 }
2077
2078 evs_log_debug(D_RETRANS) << " retrans requested by "
2079 << gap_source
2080 << " "
2081 << range.lu() << " -> "
2082 << range.hs();
2083
2084 // All of the nodes have received all messages up to input_map_->safe_seq(),
2085 // therefore it does not make sense to retransmit anything below that.
2086 seqno_t seq(std::max(range.lu(), input_map_->safe_seq() + 1));
2087 evs_log_debug(D_RETRANS) << "retransmitting from " << seq;
2088 while (seq <= range.hs())
2089 {
2090 InputMap::iterator msg_i = input_map_->find(
2091 NodeMap::value(self_i_).index(), seq);
2092 if (msg_i == input_map_->end())
2093 {
2094 try
2095 {
2096 gu_trace(msg_i = input_map_->recover(
2097 NodeMap::value(self_i_).index(), seq));
2098 }
2099 catch (...)
2100 {
2101 evs_log_debug(D_RETRANS) << "could not recover message "
2102 << gap_source << ":" << seq;
2103 seq = seq + 1;
2104 continue;
2105 }
2106 }
2107
2108 const UserMessage& msg(InputMapMsgIndex::value(msg_i).msg());
2109 gcomm_assert(msg.source() == uuid());
2110 Datagram rb(InputMapMsgIndex::value(msg_i).rb());
2111 assert(rb.offset() == 0);
2112
2113 UserMessage um(msg.version(),
2114 msg.source(),
2115 msg.source_view_id(),
2116 msg.seq(),
2117 input_map_->aru_seq(),
2118 msg.seq_range(),
2119 msg.order(),
2120 msg.fifo_seq(),
2121 msg.user_type(),
2122 static_cast<uint8_t>(
2123 Message::F_RETRANS |
2124 (msg.flags() & Message::F_AGGREGATE)));
2125
2126 push_header(um, rb);
2127
2128 int err = send_down(rb, ProtoDownMeta(gap_source));
2129 if (err != 0)
2130 {
2131 log_debug << "send failed: " << strerror(err);
2132 break;
2133 }
2134 else
2135 {
2136 evs_log_debug(D_RETRANS) << "retransmitted " << um;
2137 }
2138 seq = seq + msg.seq_range() + 1;
2139 retrans_msgs_++;
2140 }
2141 }
2142
2143
recover(const UUID & gap_source,const UUID & range_uuid,const Range range)2144 void gcomm::evs::Proto::recover(const UUID& gap_source,
2145 const UUID& range_uuid,
2146 const Range range)
2147 {
2148 gcomm_assert(gap_source != uuid())
2149 << "gap_source (" << gap_source << ") == uuid() (" << uuid()
2150 << " state " << *this;
2151 gcomm_assert(range.lu() <= range.hs())
2152 << "lu (" << range.lu() << ") > hs (" << range.hs() << ")";
2153
2154 if (range.lu() <= input_map_->safe_seq())
2155 {
2156 evs_log_debug(D_RETRANS) << "lu (" << range.lu()
2157 << ") <= safe_seq(" << input_map_->safe_seq()
2158 << "), can't recover message";
2159 return;
2160 }
2161
2162 const Node& range_node(NodeMap::value(known_.find_checked(range_uuid)));
2163 const Range im_range(input_map_->range(range_node.index()));
2164
2165 evs_log_debug(D_RETRANS) << " recovering message from "
2166 << range_uuid
2167 << " requested by "
2168 << gap_source
2169 << " requested range " << range
2170 << " available " << im_range;
2171
2172 // All of the nodes have received all messages up to input_map_->safe_seq(),
2173 // therefore it does not make sense to retransmit anything below that.
2174 seqno_t seq(std::max(range.lu(), input_map_->safe_seq() + 1));
2175 evs_log_debug(D_RETRANS) << "recovering from " << seq;
2176 size_t n_recovered(0);
2177 while (seq <= range.hs() && seq <= im_range.hs())
2178 {
2179 InputMap::iterator msg_i = input_map_->find(range_node.index(), seq);
2180 if (msg_i == input_map_->end())
2181 {
2182 try
2183 {
2184 gu_trace(msg_i = input_map_->recover(range_node.index(), seq));
2185 }
2186 catch (...)
2187 {
2188 seq = seq + 1;
2189 continue;
2190 }
2191 }
2192
2193 const UserMessage& msg(InputMapMsgIndex::value(msg_i).msg());
2194 assert(msg.source() == range_uuid);
2195
2196 Datagram rb(InputMapMsgIndex::value(msg_i).rb());
2197 assert(rb.offset() == 0);
2198 UserMessage um(msg.version(),
2199 msg.source(),
2200 msg.source_view_id(),
2201 msg.seq(),
2202 msg.aru_seq(),
2203 msg.seq_range(),
2204 msg.order(),
2205 msg.fifo_seq(),
2206 msg.user_type(),
2207 static_cast<uint8_t>(
2208 Message::F_SOURCE |
2209 Message::F_RETRANS |
2210 (msg.flags() & Message::F_AGGREGATE)));
2211
2212 push_header(um, rb);
2213
2214 ++n_recovered;
2215 int err = send_delegate(rb, gap_source);
2216 if (err != 0)
2217 {
2218 log_debug << "send failed: " << strerror(err);
2219 break;
2220 }
2221 else
2222 {
2223 evs_log_debug(D_RETRANS) << "recover " << um;
2224 }
2225 seq = seq + msg.seq_range() + 1;
2226 recovered_msgs_++;
2227 }
2228 evs_log_debug(D_RETRANS) << "recovered: " << n_recovered;
2229 }
2230
2231
2232 class UUIDFixedPartCmp
2233 {
2234 public:
UUIDFixedPartCmp(const gcomm::UUID & uuid)2235 UUIDFixedPartCmp(const gcomm::UUID& uuid) : uuid_(uuid) { }
operator ()(const gcomm::evs::NodeMap::value_type & vt) const2236 bool operator()(const gcomm::evs::NodeMap::value_type& vt) const
2237 {
2238 return uuid_.fixed_part_matches(vt.first);
2239 }
2240 private:
2241 const gcomm::UUID& uuid_;
2242 };
2243
// Handle a message whose source is not found from known_.
//
// The message is dropped without further action if
// - it is a leave message,
// - an install message has been received (install_message_ is set),
// - it originates from a previous view,
// - its source UUID is nil, or
// - the fixed part of the source UUID matches an already known node.
//
// Otherwise the source is added into known_, the protocol shifts to
// S_GATHER if currently in S_JOINING, S_GATHER or S_OPERATIONAL, the join
// message (if the message is one) is stored for the source, and a join
// message is sent.
void gcomm::evs::Proto::handle_foreign(const Message& msg)
{
    // no need to handle foreign LEAVE message
    if (msg.type() == Message::EVS_T_LEAVE)
    {
        return;
    }

    // Don't handle foreign messages in install phase.
    // This includes not only INSTALL state, but also
    // GATHER state after receiving install message.
    if (install_message_ != 0)
    {
        evs_log_debug(D_FOREIGN_MSGS)
            << " dropping foreign message from "
            << msg.source() << " in install state";
        return;
    }

    if (is_msg_from_previous_view(msg) == true)
    {
        return;
    }

    const UUID& source(msg.source());

    if (source == UUID::nil())
    {
        log_warn << "Received message with nil source UUDI, dropping";
        return;
    }

    NodeMap::iterator i;
    if ((i = std::find_if(known_.begin(), known_.end(), UUIDFixedPartCmp(source)))
        != known_.end())
    {
        // Keep the new incarnation out of the group until a new view has been
        // established.
        evs_log_debug(D_FOREIGN_MSGS)
            << "Dropping message from new incarnation of already known "
            "node in current view, old: " << i->first << " new: " << source;
        return;
    }
    evs_log_info(I_STATE) << " detected new message source "
                          << source;

    // Newly inserted node is expected to be operational by default
    gu_trace(i = known_.insert_unique(
                 std::make_pair(source, Node(*this))));
    assert(NodeMap::value(i).operational() == true);

    if (state() == S_JOINING || state() == S_GATHER ||
        state() == S_OPERATIONAL)
    {
        evs_log_info(I_STATE)
            << " shift to GATHER due to foreign message from "
            << msg.source();
        // Join message is sent explicitly at the end of this method,
        // so shift_to() is told not to send one.
        gu_trace(shift_to(S_GATHER, false));
        // Reset install timer each time foreign message is seen to
        // synchronize install timers.
        reset_timer(T_INSTALL);
    }

    // Set join message after shift to recovery, shift may clean up
    // join messages
    if (msg.type() == Message::EVS_T_JOIN)
    {
        set_join(static_cast<const JoinMessage&>(msg), msg.source());
    }
    send_join(true);
}
2314
// Filter an unserialized EVS message and dispatch it to the message type
// specific handler.
//
// @param msg    unserialized message
// @param rb     datagram passed on to the user and delegate message
//               handlers
// @param direct if true, the seen timestamp of the source node is updated
//
// The message is dropped if any of the following holds:
// - the message type is out of range,
// - the protocol is in S_CLOSED state,
// - the isolation period is on,
// - the message originates from this node,
// - the message version is higher than GCOMM_PROTOCOL_MAX_VERSION,
// - the source is marked unoperational (see exceptions in the body),
// - the message violates FIFO ordering for its source,
// - the message is a non-membership message which is neither from the
//   current view nor from the view to be installed.
// Messages from unknown sources are passed to handle_foreign().
void gcomm::evs::Proto::handle_msg(const Message& msg,
                                   const Datagram& rb,
                                   bool direct)
{
    // Debug builds abort on out of range type, release builds drop
    // the message.
    assert(msg.type() <= Message::EVS_T_DELAYED_LIST);
    if (msg.type() > Message::EVS_T_DELAYED_LIST)
    {
        return;
    }

    if (state() == S_CLOSED)
    {
        return;
    }

    if (isolation_end_ != gu::datetime::Date::zero())
    {
        evs_log_debug(D_STATE) << " dropping message due to isolation";
        // Isolation period is on
        return;
    }

    if (msg.source() == uuid())
    {
        evs_log_debug(D_FOREIGN_MSGS) << " dropping own message";
        return;
    }

    if (msg.version() > GCOMM_PROTOCOL_MAX_VERSION)
    {
        log_info << "incompatible protocol version "
                 << static_cast<int>(msg.version());
        return;
    }

    gcomm_assert(msg.source() != UUID::nil());

    // Figure out if the message is from known source
    NodeMap::iterator ii = known_.find(msg.source());

    if (ii == known_.end())
    {
        gu_trace(handle_foreign(msg));
        return;
    }

    Node& node(NodeMap::value(ii));
    if (direct == true)
    {
        node.set_seen_tstamp(gu::datetime::Date::monotonic());
    }

    if (state() == S_LEAVING && msg.source_view_id() == current_view_.id())
    {
        // Allow messages in leaving state. This is needed for both
        // updating the join messages for retransmission and for handling
        // retransmitted messages.
        evs_log_debug(D_FOREIGN_MSGS) << "Allow message from current view "
                                      << "in leaving state" << msg;
    }
    else if (node.operational() == false &&
             node.leave_message() == 0 &&
             (msg.flags() & Message::F_RETRANS) == 0)
    {
        // We have set this node unoperational and there was
        // probably good reason to do so. Don't accept messages
        // from it before new view has been formed.
        // Exceptions:
        // - Node that is leaving
        // - Retransmitted messages.

        // Why are retransmitted messages accepted?
        // A node sends a message, some nodes (A) receive it but some (B)
        // don't, and then the sender becomes non-operational (or
        // unreachable). Nodes in A need to send the missing message to
        // nodes in B (in an envelope, as a delegate message), otherwise
        // the input maps will never become consistent. A user message
        // inside a delegate message always comes with the F_RETRANS flag.
        evs_log_debug(D_FOREIGN_MSGS)
            << " dropping message from unoperational source " << node;
        return;
    }

    // Filter out non-fifo messages
    // (fifo seq -1 and retransmitted messages are exempt from the check)
    if (msg.fifo_seq() != -1 && (msg.flags() & Message::F_RETRANS) == 0)
    {

        if (node.fifo_seq() >= msg.fifo_seq())
        {
            evs_log_debug(D_FOREIGN_MSGS)
                << "droppoing non-fifo message " << msg
                << " fifo seq " << node.fifo_seq();
            return;
        }
        else
        {
            node.set_fifo_seq(msg.fifo_seq());
        }
    }

    // Accept non-membership messages only from current view
    // or from view to be installed
    if (msg.is_membership() == false                    &&
        msg.source_view_id() != current_view_.id()      &&
        (install_message_ == 0                     ||
         install_message_->install_view_id() != msg.source_view_id()))
    {
        // If source node seems to be operational but it has proceeded
        // into new view, mark it as unoperational in order to create
        // intermediate views before re-merge.
        if (node.installed()           == true      &&
            node.operational()         == true      &&
            is_msg_from_previous_view(msg) == false &&
            state()                    != S_LEAVING)
        {
            // Log the event only once, the flag is reset elsewhere
            if (new_view_logged_ == false)
            {
                evs_log_info(I_STATE)
                    << " detected new view from operational source "
                    << msg.source() << ": "
                    << msg.source_view_id();
                new_view_logged_ = true;
            }
            // Note: Commented out, this causes problems with
            // attempt_seq. Newly (remotely?) generated install message
            // followed by commit gap may cause undesired
            // node inactivation and shift to gather.
            //
            // set_inactive(msg.source());
            // gu_trace(shift_to(S_GATHER, true));
        }
        evs_log_debug(D_FOREIGN_MSGS)
            << "dropping non-membership message from foreign view";
        return;
    }
    else if (NodeMap::value(ii).index() == Node::invalid_index &&
             msg.source_view_id() == current_view_.id())
    {
        // Source claims to be in the current view but has no input map
        // index assigned. Debug builds abort, release builds drop the
        // message.
        log_warn << "Message from node that claims to come from same view but is not in current view " << msg;
        assert(0);
        return;
    }

    // Message passed all filters, update statistics and dispatch
    recvd_msgs_[msg.type()]++;

    switch (msg.type())
    {
    case Message::EVS_T_USER:
        gu_trace(handle_user(static_cast<const UserMessage&>(msg), ii, rb));
        break;
    case Message::EVS_T_DELEGATE:
        gu_trace(handle_delegate(static_cast<const DelegateMessage&>(msg), ii, rb));
        break;
    case Message::EVS_T_GAP:
        gu_trace(handle_gap(static_cast<const GapMessage&>(msg), ii));
        break;
    case Message::EVS_T_JOIN:
        gu_trace(handle_join(static_cast<const JoinMessage&>(msg), ii));
        break;
    case Message::EVS_T_LEAVE:
        gu_trace(handle_leave(static_cast<const LeaveMessage&>(msg), ii));
        break;
    case Message::EVS_T_INSTALL:
        gu_trace(handle_install(static_cast<const InstallMessage&>(msg), ii));
        break;
    case Message::EVS_T_DELAYED_LIST:
        gu_trace(handle_delayed_list(
                     static_cast<const DelayedListMessage&>(msg), ii));
        break;
    default:
        log_warn << "invalid message type " << msg.type();
    }
}
2487
2488 ////////////////////////////////////////////////////////////////////////
2489 // Protolay interface
2490 ////////////////////////////////////////////////////////////////////////
2491
unserialize_message(const UUID & source,const Datagram & rb,Message * msg)2492 size_t gcomm::evs::Proto::unserialize_message(const UUID& source,
2493 const Datagram& rb,
2494 Message* msg)
2495 {
2496 size_t offset;
2497 const gu::byte_t* begin(gcomm::begin(rb));
2498 const size_t available(gcomm::available(rb));
2499 gu_trace(offset = msg->unserialize(begin,
2500 available,
2501 0));
2502 if ((msg->flags() & Message::F_SOURCE) == 0)
2503 {
2504 assert(source != UUID::nil());
2505 gcomm_assert(source != UUID::nil());
2506 msg->set_source(source);
2507 }
2508
2509 switch (msg->type())
2510 {
2511 case Message::EVS_T_NONE:
2512 gu_throw_fatal;
2513 break;
2514 case Message::EVS_T_USER:
2515 gu_trace(offset = static_cast<UserMessage&>(*msg).unserialize(
2516 begin, available, offset, true));
2517 break;
2518 case Message::EVS_T_DELEGATE:
2519 gu_trace(offset = static_cast<DelegateMessage&>(*msg).unserialize(
2520 begin, available, offset, true));
2521 break;
2522 case Message::EVS_T_GAP:
2523 gu_trace(offset = static_cast<GapMessage&>(*msg).unserialize(
2524 begin, available, offset, true));
2525 break;
2526 case Message::EVS_T_JOIN:
2527 gu_trace(offset = static_cast<JoinMessage&>(*msg).unserialize(
2528 begin, available, offset, true));
2529 break;
2530 case Message::EVS_T_INSTALL:
2531 gu_trace(offset = static_cast<InstallMessage&>(*msg).unserialize(
2532 begin, available, offset, true));
2533 break;
2534 case Message::EVS_T_LEAVE:
2535 gu_trace(offset = static_cast<LeaveMessage&>(*msg).unserialize(
2536 begin, available, offset, true));
2537 break;
2538 case Message::EVS_T_DELAYED_LIST:
2539 gu_trace(offset = static_cast<DelayedListMessage&>(*msg).unserialize(
2540 begin, available, offset, true));
2541 break;
2542 }
2543 return (offset + rb.offset());
2544 }
2545
handle_up(const void * cid,const Datagram & rb,const ProtoUpMeta & um)2546 void gcomm::evs::Proto::handle_up(const void* cid,
2547 const Datagram& rb,
2548 const ProtoUpMeta& um)
2549 {
2550
2551 Message msg;
2552
2553 if (state() == S_CLOSED || um.source() == uuid() || is_evicted(um.source()))
2554 {
2555 // Silent drop
2556 return;
2557 }
2558
2559 gcomm_assert(um.source() != UUID::nil());
2560
2561 try
2562 {
2563 size_t offset;
2564 gu_trace(offset = unserialize_message(um.source(), rb, &msg));
2565 handle_msg(msg, Datagram(rb, offset),
2566 (msg.flags() & Message::F_RETRANS) == 0);
2567 }
2568 catch (gu::Exception& e)
2569 {
2570 switch (e.get_errno())
2571 {
2572 case EPROTONOSUPPORT:
2573 log_warn << e.what();
2574 break;
2575
2576 case EINVAL:
2577 log_warn << "invalid message: " << msg;
2578 break;
2579
2580 default:
2581 log_fatal << "exception caused by message: " << msg;
2582 std::cerr << " state after handling message: " << *this;
2583 throw;
2584 }
2585 }
2586 }
2587
2588
// Protolay down-call: accept a user message from the upper layer.
//
// @param wb datagram containing the user payload
// @param dm meta data: user type and delivery order
//
// @return 0 on success (message sent or queued),
//         EAGAIN if the message cannot be accepted right now (protocol is
//         in S_GATHER or S_INSTALL state, the output queue holds too many
//         bytes, or the causal fast path is not available yet),
//         ENOTCONN if the protocol is in any other non-operational state,
//         or an error returned by send_user().
//
// Messages with order O_LOCAL_CAUSAL are not sent to the group. They are
// either delivered immediately or placed into causal_queue_ to be
// delivered once the recorded seqno becomes deliverable.
int gcomm::evs::Proto::handle_down(Datagram& wb, const ProtoDownMeta& dm)
{
    if (state() == S_GATHER || state() == S_INSTALL)
    {
        return EAGAIN;
    }

    else if (state() != S_OPERATIONAL)
    {
        log_warn << "user message in state " << to_string(state());
        return ENOTCONN;
    }

    if (dm.order() == O_LOCAL_CAUSAL)
    {
        gu::datetime::Date now(gu::datetime::Date::monotonic());
        // Fast path: nothing queued before this message, everything sent
        // by this node is safe, and a causal keepalive has been sent
        // within the keepalive period.
        if (causal_queue_.empty() == true &&
            last_sent_ == input_map_->safe_seq() &&
            causal_keepalive_period_ > gu::datetime::Period(0) &&
            last_causal_keepalive_ + causal_keepalive_period_ > now)
        {

            assert(last_sent_ == input_map_->aru_seq());
            // Input map should either be empty (all messages
            // delivered) or the undelivered messages have higher
            // seqno than safe_seq. Even if the delivery is
            // done below if needed, this assertion should stay
            // to catch errors in logic elsewhere in the code.
            assert(input_map_->begin() == input_map_->end() ||
                   input_map_->is_safe(input_map_->begin()) == false);


            if (input_map_->begin() != input_map_->end() &&
                input_map_->is_safe(input_map_->begin()) == true)
            {
                gu_trace(deliver());
                if (input_map_->begin() != input_map_->end() &&
                    input_map_->is_safe(input_map_->begin()) == true)
                {
                    // If the input map state is still not good for fast path,
                    // the situation is not likely to clear immediately. Return
                    // error to retry later.
                    return EAGAIN;
                }
            }

            // Immediate delivery, recorded with zero latency
            hs_local_causal_.insert(0.0);
            deliver_causal(dm.user_type(), last_sent_, wb);
        }
        else
        {
            // Slow path: queue the message to be delivered later
            seqno_t causal_seqno(input_map_->aru_seq());
            if (causal_keepalive_period_ == gu::datetime::Period(0) ||
                last_causal_keepalive_ + causal_keepalive_period_ <= now)
            {
                // generate traffic to make sure that group is live
                Datagram dg;
                int err(send_user(dg, 0xff, O_DROP, -1, -1));
                if (err != 0)
                {
                    return err;
                }
                // reassign causal_seqno to be last_sent:
                // in order to make sure that the group is live,
                // safe seqno must be advanced and in this case
                // safe seqno equals to aru seqno.
                causal_seqno = last_sent_;
                last_causal_keepalive_ = now;
            }
            causal_queue_.push_back(CausalMessage(dm.user_type(),
                                                  causal_seqno, wb));
        }
        return 0;
    }

    // Limit outbound bytes to out_queue::max_outbound_bytes (1MB)
    // to limit the time it takes to transmit all outbound messages
    // during configuration change.
    if (output_.outbound_bytes() >= out_queue::max_outbound_bytes)
    {
        return EAGAIN;
    }

    // Send queue length statistics
    send_queue_s_ += output_.size();
    ++n_send_queue_s_;

    int ret = 0;

    if (output_.empty() == true)
    {
        // Nothing queued, try to send the message right away
        int err;
        err = send_user(wb,
                        dm.user_type(),
                        dm.order(),
                        user_send_window_,
                        -1);

        switch (err)
        {
        case EAGAIN:
            // Could not send now, queue the message and report success
            output_.push_back(std::make_pair(wb, dm));
            // fall through
        case 0:
            ret = 0;
            break;
        default:
            log_error << "send error: " << err;
            ret = err;
        }
    }
    else
    {
        // Messages are already queued, append to preserve ordering
        output_.push_back(std::make_pair(wb, dm));
    }

    return ret;
}
2706
send_down(Datagram & dg,const ProtoDownMeta & dm)2707 int gcomm::evs::Proto::send_down(Datagram& dg, const ProtoDownMeta& dm)
2708 {
2709 if (isolation_end_ != gu::datetime::Date::zero())
2710 {
2711 // Node has isolated itself, don't emit any messages
2712 return 0;
2713 }
2714 else
2715 {
2716 return Protolay::send_down(dg, dm);
2717 }
2718 }
2719
2720
2721 /////////////////////////////////////////////////////////////////////////////
2722 // State handler
2723 /////////////////////////////////////////////////////////////////////////////
2724
// Shift the protocol state machine into state s.
//
// @param s      target state
// @param send_j if true and the target state is S_GATHER, a join message
//               is sent after the state has been changed
//
// The transition is validated against a table of allowed transitions,
// a forbidden transition is a fatal error. The method is not reentrant:
// entering it while another shift is in progress is a fatal error
// (guarded by shift_to_rfcnt_).
void gcomm::evs::Proto::shift_to(const State s, const bool send_j)
{
    if (shift_to_rfcnt_ > 0) gu_throw_fatal << *this;

    shift_to_rfcnt_++;

    // Allowed transitions, indexed as allowed[from][to]
    static const bool allowed[S_MAX][S_MAX] = {
        // CLOSED JOINING LEAVING GATHER INSTALL OPERAT
        {  false,  true,   false, false, false,  false }, // CLOSED

        {  false,  false,  true,  true,  false,  false }, // JOINING

        {  true,   false,  false, false, false,  false }, // LEAVING

        {  false,  false,  true,  true,  true,   false }, // GATHER

        {  false,  false,  false, true,  false,  true  },  // INSTALL

        {  false,  false,  true,  true,  false,  false }  // OPERATIONAL
    };

    assert(s < S_MAX);

    if (allowed[state_][s] == false) {
        gu_throw_fatal << "Forbidden state transition: "
                       << to_string(state_) << " -> " << to_string(s);
    }

    if (state() != s)
    {
        evs_log_info(I_STATE) << " state change: "
                              << to_string(state_) << " -> " << to_string(s);
    }
    switch (s) {
    case S_CLOSED:
    {
        // Only reachable from S_LEAVING. Deliver what can be delivered,
        // then deliver a transitional view containing only this node
        // followed by an empty view.
        gcomm_assert(state() == S_LEAVING);
        gu_trace(deliver());
        gu_trace(deliver_local());
        setall_installed(false);
        NodeMap::value(self_i_).set_installed(true);
        // Construct install message containing only one node for
        // last trans view.
        MessageNodeList node_list;
        (void)node_list.insert_unique(
            std::make_pair(uuid(),
                           MessageNode(true,
                                       false,
                                       NodeMap::value(self_i_).segment(),
                                       false,
                                       -1,
                                       current_view_.id(),
                                       input_map_->safe_seq(
                                           NodeMap::value(self_i_).index()),
                                       input_map_->range(
                                           NodeMap::value(self_i_).index()))));
        InstallMessage im(0,
                          uuid(),
                          current_view_.id(),
                          ViewId(V_REG, uuid(), current_view_.id().seq() + 1),
                          input_map_->safe_seq(),
                          input_map_->aru_seq(),
                          ++fifo_seq_,
                          node_list);
        gu_trace(deliver_trans_view(im, current_view_));
        gu_trace(deliver_trans());
        gu_trace(deliver_local(true));
        gcomm_assert(causal_queue_.empty() == true);
        if (collect_stats_ == true)
        {
            handle_stats_timer();
        }
        gu_trace(deliver_empty_view());
        cleanup_foreign(im);
        cleanup_views();
        timers_.clear();
        state_ = S_CLOSED;
        break;
    }
    case S_JOINING:
        state_ = S_JOINING;
        reset_timer(T_STATS);
        break;
    case S_LEAVING:
        state_ = S_LEAVING;
        reset_timer(T_INACTIVITY);
        reset_timer(T_RETRANS);
        reset_timer(T_INSTALL);
        break;
    case S_GATHER:
    {
        // Start a new membership round: forget committed/installed
        // flags and any pending install message.
        setall_committed(false);
        setall_installed(false);
        delete install_message_;
        install_message_ = 0;

        if (state() == S_OPERATIONAL)
        {
            // Flush all queued user messages before leaving operational
            // state, failure to send is fatal.
            while (output_.empty() == false)
            {
                int err;
                gu_trace(err = send_user(-1));
                if (err != 0)
                {
                    gu_throw_fatal << self_string()
                                   << "send_user() failed in shifto "
                                   << "to S_GATHER: "
                                   << strerror(err);
                }
            }
        }
        else
        {
            gcomm_assert(output_.empty() == true);
        }

        State prev_state(state_);
        state_ = S_GATHER;
        if (send_j == true)
        {
            gu_trace(send_join(false));
        }
        gcomm_assert(state() == S_GATHER);
        reset_timer(T_INACTIVITY);
        // Retrans and install timers are restarted only when gather is
        // entered from S_OPERATIONAL or S_JOINING, not on repeated
        // shifts from other states.
        if (prev_state == S_OPERATIONAL || prev_state == S_JOINING)
        {
            reset_timer(T_RETRANS);
            reset_timer(T_INSTALL);
        }
        break;
    }
    case S_INSTALL:
    {
        gcomm_assert(install_message_ != 0);
        gcomm_assert(is_all_committed() == true);
        state_ = S_INSTALL;
        reset_timer(T_INACTIVITY);
        reset_timer(T_RETRANS);
        break;
    }
    case S_OPERATIONAL:
    {
        gcomm_assert(output_.empty() == true);
        gcomm_assert(install_message_ != 0);
        gcomm_assert(NodeMap::value(self_i_).join_message() != 0 &&
                     consensus_.equal(
                         *NodeMap::value(self_i_).join_message(),
                         *install_message_))
            << "install message not consistent with own join, state: " << *this;
        gcomm_assert(is_all_installed() == true);
        // Finish the previous view: deliver remaining messages and the
        // transitional view.
        gu_trace(deliver());
        gu_trace(deliver_local());
        gu_trace(deliver_trans_view(*install_message_, current_view_));
        gu_trace(deliver_trans());
        gu_trace(deliver_local(true));
        gcomm_assert(causal_queue_.empty() == true);
        input_map_->clear();
        if (collect_stats_ == true)
        {
            handle_stats_timer();
        }
        // End of previous view

        // Construct new view and shift to S_OPERATIONAL before calling
        // deliver_reg_view(). Reg view delivery may trigger message
        // exchange on upper layer and operating view is needed to
        // handle messages.

        previous_view_ = current_view_;
        std::copy(gather_views_.begin(), gather_views_.end(),
                  std::inserter(previous_views_, previous_views_.end()));
        gather_views_.clear();

        if (install_message_->version() > current_view_.version())
        {
            log_info << "EVS version upgrade " << current_view_.version()
                     << " -> " << static_cast<int>(install_message_->version());
        }
        else if (install_message_->version() < current_view_.version())
        {
            log_info << "EVS version downgrade " << current_view_.version()
                     << " -> " << static_cast<int>(install_message_->version());
        }

        current_view_ = View(install_message_->version(),
                             install_message_->install_view_id());
        // Running input map index for operational members
        size_t idx = 0;

        const MessageNodeList& imnl(install_message_->node_list());

        for (MessageNodeList::const_iterator i(imnl.begin());
             i != imnl.end(); ++i)
        {
            const UUID& uuid(MessageNodeList::key(i));
            const MessageNode& n(MessageNodeList::value(i));

            // Add operational nodes to new view, assign input map index
            NodeMap::iterator nmi(known_.find(uuid));
            gcomm_assert(nmi != known_.end()) << "node " << uuid
                                              << " not found from known map";
            if (n.operational() == true)
            {
                current_view_.add_member(uuid, NodeMap::value(nmi).segment());
                NodeMap::value(nmi).set_index(idx++);
            }
            else
            {
                NodeMap::value(nmi).set_index(
                    Node::invalid_index);
            }

        }

        if (previous_view_.id().type() == V_REG &&
            previous_view_.members() == current_view_.members())
        {
            evs_log_info(I_VIEWS)
                << "subsequent views have same members, prev view "
                << previous_view_ << " current view " << current_view_;
        }

        input_map_->reset(current_view_.members().size());
        last_sent_ = -1;
        state_ = S_OPERATIONAL;
        deliver_reg_view(*install_message_, previous_view_);

        cleanup_foreign(*install_message_);
        cleanup_views();
        cleanup_joins();

        // Install message has been consumed, reset the install related
        // state for the next membership round.
        delete install_message_;
        install_message_ = 0;
        attempt_seq_ = 1;
        install_timeout_count_ = 0;
        gu_trace(send_gap(EVS_CALLER, UUID::nil(), current_view_.id(), Range()));;
        gcomm_assert(state() == S_OPERATIONAL);
        reset_timer(T_INACTIVITY);
        reset_timer(T_RETRANS);
        cancel_timer(T_INSTALL);
        new_view_logged_ = false;
        break;
    }
    default:
        gu_throw_fatal << "invalid state";
    }
    shift_to_rfcnt_--;
}
2972
2973 ////////////////////////////////////////////////////////////////////////////
2974 // Message delivery
2975 ////////////////////////////////////////////////////////////////////////////
2976
deliver_causal(uint8_t user_type,seqno_t seqno,const Datagram & datagram)2977 void gcomm::evs::Proto::deliver_causal(uint8_t user_type,
2978 seqno_t seqno,
2979 const Datagram& datagram)
2980 {
2981 send_up(datagram, ProtoUpMeta(uuid(),
2982 current_view_.id(),
2983 0,
2984 user_type,
2985 O_LOCAL_CAUSAL,
2986 seqno));
2987 ++delivered_msgs_[O_LOCAL_CAUSAL];
2988 }
2989
2990
deliver_local(bool trans)2991 void gcomm::evs::Proto::deliver_local(bool trans)
2992 {
2993 // local causal
2994 const seqno_t causal_seq(trans == false ? input_map_->safe_seq() : last_sent_);
2995 gu::datetime::Date now(gu::datetime::Date::monotonic());
2996
2997 assert(input_map_->begin() == input_map_->end() ||
2998 input_map_->is_safe(input_map_->begin()) == false);
2999
3000 while (causal_queue_.empty() == false &&
3001 causal_queue_.front().seqno() <= causal_seq)
3002 {
3003 const CausalMessage& cm(causal_queue_.front());
3004 hs_local_causal_.insert(double(now.get_utc() - cm.tstamp().get_utc())/gu::datetime::Sec);
3005 deliver_causal(cm.user_type(), cm.seqno(), cm.datagram());
3006 causal_queue_.pop_front();
3007 }
3008 }
3009
validate_reg_msg(const UserMessage & msg)3010 void gcomm::evs::Proto::validate_reg_msg(const UserMessage& msg)
3011 {
3012 if (msg.source_view_id() != current_view_.id())
3013 {
3014 // Note: This implementation should guarantee same view delivery,
3015 // this is sanity check for that.
3016 gu_throw_fatal << "reg validate: not current view";
3017 }
3018
3019 // Update statistics for locally generated messages
3020 if (msg.source() == uuid())
3021 {
3022 if (msg.order() == O_SAFE)
3023 {
3024 gu::datetime::Date now(gu::datetime::Date::monotonic());
3025 double lat(double(now.get_utc() - msg.tstamp().get_utc())/
3026 gu::datetime::Sec);
3027 if (info_mask_ & I_STATISTICS) hs_safe_.insert(lat);
3028 safe_deliv_latency_.insert(lat);
3029 }
3030 else if (msg.order() == O_AGREED)
3031 {
3032 if (info_mask_ & I_STATISTICS)
3033 {
3034 gu::datetime::Date now(gu::datetime::Date::monotonic());
3035 hs_agreed_.insert(double(now.get_utc() - msg.tstamp().get_utc())/gu::datetime::Sec);
3036 }
3037 }
3038 }
3039 }
3040
3041
// Deliver a single input map message to the upper layer.
//
// A plain (non-aggregate) message is passed up as such, unless its
// order is O_DROP, in which case it is only counted. An aggregate
// message (F_AGGREGATE flag set) is split into its parts and each part is
// passed up separately in a freshly allocated datagram.
// delivered_msgs_ is incremented for each message or aggregate part.
void gcomm::evs::Proto::deliver_finish(const InputMapMsg& msg)
{
    if ((msg.msg().flags() & Message::F_AGGREGATE) == 0)
    {
        ++delivered_msgs_[msg.msg().order()];
        if (msg.msg().order() != O_DROP)
        {
            gu_trace(validate_reg_msg(msg.msg()));
            ProtoUpMeta um(msg.msg().source(),
                           msg.msg().source_view_id(),
                           0,
                           msg.msg().user_type(),
                           msg.msg().order(),
                           msg.msg().seq());
            try
            {
                send_up(msg.rb(), um);
            }
            catch (...)
            {
                // Log the offending message before propagating the error
                log_info << msg.msg() << " " << msg.rb().len();
                throw;
            }
        }
    }
    else
    {
        gu_trace(validate_reg_msg(msg.msg()));
        // Offset of the next aggregate part in the payload
        size_t offset(0);
        while (offset < msg.rb().len())
        {
            ++delivered_msgs_[msg.msg().order()];
            // Each part consists of an AggregateMessage header followed
            // by am.len() bytes of user payload.
            AggregateMessage am;
            gu_trace(am.unserialize(msg.rb().payload().data(),
                                    msg.rb().payload().size(),
                                    offset));
            // NOTE(review): am.len() is not checked against the remaining
            // payload size before the copy below, only the final offset
            // is asserted after the loop - confirm that the length is
            // validated elsewhere.
            Datagram dg(
                gu::SharedBuffer(
                    new gu::Buffer(
                        msg.rb().payload().data()
                        + offset
                        + am.serial_size(),
                        msg.rb().payload().data()
                        + offset
                        + am.serial_size()
                        + am.len())));
            ProtoUpMeta um(msg.msg().source(),
                           msg.msg().source_view_id(),
                           0,
                           am.user_type(),
                           msg.msg().order(),
                           msg.msg().seq());
            gu_trace(send_up(dg, um));
            offset += am.serial_size() + am.len();
        }
        // Parts must cover the datagram exactly
        gcomm_assert(offset == msg.rb().len());
    }
}
3100
// Deliver messages from the head of the input map for as long as the
// head message satisfies the delivery condition of its order:
// is_safe() for orders up to O_SAFE, is_agreed() for orders up to
// O_AGREED, is_fifo() for orders up to O_FIFO. Delivered messages are
// erased from the input map.
//
// Fatal errors:
// - recursive entry (guarded by delivering_),
// - state other than S_OPERATIONAL, S_GATHER, S_INSTALL or S_LEAVING,
// - an undeliverable head message with order above O_SAFE.
void gcomm::evs::Proto::deliver()
{
    if (delivering_ == true)
    {
        gu_throw_fatal << "Recursive enter to delivery";
    }

    delivering_ = true;

    if (state() != S_OPERATIONAL &&
        state() != S_GATHER      &&
        state() != S_INSTALL     &&
        state() != S_LEAVING)
    {
        gu_throw_fatal << "invalid state: " << to_string(state());
    }

    evs_log_debug(D_DELIVERY)
        << " aru_seq="   << input_map_->aru_seq()
        << " safe_seq=" << input_map_->safe_seq();

    // Read input map head until a message which cannot be
    // delivered is encountered.
    InputMapMsgIndex::iterator i;
    while ((i = input_map_->begin()) != input_map_->end())
    {
        const InputMapMsg& msg(InputMapMsgIndex::value(i));
        if ((msg.msg().order() <= O_SAFE   &&
             input_map_->is_safe(i) == true) ||
            (msg.msg().order() <= O_AGREED &&
             input_map_->is_agreed(i) == true) ||
            (msg.msg().order() <= O_FIFO   &&
             input_map_->is_fifo(i) == true))
        {
            deliver_finish(msg);
            gu_trace(input_map_->erase(i));
        }
        else
        {
            // Head message cannot be delivered yet. Orders above O_SAFE
            // can never satisfy any of the conditions above, so such
            // a message would block the input map forever.
            if (msg.msg().order() > O_SAFE)
            {
                gu_throw_fatal << "Message with order " << msg.msg().order()
                               << " in input map, cannot continue safely";
            }
            break;
        }
    }
    delivering_ = false;

    // After delivery the input map head (if any) must not be safe
    assert(input_map_->begin() == input_map_->end() ||
           input_map_->is_safe(input_map_->begin()) == false);

}
3154
3155
deliver_trans()3156 void gcomm::evs::Proto::deliver_trans()
3157 {
3158 if (delivering_ == true)
3159 {
3160 gu_throw_fatal << "Recursive enter to delivery";
3161 }
3162
3163 delivering_ = true;
3164
3165 if (state() != S_INSTALL &&
3166 state() != S_LEAVING)
3167 gu_throw_fatal << "invalid state";
3168
3169 evs_log_debug(D_DELIVERY)
3170 << " aru_seq=" << input_map_->aru_seq()
3171 << " safe_seq=" << input_map_->safe_seq();
3172
3173 // In transitional configuration we must deliver all messages that
3174 // are fifo. This is because:
3175 // - We know that it is possible to deliver all fifo messages originated
3176 // from partitioned component as safe in partitioned component
3177 // - Aru in this component is at least the max known fifo seq
3178 // from partitioned component due to message recovery
3179 // - All FIFO messages originated from this component must be
3180 // delivered to fulfill self delivery requirement and
3181 // - FIFO messages originated from this component qualify as AGREED
3182 // in transitional configuration
3183
3184 InputMap::iterator i, i_next;
3185 for (i = input_map_->begin(); i != input_map_->end(); i = i_next)
3186 {
3187 i_next = i;
3188 ++i_next;
3189 const InputMapMsg& msg(InputMapMsgIndex::value(i));
3190 bool deliver = false;
3191 switch (msg.msg().order())
3192 {
3193 case O_SAFE:
3194 case O_AGREED:
3195 case O_FIFO:
3196 case O_DROP:
3197 if (input_map_->is_fifo(i) == true)
3198 {
3199 deliver = true;
3200 }
3201 break;
3202 default:
3203 gu_throw_fatal;
3204 }
3205
3206 if (deliver == true)
3207 {
3208 if (install_message_ != 0)
3209 {
3210 const MessageNode& mn(
3211 MessageNodeList::value(
3212 install_message_->node_list().find_checked(
3213 msg.msg().source())));
3214 if (msg.msg().seq() <= mn.im_range().hs())
3215 {
3216 deliver_finish(msg);
3217 }
3218 else
3219 {
3220 gcomm_assert(mn.operational() == false);
3221 log_info << "filtering out trans message higher than "
3222 << "install message hs "
3223 << mn.im_range().hs()
3224 << ": " << msg.msg();
3225 }
3226 }
3227 else
3228 {
3229 deliver_finish(msg);
3230 }
3231 gu_trace(input_map_->erase(i));
3232 }
3233 }
3234
3235 // Sanity check:
3236 // There must not be any messages left that
3237 // - Are originated from outside of trans conf and are FIFO
3238 // - Are originated from trans conf
3239 for (i = input_map_->begin(); i != input_map_->end(); i = i_next)
3240 {
3241 i_next = i;
3242 ++i_next;
3243 const InputMapMsg& msg(InputMapMsgIndex::value(i));
3244 NodeMap::iterator ii;
3245 gu_trace(ii = known_.find_checked(msg.msg().source()));
3246
3247 if (NodeMap::value(ii).installed() == true)
3248 {
3249 gu_throw_fatal << "Protocol error in transitional delivery "
3250 << "(self delivery constraint)";
3251 }
3252 else if (input_map_->is_fifo(i) == true)
3253 {
3254 gu_throw_fatal << "Protocol error in transitional delivery "
3255 << "(fifo from partitioned component)";
3256 }
3257 gu_trace(input_map_->erase(i));
3258 }
3259 delivering_ = false;
3260 }
3261
3262
3263 /////////////////////////////////////////////////////////////////////////////
3264 // Message handlers
3265 /////////////////////////////////////////////////////////////////////////////
3266
3267
3268
update_im_safe_seq(const size_t uuid,const seqno_t seq)3269 gcomm::evs::seqno_t gcomm::evs::Proto::update_im_safe_seq(const size_t uuid,
3270 const seqno_t seq)
3271 {
3272 const seqno_t im_safe_seq(input_map_->safe_seq(uuid));
3273 if (im_safe_seq < seq)
3274 {
3275 input_map_->set_safe_seq(uuid, seq);
3276 }
3277 return im_safe_seq;
3278 }
3279
send_request_retrans_gap(const UUID & target,const UUID & origin,const Range & range)3280 void gcomm::evs::Proto::send_request_retrans_gap(const UUID& target,
3281 const UUID& origin,
3282 const Range& range)
3283 {
3284 GapMessage gm(version_,
3285 uuid(),
3286 current_view_.id(),
3287 last_sent_,
3288 input_map_->aru_seq(),
3289 ++fifo_seq_,
3290 origin,
3291 range,
3292 Message::F_RETRANS);
3293 gu::Buffer buf;
3294 serialize(gm, buf);
3295 Datagram dg(buf);
3296 int err = send_down(dg, ProtoDownMeta(target));
3297 if (err != 0)
3298 {
3299 log_debug << "send failed: " << strerror(err);
3300 }
3301 sent_msgs_[Message::EVS_T_GAP]++;
3302 }
3303
request_retrans(const UUID & target,const UUID & origin,const Range & range)3304 void gcomm::evs::Proto::request_retrans(const UUID& target, const UUID& origin,
3305 const Range& range)
3306 {
3307 NodeMap::const_iterator origin_node_i(known_.find(origin));
3308 assert(origin_node_i != known_.end());
3309 if (origin_node_i == known_.end())
3310 {
3311 log_warn << "Origin " << origin << " not found from known nodes";
3312 return;
3313 }
3314 const Node& origin_node(NodeMap::value(origin_node_i));
3315 if (origin_node.index() == Node::invalid_index)
3316 {
3317 log_warn << "Origin " << origin << " has no index";
3318 return;
3319 }
3320 if (not gap_rate_limit(target, range))
3321 {
3322 evs_log_debug(D_RETRANS) << self_string()
3323 << " requesting retrans from " << target
3324 << " origin " << origin
3325 << " range " << range
3326 << " due to input map gap, aru "
3327 << input_map_->aru_seq();
3328 std::vector<Range> gap_ranges(input_map_->gap_range_list(
3329 origin_node.index(), range));
3330 for (std::vector<Range>::const_iterator ri(gap_ranges.begin());
3331 ri != gap_ranges.end(); ++ri)
3332 {
3333 evs_log_debug(D_RETRANS)
3334 << "Requesting retransmssion from " << target
3335 << " origin: " << origin
3336 << " range: " << *ri;
3337 send_request_retrans_gap(target, origin, *ri);
3338 }
3339 NodeMap::iterator target_i(known_.find(target));
3340 if (target_i != known_.end())
3341 {
3342 target_i->second.last_requested_range(range);
3343 }
3344 }
3345 }
3346
3347 // Select suitable node for recovering missing messages. The node
3348 // is chosen to be one with join message originating from the same
3349 // view and highest lowest unseen for origin.
3350
3351 struct SelectRecoveryNodeForMissingResult
3352 {
3353 gcomm::evs::seqno_t lowest_unseen;
3354 gcomm::UUID target;
SelectRecoveryNodeForMissingResultSelectRecoveryNodeForMissingResult3355 SelectRecoveryNodeForMissingResult()
3356 : lowest_unseen(-1)
3357 , target()
3358 { }
3359 };
3360
3361 class SelectRecoveryNodeForMissing
3362 {
3363 public:
SelectRecoveryNodeForMissing(const gcomm::evs::Proto & evs,const gcomm::UUID & origin,const gcomm::ViewId & view_id,SelectRecoveryNodeForMissingResult & result)3364 SelectRecoveryNodeForMissing(const gcomm::evs::Proto& evs,
3365 const gcomm::UUID& origin,
3366 const gcomm::ViewId& view_id,
3367 SelectRecoveryNodeForMissingResult&
3368 result /* Out parameter */)
3369 : evs_(evs)
3370 , origin_(origin)
3371 , view_id_(view_id)
3372 , result_(result)
3373 { }
3374
operator ()(const gcomm::evs::NodeMap::value_type & node_v)3375 void operator()(const gcomm::evs::NodeMap::value_type& node_v)
3376 {
3377 // Do not try to recover from self.
3378 if (evs_.uuid() == node_v.first) return;
3379
3380 if (node_v.second.operational())
3381 {
3382 gcomm::evs::seqno_t lu(get_lu_for(origin_, node_v.second));
3383 if (lu > result_.lowest_unseen)
3384 {
3385 result_.lowest_unseen = lu;
3386 result_.target = node_v.first;
3387 }
3388 }
3389 }
3390
3391 private:
get_lu_from_join_for(const gcomm::UUID & origin,const gcomm::evs::JoinMessage & jm)3392 gcomm::evs::seqno_t get_lu_from_join_for(const gcomm::UUID& origin,
3393 const gcomm::evs::JoinMessage& jm)
3394 {
3395 gcomm::evs::MessageNodeList::const_iterator origin_i(
3396 jm.node_list().find(origin));
3397 if (origin_i != jm.node_list().end())
3398 {
3399 return origin_i->second.im_range().lu();
3400 }
3401 return -1;
3402 }
3403
get_lu_for(const gcomm::UUID & origin,const gcomm::evs::Node & node)3404 gcomm::evs::seqno_t get_lu_for(const gcomm::UUID& origin,
3405 const gcomm::evs::Node& node)
3406 {
3407 const gcomm::evs::JoinMessage* jm(node.join_message());
3408 // No join message received
3409 if (not jm) return -1;
3410 // Not in the same view
3411 if (jm->source_view_id() != view_id_) return -1;
3412 return get_lu_from_join_for(origin, *jm);
3413 }
3414
3415 const gcomm::evs::Proto& evs_;
3416 const gcomm::UUID& origin_;
3417 const gcomm::ViewId& view_id_;
3418 SelectRecoveryNodeForMissingResult& result_; // Reference to out parameter
3419 };
3420
request_missing()3421 void gcomm::evs::Proto::request_missing()
3422 {
3423 // This method should be called only during configuration changes.
3424 // In operational state requests should be done based on
3425 // detected gaps and on delayed node checks.
3426 assert(state() != S_OPERATIONAL);
3427 for (NodeMap::const_iterator node_i(known_.begin()); node_i != known_.end();
3428 ++node_i)
3429 {
3430 const UUID& origin(node_i->first);
3431 if (origin == my_uuid_) continue; // No need to request from self.
3432 const Node& node(node_i->second);
3433 // Node has no index assigned, so it was not in the current group.
3434 if (node.index() == Node::invalid_index) continue;
3435
3436 Range range(input_map_->range(node.index()));
3437 if ((not range.is_empty() || range.hs() < last_sent_) &&
3438 (node.leave_message() == 0 ||
3439 node.leave_message()->seq() > range.hs()))
3440 {
3441 // Missing messages from node. If it is still considerd operational,
3442 // send a retransimission request to it. Otherwise locate some
3443 // other node to recover the missing messages.
3444 if (node.operational())
3445 {
3446 const Range request_range(range.lu(), last_sent_);
3447 if (not request_range.is_empty())
3448 {
3449 request_retrans(origin, origin, request_range);
3450 }
3451 }
3452 else
3453 {
3454 // Try to find suitable node to recover the missing messages
3455 // from origin.
3456 SelectRecoveryNodeForMissingResult result;
3457 std::for_each(known_.begin(), known_.end(),
3458 SelectRecoveryNodeForMissing(
3459 *this, origin, current_view_.id(), result));
3460 // If the target node was found, it has messages up to
3461 // result.lowest_unseen - 1 from origin.
3462 const Range request_range(range.lu(), result.lowest_unseen - 1);
3463 if (result.target != UUID::nil() && not request_range.is_empty())
3464 {
3465 request_retrans(result.target, origin, request_range);
3466 }
3467 else
3468 {
3469 evs_log_debug(D_RETRANS)
3470 << "Could not find a node to recover messages "
3471 << "from, missing from " << origin
3472 << " range: " << range
3473 << " last_sent: " << last_sent_;
3474 }
3475 }
3476 }
3477 }
3478 }
3479
3480 class ResendMissingRanges
3481 {
3482 public:
ResendMissingRanges(gcomm::evs::Proto & evs,gcomm::evs::seqno_t last_sent,const gcomm::ViewId & view_id)3483 ResendMissingRanges(gcomm::evs::Proto& evs,
3484 gcomm::evs::seqno_t last_sent,
3485 const gcomm::ViewId& view_id)
3486 : evs_(evs)
3487 , last_sent_(last_sent)
3488 , view_id_(view_id)
3489 { }
3490
operator ()(const gcomm::evs::NodeMap::value_type & node_v)3491 void operator()(const gcomm::evs::NodeMap::value_type& node_v)
3492 {
3493 if (node_v.first == evs_.uuid()) return; // No need to inspect self
3494
3495 const gcomm::evs::JoinMessage* jm(node_v.second.join_message());
3496 if (jm && jm->source_view_id() == view_id_)
3497 {
3498 resend_missing_from_join_message(*jm);
3499 }
3500
3501 const gcomm::evs::LeaveMessage* lm(node_v.second.leave_message());
3502 if (lm && lm->source_view_id() == view_id_)
3503 {
3504 resend_missing_from_leave_message(*lm);
3505 }
3506 }
3507
3508 private:
resend_missing_from_join_message(const gcomm::evs::JoinMessage & jm)3509 void resend_missing_from_join_message(const gcomm::evs::JoinMessage& jm)
3510 {
3511 gcomm::evs::MessageNodeList::const_iterator self_i(
3512 jm.node_list().find(evs_.uuid()));
3513 if (self_i == jm.node_list().end())
3514 {
3515 log_warn << "Node join message claims to be from the same "
3516 << "view but does not list this node, "
3517 << "own uuid: " << evs_.uuid()
3518 << " join message: " << jm;
3519 return;
3520 }
3521 if (self_i->second.im_range().lu() <= last_sent_)
3522 {
3523 evs_.resend(jm.source(),
3524 gcomm::evs::Range(self_i->second.im_range().lu(),
3525 last_sent_));
3526 }
3527 }
3528
resend_missing_from_leave_message(const gcomm::evs::LeaveMessage & lm)3529 void resend_missing_from_leave_message(const gcomm::evs::LeaveMessage& lm)
3530 {
3531 if (lm.aru_seq() < last_sent_)
3532 {
3533 evs_.resend(lm.source(),
3534 gcomm::evs::Range(lm.aru_seq() + 1, last_sent_));
3535 }
3536 }
3537
3538 gcomm::evs::Proto& evs_;
3539 const gcomm::evs::seqno_t last_sent_;
3540 const gcomm::ViewId& view_id_;
3541 };
3542
retrans_missing()3543 void gcomm::evs::Proto::retrans_missing()
3544 {
3545 // This method should be called only during configuration changes.
3546 // In operational state retransmits should happen only by
3547 // responding to retrans request Gap messages.
3548 assert(state() != S_OPERATIONAL);
3549
3550 // Iterate over join messages and retransmit is some nodes
3551 // have not received all messages.
3552 ResendMissingRanges resend_missing(*this, last_sent_, current_view_.id());
3553 std::for_each(known_.begin(), known_.end(), resend_missing);
3554 }
3555
handle_user_from_different_view(const Node & source_node,const UserMessage & msg)3556 void gcomm::evs::Proto::handle_user_from_different_view(
3557 const Node& source_node, const UserMessage& msg)
3558 {
3559 if (state() == S_LEAVING)
3560 {
3561 // Silent drop
3562 return;
3563 }
3564
3565 if (is_msg_from_previous_view(msg) == true)
3566 {
3567 evs_log_debug(D_FOREIGN_MSGS) << "user message "
3568 << msg
3569 << " from previous view";
3570 return;
3571 }
3572
3573 if (source_node.operational() == false)
3574 {
3575 evs_log_debug(D_STATE)
3576 << "dropping message from unoperational source "
3577 << msg.source();
3578 }
3579 else if (source_node.installed() == false)
3580 {
3581 if (install_message_ != 0 &&
3582 msg.source_view_id() == install_message_->install_view_id())
3583 {
3584 assert(state() == S_GATHER || state() == S_INSTALL);
3585 evs_log_debug(D_STATE) << " recovery user message "
3586 << msg;
3587
3588 // This is possible if install timer expires just before
3589 // new view is established on this source_node and retransmitted
3590 // install message is received just before user this message.
3591 if (state() == S_GATHER)
3592 {
3593 // Sanity check
3594 MessageNodeList::const_iterator self(
3595 install_message_->node_list().find(uuid()));
3596 gcomm_assert(self != install_message_->node_list().end()
3597 && MessageNodeList::value(self).operational() == true);
3598 // Mark all operational nodes in install message as
3599 // committed
3600 for (MessageNodeList::const_iterator
3601 mi = install_message_->node_list().begin();
3602 mi != install_message_->node_list().end(); ++mi)
3603 {
3604 if (MessageNodeList::value(mi).operational() == true)
3605 {
3606 NodeMap::iterator jj;
3607 gu_trace(jj = known_.find_checked(
3608 MessageNodeList::key(mi)));
3609 NodeMap::value(jj).set_committed(true);
3610 }
3611 }
3612 shift_to(S_INSTALL);
3613 }
3614
3615 // Other instances installed view before this one, so it is
3616 // safe to shift to S_OPERATIONAL
3617
3618 // Mark all operational nodes in install message as installed
3619 for (MessageNodeList::const_iterator
3620 mi = install_message_->node_list().begin();
3621 mi != install_message_->node_list().end(); ++mi)
3622 {
3623 if (MessageNodeList::value(mi).operational() == true)
3624 {
3625 NodeMap::iterator jj;
3626 gu_trace(jj = known_.find_checked(
3627 MessageNodeList::key(mi)));
3628 NodeMap::value(jj).set_installed(true);
3629 }
3630 }
3631
3632 gu_trace(shift_to(S_OPERATIONAL));
3633 if (pending_leave_ == true)
3634 {
3635 close();
3636 }
3637 }
3638 }
3639 else
3640 {
3641 log_debug << self_string() << " unhandled user message " << msg;
3642 }
3643 }
3644
handle_user(const UserMessage & msg,NodeMap::iterator ii,const Datagram & rb)3645 void gcomm::evs::Proto::handle_user(const UserMessage& msg,
3646 NodeMap::iterator ii,
3647 const Datagram& rb)
3648
3649 {
3650 assert(ii != known_.end());
3651 assert(state() != S_CLOSED && state() != S_JOINING);
3652 Node& inst(NodeMap::value(ii));
3653
3654 evs_log_debug(D_USER_MSGS) << "received " << msg;
3655
3656 if (msg.source_view_id() != current_view_.id())
3657 {
3658 handle_user_from_different_view(inst, msg);
3659 // Handling user message from different view may cause shift
3660 // to operational or leaving state. Check the view ID again and if it
3661 // matches to current view proceed to handling the message.
3662 if (msg.source_view_id() != current_view_.id())
3663 {
3664 return;
3665 }
3666 assert(state() == S_OPERATIONAL || state() == S_LEAVING);
3667 }
3668
3669 if (install_message_)
3670 {
3671 // Install message has been received, which means that the
3672 // members of the group already got into agreement about the
3673 // set of delivered messages.
3674 return;
3675 }
3676
3677 Range range;
3678 Range prev_range;
3679 seqno_t prev_aru;
3680 seqno_t prev_safe;
3681
3682 prev_aru = input_map_->aru_seq();
3683 prev_range = input_map_->range(inst.index());
3684
3685 // Insert only if msg seq is greater or equal than current lowest unseen
3686 if (msg.seq() >= prev_range.lu())
3687 {
3688 Datagram im_dgram(rb, rb.offset());
3689 im_dgram.normalize();
3690 gu_trace(range = input_map_->insert(inst.index(), msg, im_dgram));
3691 if (range.lu() > prev_range.lu())
3692 {
3693 inst.set_tstamp(gu::datetime::Date::monotonic());
3694 }
3695 else
3696 {
3697 evs_log_debug(D_USER_MSGS)
3698 << "Not timestamping due to user msg: range.lu: "
3699 << range.lu()
3700 << " prev_range.lu(): "
3701 << prev_range.lu();
3702 }
3703 }
3704 else
3705 {
3706 evs_log_debug(D_USER_MSGS)
3707 << "Not timestamping due to user msg: msg.seq: "
3708 << msg.seq()
3709 << " prev_range.lu(): "
3710 << prev_range.lu();
3711 range = prev_range;
3712 }
3713
3714 // Update im safe seq for self
3715 update_im_safe_seq(NodeMap::value(self_i_).index(),
3716 input_map_->aru_seq());
3717
3718 // Update safe seq for message source
3719 prev_safe = update_im_safe_seq(inst.index(), msg.aru_seq());
3720
3721 // Check for missing messages
3722 if (range.hs() > range.lu() &&
3723 (msg.flags() & Message::F_RETRANS) == 0)
3724 {
3725 request_retrans(msg.source(), msg.source(), range);
3726 }
3727
3728 // Seqno range completion and acknowledgement
3729 const seqno_t max_hs(input_map_->max_hs());
3730 if (output_.empty() == true &&
3731 (state() == S_OPERATIONAL || state() == S_GATHER) &&
3732 (msg.flags() & Message::F_MSG_MORE) == 0 &&
3733 (last_sent_ < max_hs))
3734 {
3735 // Message not originated from this instance, output queue is empty
3736 // and last_sent seqno should be advanced
3737 gu_trace(complete_user(max_hs));
3738 }
3739 else if (output_.empty() == true &&
3740 input_map_->aru_seq() != prev_aru)
3741 {
3742 // Output queue empty and aru changed, send gap to inform others
3743 evs_log_debug(D_GAP_MSGS) << "sending empty gap";
3744 gu_trace(send_gap(EVS_CALLER, UUID::nil(), current_view_.id(), Range()));
3745 }
3746
3747 // Send messages
3748 if (state() == S_OPERATIONAL)
3749 {
3750 size_t n_sent(0);
3751 while (output_.empty() == false)
3752 {
3753 int err;
3754 gu_trace(err = send_user(send_window_));
3755 if (err != 0)
3756 {
3757 if (err == EAGAIN && n_sent == 0)
3758 {
3759 // If the send window was exhausted, send a gap
3760 // message to advance aru_seq/safe_seq on peers.
3761 gu_trace(send_gap(EVS_CALLER, UUID::nil(),
3762 current_view_.id(), Range()));
3763 }
3764 break;
3765 }
3766 else
3767 {
3768 ++n_sent;
3769 }
3770 }
3771 }
3772
3773 // Deliver messages
3774 gu_trace(deliver());
3775 gu_trace(deliver_local());
3776
3777 // If in recovery state, send join each time input map aru seq reaches
3778 // last sent and either input map aru or safe seq has changed.
3779 if (state() == S_GATHER &&
3780 consensus_.highest_reachable_safe_seq() == input_map_->aru_seq() &&
3781 (prev_aru != input_map_->aru_seq() ||
3782 prev_safe != input_map_->safe_seq()) &&
3783 (msg.flags() & Message::F_RETRANS) == 0)
3784 {
3785 gcomm_assert(output_.empty() == true);
3786 if (consensus_.is_consensus() == false)
3787 {
3788 gu_trace(send_join());
3789 }
3790 }
3791 }
3792
3793
handle_delegate(const DelegateMessage & msg,NodeMap::iterator ii,const Datagram & rb)3794 void gcomm::evs::Proto::handle_delegate(const DelegateMessage& msg,
3795 NodeMap::iterator ii,
3796 const Datagram& rb)
3797 {
3798 gcomm_assert(ii != known_.end());
3799 evs_log_debug(D_DELEGATE_MSGS) << "delegate message " << msg;
3800 Message umsg;
3801 size_t offset;
3802 gu_trace(offset = unserialize_message(UUID::nil(), rb, &umsg));
3803 gu_trace(handle_msg(umsg, Datagram(rb, offset), false));
3804 }
3805
3806
handle_gap(const GapMessage & msg,NodeMap::iterator ii)3807 void gcomm::evs::Proto::handle_gap(const GapMessage& msg, NodeMap::iterator ii)
3808 {
3809 assert(ii != known_.end());
3810 assert(state() != S_CLOSED && state() != S_JOINING);
3811
3812 Node& inst(NodeMap::value(ii));
3813 evs_log_debug(D_GAP_MSGS) << "gap message " << msg;
3814
3815
3816 if ((msg.flags() & Message::F_COMMIT) != 0)
3817 {
3818 log_debug << self_string() << " commit gap from " << msg.source();
3819 if (state() == S_GATHER &&
3820 install_message_ != 0 &&
3821 install_message_->install_view_id() == msg.source_view_id() &&
3822 install_message_->fifo_seq() == msg.seq())
3823 {
3824 inst.set_committed(true);
3825 inst.set_tstamp(gu::datetime::Date::monotonic());
3826 if (is_all_committed() == true)
3827 {
3828 shift_to(S_INSTALL);
3829 gu_trace(send_gap(EVS_CALLER, UUID::nil(),
3830 install_message_->install_view_id(),
3831 Range()));;
3832 }
3833 }
3834 else if (state() == S_GATHER &&
3835 install_message_ != 0 &&
3836 install_message_->install_view_id() == msg.source_view_id() &&
3837 install_message_->fifo_seq() < msg.seq())
3838 {
3839 // new install message has been generated
3840 shift_to(S_GATHER, true);
3841 }
3842 else
3843 {
3844 evs_log_debug(D_GAP_MSGS) << " unhandled commit gap " << msg;
3845 }
3846 return;
3847 }
3848 else if (state() == S_INSTALL &&
3849 install_message_ != 0 &&
3850 install_message_->install_view_id() == msg.source_view_id())
3851 {
3852 evs_log_debug(D_STATE) << "install gap " << msg;
3853 inst.set_installed(true);
3854 inst.set_tstamp(gu::datetime::Date::monotonic());
3855 if (is_all_installed() == true)
3856 {
3857 gu_trace(shift_to(S_OPERATIONAL));
3858 if (pending_leave_ == true)
3859 {
3860 close();
3861 }
3862 }
3863 return;
3864 }
3865 else if (msg.source_view_id() != current_view_.id())
3866 {
3867 if (state() == S_LEAVING)
3868 {
3869 // Silently drop
3870 return;
3871 }
3872
3873 if (is_msg_from_previous_view(msg) == true)
3874 {
3875 evs_log_debug(D_FOREIGN_MSGS) << "gap message from previous view";
3876 return;
3877 }
3878
3879 if (inst.operational() == false)
3880 {
3881 evs_log_debug(D_STATE)
3882 << "dropping message from unoperational source "
3883 << msg.source();
3884 }
3885 else if (inst.installed() == false)
3886 {
3887 evs_log_debug(D_STATE)
3888 << "dropping message from uninstalled source "
3889 << msg.source();
3890 }
3891 else
3892 {
3893 log_debug << "unhandled gap message " << msg;
3894 }
3895 return;
3896 }
3897
3898 gcomm_assert(msg.source_view_id() == current_view_.id());
3899
3900 //
3901 seqno_t prev_safe;
3902
3903 prev_safe = update_im_safe_seq(inst.index(), msg.aru_seq());
3904
3905 // Deliver messages and update tstamp only if safe_seq changed
3906 // for the source.
3907 if (prev_safe != input_map_->safe_seq(inst.index()))
3908 {
3909 inst.set_tstamp(gu::datetime::Date::monotonic());
3910 }
3911
3912 //
3913 if (msg.range_uuid() == uuid())
3914 {
3915 if (msg.range().hs() > last_sent_ &&
3916 (state() == S_OPERATIONAL || state() == S_GATHER))
3917 {
3918 // This could be leaving node requesting messages up to
3919 // its last sent.
3920 gu_trace(complete_user(msg.range().hs()));
3921 }
3922 const seqno_t upper_bound(
3923 std::min(msg.range().hs(), last_sent_));
3924 if (msg.range().lu() <= upper_bound)
3925 {
3926 gu_trace(resend(msg.source(),
3927 Range(msg.range().lu(), upper_bound)));
3928 }
3929 }
3930 else if ((msg.flags() & Message::F_RETRANS) != 0 &&
3931 msg.source() != uuid())
3932 {
3933 gu_trace(recover(msg.source(), msg.range_uuid(), msg.range()));
3934 }
3935
3936 //
3937 if (state() == S_OPERATIONAL)
3938 {
3939 if (output_.empty() == false)
3940 {
3941 while (output_.empty() == false)
3942 {
3943 int err;
3944 gu_trace(err = send_user(send_window_));
3945 if (err != 0)
3946 break;
3947 }
3948 }
3949 else
3950 {
3951 const seqno_t max_hs(input_map_->max_hs());
3952 if (last_sent_ < max_hs)
3953 {
3954 gu_trace(complete_user(max_hs));
3955 }
3956 }
3957 }
3958
3959 gu_trace(deliver());
3960 gu_trace(deliver_local());
3961
3962 //
3963 if (state() == S_GATHER &&
3964 consensus_.highest_reachable_safe_seq() == input_map_->aru_seq() &&
3965 prev_safe != input_map_->safe_seq() )
3966 {
3967 gcomm_assert(output_.empty() == true);
3968 if (consensus_.is_consensus() == false)
3969 {
3970 gu_trace(send_join());
3971 }
3972 }
3973 }
3974
3975
update_im_safe_seqs(const MessageNodeList & node_list)3976 bool gcomm::evs::Proto::update_im_safe_seqs(const MessageNodeList& node_list)
3977 {
3978 bool updated = false;
3979 // Update input map state
3980 for (MessageNodeList::const_iterator i = node_list.begin();
3981 i != node_list.end(); ++i)
3982 {
3983 const UUID& node_uuid(MessageNodeList::key(i));
3984 const Node& local_node(NodeMap::value(known_.find_checked(node_uuid)));
3985 const MessageNode& node(MessageNodeList::value(i));
3986 gcomm_assert(node.view_id() == current_view_.id());
3987 const seqno_t safe_seq(node.safe_seq());
3988 seqno_t prev_safe_seq;
3989 gu_trace(prev_safe_seq = update_im_safe_seq(local_node.index(), safe_seq));
3990 if (prev_safe_seq != safe_seq &&
3991 input_map_->safe_seq(local_node.index()) == safe_seq)
3992 {
3993 updated = true;
3994 }
3995 }
3996 return updated;
3997 }
3998
retrans_leaves(const MessageNodeList & node_list)3999 void gcomm::evs::Proto::retrans_leaves(const MessageNodeList& node_list)
4000 {
4001 for (NodeMap::const_iterator li = known_.begin(); li != known_.end(); ++li)
4002 {
4003 const Node& local_node(NodeMap::value(li));
4004 if (local_node.leave_message() != 0 &&
4005 local_node.is_inactive() == false)
4006 {
4007 MessageNodeList::const_iterator msg_li(
4008 node_list.find(NodeMap::key(li)));
4009
4010 if (msg_li == node_list.end() ||
4011 MessageNodeList::value(msg_li).leaving() == false)
4012 {
4013 const LeaveMessage& lm(*NodeMap::value(li).leave_message());
4014 LeaveMessage send_lm(lm.version(),
4015 lm.source(),
4016 lm.source_view_id(),
4017 lm.seq(),
4018 lm.aru_seq(),
4019 lm.fifo_seq(),
4020 Message::F_RETRANS | Message::F_SOURCE);
4021
4022 gu::Buffer buf;
4023 serialize(send_lm, buf);
4024 Datagram dg(buf);
4025 gu_trace(send_delegate(dg, UUID::nil()));
4026 }
4027 }
4028 }
4029 }
4030
4031
4032 class SelectSuspectsOp
4033 {
4034 public:
SelectSuspectsOp(gcomm::evs::MessageNodeList & nl)4035 SelectSuspectsOp(gcomm::evs::MessageNodeList& nl) : nl_(nl) { }
4036
operator ()(const gcomm::evs::MessageNodeList::value_type & vt) const4037 void operator()(const gcomm::evs::MessageNodeList::value_type& vt) const
4038 {
4039 if (gcomm::evs::MessageNodeList::value(vt).suspected() == true)
4040 {
4041 nl_.insert_unique(vt);
4042 }
4043 }
4044 private:
4045 gcomm::evs::MessageNodeList& nl_;
4046 };
4047
check_suspects(const UUID & source,const MessageNodeList & nl)4048 void gcomm::evs::Proto::check_suspects(const UUID& source,
4049 const MessageNodeList& nl)
4050 {
4051 assert(source != uuid());
4052 MessageNodeList suspected;
4053 for_each(nl.begin(), nl.end(), SelectSuspectsOp(suspected));
4054
4055 for (MessageNodeList::const_iterator i(suspected.begin());
4056 i != suspected.end(); ++i)
4057 {
4058 const UUID& node_uuid(MessageNodeList::key(i));
4059 const MessageNode& node(MessageNodeList::value(i));
4060 if (node.suspected() == true)
4061 {
4062 if (node_uuid != uuid())
4063 {
4064 size_t s_cnt(0);
4065 // Iterate over join messages to see if majority of current
4066 // view agrees with the suspicion
4067 for (NodeMap::const_iterator j(known_.begin());
4068 j != known_.end(); ++j)
4069 {
4070 const JoinMessage* jm(NodeMap::value(j).join_message());
4071 if (jm != 0 && jm->source() != node_uuid &&
4072 current_view_.is_member(jm->source()) == true)
4073 {
4074 MessageNodeList::const_iterator mni(jm->node_list().find(node_uuid));
4075 if (mni != jm->node_list().end())
4076 {
4077 const MessageNode& mn(MessageNodeList::value(mni));
4078 if (mn.suspected() == true)
4079 {
4080 ++s_cnt;
4081 }
4082 }
4083 }
4084 }
4085 const Node& kn(NodeMap::value(known_.find_checked(node_uuid)));
4086 if (kn.operational() == true &&
4087 s_cnt > current_view_.members().size()/2)
4088 {
4089 evs_log_info(I_STATE)
4090 << " declaring suspected "
4091 << node_uuid << " as inactive";
4092 set_inactive(node_uuid);
4093 }
4094 }
4095 }
4096 }
4097 }
4098
4099
cross_check_inactives(const UUID & source,const MessageNodeList & nl)4100 void gcomm::evs::Proto::cross_check_inactives(const UUID& source,
4101 const MessageNodeList& nl)
4102 {
4103 assert(source != uuid());
4104
4105 // Do elimination by suspect status
4106 NodeMap::const_iterator source_i(known_.find_checked(source));
4107
4108 for (MessageNodeList::const_iterator i(nl.begin()); i != nl.end(); ++i)
4109 {
4110 const UUID& node_uuid(MessageNodeList::key(i));
4111 const MessageNode& node(MessageNodeList::value(i));
4112 if (node.operational() == false)
4113 {
4114 NodeMap::iterator local_i(known_.find(node_uuid));
4115 if (local_i != known_.end() && node_uuid != uuid())
4116 {
4117 const Node& local_node(NodeMap::value(local_i));
4118 if (local_node.suspected())
4119 {
4120 // This node is suspecting and the source node has
4121 // already set inactve, mark also locally inactive.
4122 set_inactive(node_uuid);
4123 }
4124 }
4125 }
4126 }
4127 }
4128
4129
4130 // Asymmetry elimination:
// 1a) Find all joins that have this node marked as operational and which
// this node considers operational
4133 // 1b) Mark all operational nodes without join message unoperational
4134 // 2) Iterate over join messages gathered in 1a, find all
4135 // unoperational entries and mark them unoperational too
asymmetry_elimination()4136 void gcomm::evs::Proto::asymmetry_elimination()
4137 {
4138 // Allow some time to pass from setting install timers to get
4139 // join messages accumulated.
4140 const gu::datetime::Date now(gu::datetime::Date::monotonic());
4141 TimerList::const_iterator ti(
4142 find_if(timers_.begin(), timers_.end(), TimerSelectOp(T_INSTALL)));
4143
4144 assert(ti != timers_.end());
4145 if (ti == timers_.end())
4146 {
4147 log_warn << "install timer not set in asymmetry_elimination()";
4148 return;
4149 }
4150
4151 if (install_timeout_ - suspect_timeout_ < TimerList::key(ti) - now)
4152 {
4153 // No check yet
4154 return;
4155 }
4156
4157 // Record initial operational state for logging
4158 std::vector<int> oparr_before(known_.size());
4159 size_t index(0);
4160 for (NodeMap::const_iterator i(known_.begin()); i != known_.end(); ++i)
4161 {
4162 oparr_before[index] = (NodeMap::value(i).operational() == true);
4163 index++;
4164 }
4165 std::list<const JoinMessage*> joins;
4166
4167 // Compose list of join messages
4168 for (NodeMap::const_iterator i(known_.begin()); i != known_.end(); ++i)
4169 {
4170 const UUID& node_uuid(NodeMap::key(i));
4171 const Node& node(NodeMap::value(i));
4172 const JoinMessage* jm(node.join_message());
4173 if (jm != 0)
4174 {
4175 MessageNodeList::const_iterator self_ref(
4176 jm->node_list().find(uuid()));
4177 if (node.operational() == true &&
4178 self_ref != jm->node_list().end() &&
4179 MessageNodeList::value(self_ref).operational() == true)
4180 {
4181 joins.push_back(NodeMap::value(i).join_message());
4182 }
4183 }
4184 else if (node.operational() == true)
4185 {
4186 evs_log_info(I_STATE)
4187 << "marking operational node "
4188 << node_uuid << " without "
4189 << "join message inactive in asymmetry elimination";
4190 set_inactive(node_uuid);
4191 }
4192 }
4193
4194 // Setting node inactive may remove join message and so invalidate
4195 // pointer in joins list, so collect set of UUIDs to set inactive
4196 // and do inactivation in separate loop.
4197 std::set<UUID> to_inactive;
4198 // Iterate over join messages and collect nodes to be set inactive
4199 for (std::list<const JoinMessage*>::const_iterator i(joins.begin());
4200 i != joins.end(); ++i)
4201 {
4202 for (MessageNodeList::const_iterator j((*i)->node_list().begin());
4203 j != (*i)->node_list().end(); ++j)
4204 {
4205 if (MessageNodeList::value(j).operational() == false)
4206 {
4207 to_inactive.insert(MessageNodeList::key(j));
4208 }
4209 }
4210 }
4211 joins.clear();
4212 for (std::set<UUID>::const_iterator i(to_inactive.begin());
4213 i != to_inactive.end(); ++i)
4214 {
4215 NodeMap::const_iterator ni(known_.find(*i));
4216 if (ni != known_.end())
4217 {
4218 if (NodeMap::value(ni).operational() == true)
4219 {
4220 evs_log_info(I_STATE) << "setting " << *i
4221 << " inactive in asymmetry elimination";
4222 set_inactive(*i);
4223 }
4224 }
4225 else
4226 {
4227 log_warn << "node " << *i << " not found from known list in ae";
4228 }
4229 }
4230
4231 // Compute final state and log if it has changed
4232 std::vector<int> oparr_after(known_.size());
4233 index = 0;
4234 for (NodeMap::const_iterator i(known_.begin()); i != known_.end(); ++i)
4235 {
4236 oparr_after[index] = (NodeMap::value(i).operational() == true);
4237 index++;
4238 }
4239
4240 if (oparr_before != oparr_after)
4241 {
4242 evs_log_info(I_STATE) << "before asym elimination";
4243 if (info_mask_ & I_STATE)
4244 {
4245 std::copy(oparr_before.begin(), oparr_before.end(),
4246 std::ostream_iterator<int>(std::cerr, " "));
4247 std::cerr << "\n";
4248 }
4249
4250 evs_log_info(I_STATE) << "after asym elimination";
4251 if (info_mask_ & I_STATE)
4252 {
4253 std::copy(oparr_after.begin(), oparr_after.end(),
4254 std::ostream_iterator<int>(std::cerr, " "));
4255 std::cerr << "\n";
4256 }
4257 }
4258 }
4259
4260 // For each node thas has no join message associated, iterate over other
4261 // known nodes' join messages to find out if the node without join message
4262 // should be declared inactive.
check_unseen()4263 void gcomm::evs::Proto::check_unseen()
4264 {
4265 for (NodeMap::iterator i(known_.begin()); i != known_.end(); ++i)
4266 {
4267
4268 const UUID& node_uuid(NodeMap::key(i));
4269 Node& node(NodeMap::value(i));
4270
4271 if (node_uuid != uuid() &&
4272 current_view_.is_member(node_uuid) == false &&
4273 node.join_message() == 0 &&
4274 node.operational() == true)
4275 {
4276 evs_log_debug(D_STATE) << "checking operational unseen "
4277 << node_uuid;
4278 size_t cnt(0), inact_cnt(0);
4279 for (NodeMap::iterator j(known_.begin()); j != known_.end(); ++j)
4280 {
4281 const JoinMessage* jm(NodeMap::value(j).join_message());
4282 if (jm == 0 || NodeMap::key(j) == uuid())
4283 {
4284 continue;
4285 }
4286 MessageNodeList::const_iterator mn_i;
4287 for (mn_i = jm->node_list().begin();
4288 mn_i != jm->node_list().end(); ++mn_i)
4289 {
4290 NodeMap::const_iterator known_i(
4291 known_.find(MessageNodeList::key(mn_i)));
4292 if (known_i == known_.end() ||
4293 (MessageNodeList::value(mn_i).operational() == true &&
4294 NodeMap::value(known_i).join_message() == 0))
4295 {
4296 evs_log_debug(D_STATE)
4297 << "all joins not locally present for "
4298 << NodeMap::key(j)
4299 << " join message node list";
4300 return;
4301 }
4302 }
4303
4304 if ((mn_i = jm->node_list().find(node_uuid))
4305 != jm->node_list().end())
4306 {
4307 const MessageNode& mn(MessageNodeList::value(mn_i));
4308 evs_log_debug(D_STATE)
4309 << "found " << node_uuid << " from " << NodeMap::key(j)
4310 << " join message: "
4311 << mn.view_id() << " "
4312 << mn.operational();
4313 if (mn.view_id() != ViewId(V_REG))
4314 {
4315 ++cnt;
4316 if (mn.operational() == false) ++inact_cnt;
4317 }
4318 }
4319 }
4320 if (cnt > 0 && cnt == inact_cnt)
4321 {
4322 evs_log_info(I_STATE)
4323 << "unseen node marked inactive by others (cnt="
4324 << cnt
4325 << ", inact_cnt="
4326 << inact_cnt
4327 << ")";
4328 set_inactive(node_uuid);
4329 }
4330 }
4331 }
4332 }
4333
4334
4335 // Iterate over all join messages. If some node has nil view id and suspected
4336 // flag true in all present join messages, declare it inactive.
check_nil_view_id()4337 void gcomm::evs::Proto::check_nil_view_id()
4338 {
4339 size_t join_counts(0);
4340 std::map<UUID, size_t > nil_counts;
4341 for (NodeMap::const_iterator i(known_.begin()); i != known_.end(); ++i)
4342 {
4343 const JoinMessage* jm(NodeMap::value(i).join_message());
4344 if (jm == 0)
4345 {
4346 continue;
4347 }
4348 ++join_counts;
4349 for (MessageNodeList::const_iterator j(jm->node_list().begin());
4350 j != jm->node_list().end(); ++j)
4351 {
4352 const MessageNode& mn(MessageNodeList::value(j));
4353 if (mn.view_id() == ViewId(V_REG))
4354 {
4355 // todo: investigate why removing mn.suspected() == true
4356 // condition causes some unit tests to fail
4357 if (mn.suspected() == true)
4358 {
4359 const UUID& uuid(MessageNodeList::key(j));
4360 ++nil_counts[uuid];
4361 }
4362 }
4363 }
4364 }
4365 for (std::map<UUID, size_t>::const_iterator
4366 i(nil_counts.begin()); i != nil_counts.end(); ++i)
4367 {
4368 if (i->second == join_counts && is_inactive(i->first) == false)
4369 {
4370 log_info << "node " << i->first
4371 << " marked with nil view id and suspected in all present"
4372 << " join messages, declaring inactive";
4373 set_inactive(i->first);
4374 }
4375 }
4376 }
4377
join_rate_limit() const4378 bool gcomm::evs::Proto::join_rate_limit() const
4379 {
4380 gu::datetime::Date now(gu::datetime::Date::monotonic());
4381 // Limit join message sending. It is likely that
4382 // the transfer of user messages which were flushed into network
4383 // in shift to GATHER state takes some time. Too frequent join message
4384 // send will cause unwanted retransmits which will pile up in the
4385 // socket send queue.
4386 if (now < last_sent_join_tstamp_ + 100*gu::datetime::MSec)
4387 {
4388 evs_log_debug(D_JOIN_MSGS) << "join rate limit";
4389 return true;
4390 }
4391 return false;
4392 }
4393
handle_join(const JoinMessage & msg,NodeMap::iterator ii)4394 void gcomm::evs::Proto::handle_join(const JoinMessage& msg, NodeMap::iterator ii)
4395 {
4396 assert(ii != known_.end());
4397 assert(state() != S_CLOSED);
4398
4399 Node& inst(NodeMap::value(ii));
4400
4401 evs_log_debug(D_JOIN_MSGS) << " " << msg;
4402
4403 if (state() == S_LEAVING)
4404 {
4405 if (msg.source_view_id() == current_view_.id())
4406 {
4407 inst.set_tstamp(gu::datetime::Date::monotonic());
4408 // Join messages are needed for detecting gaps in message
4409 // sequences on other nodes.
4410 inst.set_join_message(&msg);
4411 MessageNodeList same_view;
4412 for_each(msg.node_list().begin(), msg.node_list().end(),
4413 SelectNodesOp(same_view, current_view_.id(),
4414 true, true));
4415 if (update_im_safe_seqs(same_view) == true)
4416 {
4417 gu_trace(send_leave(false));
4418 }
4419 request_missing();
4420 }
4421 return;
4422 }
4423 else if (is_msg_from_previous_view(msg) == true)
4424 {
4425 return;
4426 }
4427 else if (install_message_ != 0)
4428 {
4429 // Note: don't send join from this branch yet, join is
4430 // sent at the end of this method
4431 if (install_message_->source() == msg.source())
4432 {
4433 evs_log_info(I_STATE)
4434 << "shift to gather due to representative "
4435 << msg.source() << " join";
4436 if (msg.source_view_id() == install_message_->install_view_id())
4437 {
4438 // Representative reached operational state, we follow
4439 // Other instances installed view before this one, so it is
4440 // safe to shift to S_OPERATIONAL
4441
4442 // Mark all operational nodes in install message as installed
4443 for (MessageNodeList::const_iterator
4444 mi = install_message_->node_list().begin();
4445 mi != install_message_->node_list().end(); ++mi)
4446 {
4447 if (MessageNodeList::value(mi).operational() == true)
4448 {
4449 NodeMap::iterator jj;
4450 gu_trace(jj = known_.find_checked(
4451 MessageNodeList::key(mi)));
4452 NodeMap::value(jj).set_installed(true);
4453 }
4454 }
4455 inst.set_tstamp(gu::datetime::Date::monotonic());
4456 if (state() == S_INSTALL)
4457 {
4458 gu_trace(shift_to(S_OPERATIONAL));
4459 if (pending_leave_ == true)
4460 {
4461 close();
4462 return;
4463 }
4464 // proceed to process actual join message
4465 }
4466 else
4467 {
4468 log_warn << self_string()
4469 << "received join message from new "
4470 << "view while in GATHER, dropping";
4471 return;
4472 }
4473 }
4474 gu_trace(shift_to(S_GATHER, false));
4475 }
4476 else if (consensus_.is_consistent(*install_message_) == true)
4477 {
4478 return;
4479 // Commented out: It seems to be better strategy to
4480 // just wait source of inconsistent join to time out
4481 // instead of shifting to gather. #443
4482
4483 // if (consensus_.is_consistent(msg) == true)
4484 // {
4485 // return;
4486 // }
4487 // else
4488 // {
4489 // log_warn << "join message not consistent " << msg;
4490 // log_info << "state (stderr): ";
4491 // std::cerr << *this << std::endl;
4492 //
4493 // gu_trace(shift_to(S_GATHER, false));
4494 // }
4495 }
4496 else
4497 {
4498 evs_log_info(I_STATE)
4499 << "shift to GATHER, install message is "
4500 << "inconsistent when handling join from "
4501 << msg.source() << " " << msg.source_view_id();
4502 evs_log_info(I_STATE) << "state: " << *this;
4503 gu_trace(shift_to(S_GATHER, false));
4504 }
4505 }
4506 else if (state() != S_GATHER)
4507 {
4508 evs_log_info(I_STATE)
4509 << " shift to GATHER while handling join message from "
4510 << msg.source() << " " << msg.source_view_id();
4511 gu_trace(shift_to(S_GATHER, false));
4512 }
4513
4514 gcomm_assert(output_.empty() == true);
4515
4516 // If source node is member of current view but has already
4517 // formed new view, mark it unoperational
4518 if (current_view_.is_member(msg.source()) == true &&
4519 msg.source_view_id().seq() > current_view_.id().seq())
4520 {
4521 evs_log_info(I_STATE)
4522 << " join source has already formed new view, marking inactive";
4523 set_inactive(msg.source());
4524 return;
4525 }
4526
4527 // Collect view ids to gather_views_ list.
4528 // Add unseen nodes to known list and evicted nodes to evicted list.
4529 // Evicted nodes must also be added to known list for GATHER time
4530 // bookkeeping.
4531 // No need to adjust node state here, it is done later on in
4532 // check_suspects()/cross_check_inactives().
4533 for (MessageNodeList::const_iterator i(msg.node_list().begin());
4534 i != msg.node_list().end(); ++i)
4535 {
4536 NodeMap::iterator ni(known_.find(MessageNodeList::key(i)));
4537 const UUID mn_uuid(MessageNodeList::key(i));
4538 const MessageNode& mn(MessageNodeList::value(i));
4539 gather_views_.insert(std::make_pair(mn.view_id(),
4540 gu::datetime::Date::monotonic()));
4541 if (ni == known_.end())
4542 {
4543 known_.insert_unique(
4544 std::make_pair(mn_uuid, Node(*this)));
4545 }
4546
4547 // Evict nodes according to join message
4548 if (mn_uuid != uuid() && mn.evicted() == true)
4549 {
4550 set_inactive(mn_uuid);
4551 if (is_evicted(mn_uuid) == false)
4552 {
4553 evict(mn_uuid);
4554 }
4555 }
4556 }
4557
4558 // Timestamp source if it sees processing node as operational.
4559 // Adjust local entry operational status.
4560 MessageNodeList::const_iterator self(msg.node_list().find(uuid()));
4561 if (msg.node_list().end() != self)
4562 {
4563 if(MessageNodeList::value(self).operational() == true)
4564 {
4565 inst.set_tstamp(gu::datetime::Date::monotonic());
4566 }
4567 else
4568 {
4569 evs_log_info(I_STATE)
4570 << " declaring source " << msg.source()
4571 << " as inactive (mutual exclusion)";
4572 set_inactive(msg.source());
4573 }
4574 }
4575 inst.set_join_message(&msg);
4576
4577 // Select nodes that are coming from the same view as seen by
4578 // message source
4579 MessageNodeList same_view;
4580 for_each(msg.node_list().begin(), msg.node_list().end(),
4581 SelectNodesOp(same_view, current_view_.id(), true, true));
4582 // Find out self from node list
4583 MessageNodeList::const_iterator nlself_i(same_view.find(uuid()));
4584
4585 // Other node coming from the same view
4586 if (msg.source() != uuid() &&
4587 msg.source_view_id() == current_view_.id())
4588 {
4589 gcomm_assert(nlself_i != same_view.end());
4590 // Update input map state
4591 (void)update_im_safe_seqs(same_view);
4592
4593 // Find out max hs and complete up to that if needed
4594 MessageNodeList::const_iterator max_hs_i(
4595 max_element(same_view.begin(), same_view.end(), RangeHsCmp()));
4596 const seqno_t max_hs(MessageNodeList::value(max_hs_i).im_range().hs());
4597 if (last_sent_ < max_hs)
4598 {
4599 gu_trace(complete_user(max_hs));
4600 }
4601 }
4602
4603 // Request missing messages from other nodes.
4604 request_missing();
4605 // Retrans leave messages that others are missing
4606 gu_trace(retrans_leaves(same_view));
4607
4608 // Make cross check to resolve conflict if two nodes
4609 // declare each other inactive. There is no need to make
4610 // this for own messages.
4611 if (msg.source() != uuid())
4612 {
4613 gu_trace(check_suspects(msg.source(), same_view));
4614 gu_trace(cross_check_inactives(msg.source(), same_view));
4615 gu_trace(check_unseen());
4616 gu_trace(check_nil_view_id());
4617 }
4618
4619 // Eliminate asymmetry according to operational status flags in
4620 // join messages
4621 gu_trace(asymmetry_elimination());
4622
4623 // If current join message differs from current state, send new join
4624 const JoinMessage* curr_join(NodeMap::value(self_i_).join_message());
4625 MessageNodeList new_nl;
4626 populate_node_list(&new_nl);
4627
4628 if (curr_join == 0 ||
4629 (curr_join->aru_seq() != input_map_->aru_seq() ||
4630 curr_join->seq() != input_map_->safe_seq() ||
4631 curr_join->node_list() != new_nl))
4632 {
4633 gu_trace(create_join());
4634 if (consensus_.is_consensus() == false && not join_rate_limit())
4635 {
4636 send_join(false);
4637 }
4638 }
4639
4640 if (consensus_.is_consensus() == true)
4641 {
4642 if (is_representative(uuid()) == true)
4643 {
4644 gu_trace(send_install(EVS_CALLER));
4645 }
4646 }
4647 }
4648
4649
handle_leave(const LeaveMessage & msg,NodeMap::iterator ii)4650 void gcomm::evs::Proto::handle_leave(const LeaveMessage& msg,
4651 NodeMap::iterator ii)
4652 {
4653 assert(ii != known_.end());
4654 assert(state() != S_CLOSED && state() != S_JOINING);
4655
4656 Node& node(NodeMap::value(ii));
4657 evs_log_debug(D_LEAVE_MSGS) << "leave message " << msg;
4658
4659 // Leave messages must be always handled. They carry aru_seq information
4660 // which is used to retrasmit missing messages.
4661
4662 node.set_leave_message(&msg);
4663 if (msg.source() == uuid())
4664 {
4665 // The last one to live, instant close. Otherwise continue
4666 // serving until it becomes apparent that others have
4667 // leave message.
4668 if (current_view_.members().size() == 1)
4669 {
4670 gu_trace(shift_to(S_CLOSED));
4671 }
4672 }
4673 else
4674 {
4675 // Always set node nonoperational if leave message is seen
4676 node.set_operational(false);
4677 if (msg.source_view_id() != current_view_.id() ||
4678 is_msg_from_previous_view(msg) == true)
4679 {
4680 // Silent drop
4681 return;
4682 }
4683
4684 const seqno_t prev_safe_seq(update_im_safe_seq(node.index(), msg.aru_seq()));
4685 if (prev_safe_seq != input_map_->safe_seq(node.index()))
4686 {
4687 node.set_tstamp(gu::datetime::Date::monotonic());
4688 }
4689 if (state() == S_OPERATIONAL)
4690 {
4691 evs_log_info(I_STATE)
4692 << " shift to GATHER when handling leave from "
4693 << msg.source() << " " << msg.source_view_id();
4694 gu_trace(shift_to(S_GATHER, true));
4695 }
4696 else if (state() == S_GATHER &&
4697 prev_safe_seq != input_map_->safe_seq(node.index()))
4698 {
4699 gu_trace(send_join());
4700 }
4701 }
4702 }
4703
4704
handle_install(const InstallMessage & msg,NodeMap::iterator ii)4705 void gcomm::evs::Proto::handle_install(const InstallMessage& msg,
4706 NodeMap::iterator ii)
4707 {
4708
4709 assert(ii != known_.end());
4710 assert(state() != S_CLOSED && state() != S_JOINING);
4711
4712 Node& inst(NodeMap::value(ii));
4713
4714 evs_log_debug(D_INSTALL_MSGS) << "install msg " << msg;
4715
4716 if (state() == S_LEAVING)
4717 {
4718 // Check if others have receievd leave message or declared
4719 // as unoperational before shifting to closed.
4720 MessageNodeList::const_iterator mn_i(msg.node_list().find(uuid()));
4721 if (mn_i != msg.node_list().end())
4722 {
4723 const MessageNode& mn(MessageNodeList::value(mn_i));
4724 if (mn.operational() == false || mn.leaving() == true)
4725 {
4726 gu_trace(shift_to(S_CLOSED));
4727 }
4728 }
4729 return;
4730 }
4731 else if (state() == S_OPERATIONAL)
4732 {
4733 // Drop install messages in operational state.
4734 evs_log_debug(D_INSTALL_MSGS)
4735 << "dropping install message in already installed view";
4736 return;
4737 }
4738 else if (inst.operational() == false)
4739 {
4740 // Message source is not seen as operational, must not accept
4741 // anything from it.
4742 evs_log_debug(D_INSTALL_MSGS)
4743 << "install message source " << msg.source()
4744 << " is not operational, discarding message";
4745 return;
4746 }
4747 else if (is_msg_from_previous_view(msg) == true)
4748 {
4749 // Delayed install message
4750 evs_log_debug(D_FOREIGN_MSGS)
4751 << " dropping install message from previous view";
4752 return;
4753 }
4754 else if (install_message_ != 0)
4755 {
4756 if (msg.source() == install_message_->source() &&
4757 msg.install_view_id().seq() > install_message_->install_view_id().seq())
4758 {
4759 // Representative regenerated install message
4760 evs_log_debug(D_INSTALL_MSGS)
4761 << "regenerated install message";
4762 setall_committed(false);
4763 setall_installed(false);
4764 delete install_message_;
4765 install_message_ = 0;
4766 // Fall through to process new install message
4767 }
4768 else if (msg.source() == install_message_->source())
4769 {
4770 // Duplicate or delayed install message
4771 evs_log_debug(D_INSTALL_MSGS)
4772 << "duplicate or delayed install message";
4773 return;
4774 }
4775 else
4776 {
4777 MessageNodeList::const_iterator self(msg.node_list().find(uuid()));
4778 if (msg.node_list().end() == self ||
4779 MessageNodeList::value(self).operational() == false)
4780 {
4781 evs_log_debug(D_INSTALL_MSGS)
4782 << "dropping install message, processing node not in "
4783 << "new view";
4784 }
4785 else
4786 {
4787 // Two nodes decided to generate install message simultaneously,
4788 // shift to gather to combine groups in install messages.
4789 log_warn << self_string()
4790 << " shift to GATHER due to conflicting install "
4791 << "messages";
4792 gu_trace(shift_to(S_GATHER));
4793 }
4794 return;
4795 }
4796 }
4797 else if (inst.installed() == true)
4798 {
4799 log_warn << self_string()
4800 << " shift to GATHER due to inconsistent state";
4801 gu_trace(shift_to(S_GATHER));
4802 return;
4803 }
4804
4805 // Construct join from install message so that the most recent
4806 // information from representative is updated to local state.
4807 if (msg.source() != uuid())
4808 {
4809 const MessageNode& mn(
4810 MessageNodeList::value(
4811 msg.node_list().find_checked(msg.source())));
4812 JoinMessage jm(msg.version(),
4813 msg.source(),
4814 mn.view_id(),
4815 msg.seq(),
4816 msg.aru_seq(),
4817 msg.fifo_seq(),
4818 msg.node_list());
4819 handle_join(jm, ii);
4820 }
4821
4822 // Drop install message if processing node won't be part of the
4823 // view to be installed.
4824 // Don't set nodes that are forming another view inactive here,
4825 // they should enter new view shortly after install message
4826 // delivery and should be ready to restart GATHER round.
4827 MessageNodeList::const_iterator self(msg.node_list().find(uuid()));
4828 if (msg.node_list().end() == self ||
4829 MessageNodeList::value(self).operational() == false)
4830 {
4831 evs_log_debug(D_INSTALL_MSGS)
4832 << "dropping install message, processing node not in new view";
4833 return;
4834 }
4835
4836 // Proceed to install phase
4837 assert(install_message_ == 0);
4838
4839 // Run through known nodes and remove each entry that is
4840 // not member of current view or present in install message.
4841 // This is to prevent inconsistent view of group when first message(s)
4842 // from new node are received after install message on representative
4843 // and before install message on other nodes.
4844 bool changed(false);
4845 NodeMap::iterator i, i_next;
4846 for (NodeMap::iterator i(known_.begin()); i != known_.end(); i = i_next)
4847 {
4848 i_next = i, ++i_next;
4849 const UUID& uuid(NodeMap::key(i));
4850 if (msg.node_list().find(uuid) == msg.node_list().end() &&
4851 current_view_.members().find(uuid) == current_view_.members().end())
4852 {
4853 log_info << self_string() << " temporarily discarding known "
4854 << uuid << " due to received install message";
4855 known_.erase(i);
4856 changed = true;
4857 }
4858 }
4859
4860 // Recreate join message to match current state, otherwise is_consistent()
4861 // below will fail.
4862 if (changed == true)
4863 {
4864 (void)create_join();
4865 }
4866
4867 // See if install message is consistent with local state.
4868 // Is_consistent() checks only local state and local join
4869 // message in case other nodes have already been seen and reported
4870 // nodes that will not be in the next view.
4871 if (consensus_.is_consistent(msg) == true)
4872 {
4873 inst.set_tstamp(gu::datetime::Date::monotonic());
4874 install_message_ = new InstallMessage(msg);
4875 assert(install_message_->source() != UUID::nil());
4876 assert(install_message_->flags() != 0);
4877 // Send commit gap
4878 gu_trace(send_gap(EVS_CALLER, UUID::nil(), install_message_->install_view_id(),
4879 Range(), true));
4880 }
4881 else
4882 {
4883 evs_log_debug(D_INSTALL_MSGS)
4884 << "install message " << msg
4885 << " not consistent with state " << *this;
4886 gu_trace(shift_to(S_GATHER, true));
4887 }
4888 }
4889
4890
handle_delayed_list(const DelayedListMessage & msg,NodeMap::iterator ii)4891 void gcomm::evs::Proto::handle_delayed_list(const DelayedListMessage& msg,
4892 NodeMap::iterator ii)
4893 {
4894 if (auto_evict_ == 0)
4895 {
4896 // Ignore evict list messages if auto_evict_ is disabled.
4897 return;
4898 }
4899
4900 Node& node(NodeMap::value(ii));
4901 node.set_delayed_list_message(&msg);
4902 gu::datetime::Date now(gu::datetime::Date::monotonic());
4903
4904 // Construct a list of evict candidates that appear in evict list messages
4905 // with cnt greater than local auto_evict_. If evict candidate is reported
4906 // by majority of the current group, evict process is triggered.
4907
4908 // UUID -> over auto_evict_, total count
4909 typedef std::map<UUID, std::pair<size_t, size_t> > Evicts;
4910 Evicts evicts;
4911 bool found(false);
4912
4913 for (NodeMap::const_iterator i(known_.begin()); i != known_.end(); ++i)
4914 {
4915 const DelayedListMessage* const dlm(
4916 NodeMap::value(i).delayed_list_message());
4917 if (dlm == 0)
4918 {
4919 continue;
4920 }
4921 else if (dlm->delayed_list().find(uuid()) != dlm->delayed_list().end())
4922 {
4923 evs_log_debug(D_STATE)
4924 << "found self " << uuid() << " from evict list from "
4925 << msg.source() << " at " << get_address(msg.source());
4926 continue;
4927 }
4928 else if (dlm->tstamp() + delayed_keep_period_ < now)
4929 {
4930 evs_log_debug(D_STATE) << "ignoring expired evict message";
4931 continue;
4932 }
4933
4934 for (DelayedListMessage::DelayedList::const_iterator
4935 dlm_i(dlm->delayed_list().begin());
4936 dlm_i != dlm->delayed_list().end();
4937 ++dlm_i)
4938 {
4939 if (dlm_i->second <= 1)
4940 {
4941 // Don't consider entries with single delayed event as
4942 // evict candidates.
4943 continue;
4944 }
4945
4946 std::pair<Evicts::iterator, bool> eir(
4947 evicts.insert(
4948 std::make_pair(
4949 dlm_i->first, std::make_pair(0, 0))));
4950 evs_log_debug(D_STATE) << "eir " << eir.first->first
4951 << " " << eir.first->second.first
4952 << " " << eir.first->second.second;
4953 ++eir.first->second.second; // total count
4954 if (dlm_i->second >= auto_evict_)
4955 {
4956 ++eir.first->second.first; // over threshold count
4957 found = true;
4958 }
4959 }
4960 }
4961
4962 // Evict candidates that have reached threshold count
4963 for (Evicts::const_iterator i(evicts.begin());
4964 found == true && i != evicts.end(); ++i)
4965 {
4966 if (is_evicted(i->first) == true)
4967 {
4968 // Already evicted, avoid spamming
4969 continue;
4970 }
4971 evs_log_info(I_STATE) << "evict candidate "
4972 << i->first << " " << i->second.first
4973 << " " << i->second.second;
4974 // If the candidate is in the current view, require majority
4975 // of the view to agree. If the candidate is not in the current
4976 // view, require majority of known nodes to agree. Ability to
4977 // evict nodes outside of the group (even while in non-PC) is
4978 // needed to stabilize cluster also in the case that nodes
4979 // have already partitioned.
4980
4981 // TODO: Record stable views from PC and use weights from there
4982 // accordingly (need to be added to view).
4983 if (i->second.first != 0 &&
4984 ((current_view_.is_member(i->first) &&
4985 i->second.second > current_view_.members().size()/2) ||
4986 i->second.second > known_.size()/2))
4987 {
4988 log_warn << "evicting member " << i->first
4989 << " at " << get_address(i->first)
4990 << " permanently from group";
4991 evict(i->first);
4992 if (state() == S_OPERATIONAL)
4993 {
4994 shift_to(S_GATHER, true);
4995 }
4996 }
4997 }
4998 }
4999