1 /*
2  * Copyright (C) 2009-2019 Codership Oy <info@codership.com>
3  */
4 
5 #include "evs_proto.hpp"
6 #include "evs_message2.hpp"
7 #include "evs_input_map2.hpp"
8 
9 #include "gcomm/transport.hpp"
10 #include "gcomm/conf.hpp"
11 #include "gcomm/util.hpp"
12 
13 #include "defaults.hpp"
14 
15 #include <cmath>
16 
17 #include <stdexcept>
18 #include <algorithm>
19 #include <numeric>
20 #include <iterator>
21 #include <set>
22 
23 using namespace std::rel_ops;
24 
// Convenience macros for debug and info logging.
//
// Each macro expands to an if/else whose else-branch is an open stream
// expression: the call site continues it with "<< ..." and terminates it
// with ';'. The streamed message is evaluated and emitted only when at
// least one bit of __mask__ is set in debug_mask_ (respectively
// info_mask_). Every message is prefixed with self_string() and ": ".
//
// The macros reference debug_mask_ / info_mask_ and self_string() by name,
// so those must be accessible at the point of use.
#define evs_log_debug(__mask__)             \
    if ((debug_mask_ & (__mask__)) == 0) { } \
    else log_debug << self_string() << ": "

#define evs_log_info(__mask__)              \
    if ((info_mask_ & (__mask__)) == 0) { }  \
    else log_info << self_string() << ": "
33 
34 
// Construct an EVS protocol instance.
//
// Tunable members are read with param<>() from conf/uri, falling back to
// the values in Defaults (or to the literals given inline). Most of them
// are passed through check_range() together with a lower and an upper
// bound. The constructor body then writes the effective values back into
// conf, registers this node as the first entry of known_ and sets up the
// initial view.
//
// Parameters:
//   conf      configuration handed to Protolay and used as parameter
//             source; updated with the effective values in the body
//   my_uuid   stored in my_uuid_; used as source of the initial view id
//             and as key of the first known_ entry
//   segment   stored in segment_; passed to current_view_.add_member()
//   uri       second parameter source for param<>()
//   mtu       stored in mtu_; send_buf_ reserves this many bytes unless the
//             value equals std::numeric_limits<size_t>::max()
//   rst_view  optional view (may be null); when given, the initial view id
//             sequence is rst_view->id().seq() + 1 and the view is recorded
//             as previous view
//
// NOTE(review): several initializers below use members initialized earlier
// in this list (suspect_timeout_, inactive_timeout_, retrans_period_,
// send_window_, known_, input_map_, current_view_). This is only correct if
// the member declaration order in the class matches - confirm against the
// header before reordering anything.
gcomm::evs::Proto::Proto(gu::Config&    conf,
                         const UUID&    my_uuid,
                         SegmentId      segment,
                         const gu::URI& uri,
                         const size_t   mtu,
                         const View*    rst_view)
    :
    Protolay(conf),
    timers_(),
    version_(check_range(Conf::EvsVersion,
                         param<int>(conf, uri, Conf::EvsVersion, Defaults::EvsVersion),
                         0, GCOMM_PROTOCOL_MAX_VERSION + 1)),
    // Log masks are parsed as hexadecimal (std::hex)
    debug_mask_(param<int>(conf, uri, Conf::EvsDebugLogMask, "0x1", std::hex)),
    info_mask_(param<int>(conf, uri, Conf::EvsInfoLogMask, "0x0", std::hex)),
    last_stats_report_(gu::datetime::Date::monotonic()),
    collect_stats_(true),
    // All three histograms are constructed from the same specification string
    hs_agreed_("0.0,0.0001,0.00031623,0.001,0.0031623,0.01,0.031623,0.1,0.31623,1.,3.1623,10.,31.623"),
    hs_safe_("0.0,0.0001,0.00031623,0.001,0.0031623,0.01,0.031623,0.1,0.31623,1.,3.1623,10.,31.623"),
    hs_local_causal_("0.0,0.0001,0.00031623,0.001,0.0031623,0.01,0.031623,0.1,0.31623,1.,3.1623,10.,31.623"),
    safe_deliv_latency_(),
    send_queue_s_(0),
    n_send_queue_s_(0),
    // One zero-initialized counter per message type
    sent_msgs_(Message::num_message_types, 0),
    retrans_msgs_(0),
    recovered_msgs_(0),
    recvd_msgs_(Message::num_message_types, 0),
    // O_LOCAL_CAUSAL + 1 elements, so O_LOCAL_CAUSAL is a valid index
    delivered_msgs_(O_LOCAL_CAUSAL + 1),
    delivering_(false),
    my_uuid_(my_uuid),
    segment_(segment),
    known_(),
    self_i_(),
    view_forget_timeout_(
        check_range(Conf::EvsViewForgetTimeout,
                    param<gu::datetime::Period>(
                        conf, uri, Conf::EvsViewForgetTimeout,
                        Defaults::EvsViewForgetTimeout),
                    gu::from_string<gu::datetime::Period>(
                        Defaults::EvsViewForgetTimeoutMin),
                    gu::datetime::Period::max())),
    inactive_timeout_(
        check_range(Conf::EvsInactiveTimeout,
                    param<gu::datetime::Period>(
                        conf, uri, Conf::EvsInactiveTimeout,
                        Defaults::EvsInactiveTimeout),
                    gu::from_string<gu::datetime::Period>(
                        Defaults::EvsInactiveTimeoutMin),
                    gu::datetime::Period::max())),
    suspect_timeout_(
        check_range(Conf::EvsSuspectTimeout,
                    param<gu::datetime::Period>(
                        conf, uri, Conf::EvsSuspectTimeout,
                        Defaults::EvsSuspectTimeout),
                    gu::from_string<gu::datetime::Period>(
                        Defaults::EvsSuspectTimeoutMin),
                    gu::datetime::Period::max())),
    // Upper bound is derived from suspect_timeout_
    inactive_check_period_(
        check_range(Conf::EvsInactiveCheckPeriod,
                    param<gu::datetime::Period>(
                        conf, uri, Conf::EvsInactiveCheckPeriod,
                        Defaults::EvsInactiveCheckPeriod),
                    gu::datetime::Period::min(),
                    suspect_timeout_/2 + 1)),
    // retrans_period_ is configured through the keepalive period key;
    // upper bound is derived from suspect_timeout_
    retrans_period_(
        check_range(Conf::EvsKeepalivePeriod,
                    param<gu::datetime::Period>(
                        conf, uri, Conf::EvsKeepalivePeriod,
                        Defaults::EvsRetransPeriod),
                    gu::from_string<gu::datetime::Period>(
                        Defaults::EvsRetransPeriodMin),
                    suspect_timeout_/3 + 1)),
    // Default is half of inactive_timeout_; bounds are retrans_period_ and
    // inactive_timeout_ + 1
    install_timeout_(
        check_range(Conf::EvsInstallTimeout,
                    param<gu::datetime::Period>(
                        conf, uri, Conf::EvsInstallTimeout,
                        gu::to_string(inactive_timeout_/2)),
                    retrans_period_, inactive_timeout_ + 1)),
    join_retrans_period_(
        check_range(Conf::EvsJoinRetransPeriod,
                    param<gu::datetime::Period>(
                        conf, uri, Conf::EvsJoinRetransPeriod,
                        Defaults::EvsJoinRetransPeriod),
                    gu::from_string<gu::datetime::Period>(
                        Defaults::EvsRetransPeriodMin),
                    gu::datetime::Period::max())),
    stats_report_period_(
        check_range(Conf::EvsStatsReportPeriod,
                    param<gu::datetime::Period>(
                        conf, uri, Conf::EvsStatsReportPeriod,
                        Defaults::EvsStatsReportPeriod),
                    gu::from_string<gu::datetime::Period>(
                        Defaults::EvsStatsReportPeriodMin),
                    gu::datetime::Period::max())),
    // Not read from the configuration here: starts out equal to
    // retrans_period_
    causal_keepalive_period_(retrans_period_),
    delay_margin_(param<gu::datetime::Period>(
                      conf, uri, Conf::EvsDelayMargin,
                      Defaults::EvsDelayMargin)),
    delayed_keep_period_(param<gu::datetime::Period>(
                             conf, uri, Conf::EvsDelayedKeepPeriod,
                             Defaults::EvsDelayedKeepPeriod)),
    last_inactive_check_   (gu::datetime::Date::monotonic()),
    last_causal_keepalive_ (gu::datetime::Date::monotonic()),
    // Initial view id has type V_TRANS and this node as source; its
    // sequence number continues from rst_view when one is given, else 0
    current_view_(0, ViewId(V_TRANS, my_uuid,
                         rst_view ? rst_view -> id().seq() + 1 : 0)),
    previous_view_(),
    previous_views_(),
    gather_views_(),
    // Heap-allocated here, released in the destructor
    input_map_(new InputMap()),
    causal_queue_(),
    consensus_(*this, known_, *input_map_, current_view_),
    last_sent_join_tstamp_(),
    install_message_(0),
    max_view_id_seq_(0),
    attempt_seq_(1),
    new_view_logged_(false),
    max_install_timeouts_(
        check_range(Conf::EvsMaxInstallTimeouts,
                    param<int>(conf, uri, Conf::EvsMaxInstallTimeouts,
                               Defaults::EvsMaxInstallTimeouts),
                    0, std::numeric_limits<int>::max())),
    install_timeout_count_(0),
    fifo_seq_(-1),
    last_sent_(-1),
    send_window_(
        check_range(Conf::EvsSendWindow,
                    param<seqno_t>(conf, uri, Conf::EvsSendWindow,
                                   Defaults::EvsSendWindow),
                    gu::from_string<seqno_t>(Defaults::EvsSendWindowMin),
                    std::numeric_limits<seqno_t>::max())),
    // Upper bound is derived from send_window_
    user_send_window_(
        check_range(Conf::EvsUserSendWindow,
                    param<seqno_t>(conf, uri, Conf::EvsUserSendWindow,
                                   Defaults::EvsUserSendWindow),
                    gu::from_string<seqno_t>(Defaults::EvsUserSendWindowMin),
                    send_window_ + 1)),
    bytes_since_request_user_msg_feedback_(),
    output_(),
    send_buf_(),
    max_output_size_(128),
    mtu_(mtu),
    use_aggregate_(param<bool>(conf, uri, Conf::EvsUseAggregate, "true")),
    self_loopback_(false),
    state_(S_CLOSED),
    shift_to_rfcnt_(0),
    pending_leave_(false),
    isolation_end_(gu::datetime::Date::zero()),
    delayed_list_(),
    auto_evict_(param<size_t>(conf, uri, Conf::EvsAutoEvict,
                              Defaults::EvsAutoEvict))
{
    log_info << "EVS version " << version_;

    // Write the effective values back into the configuration
    conf.set(Conf::EvsVersion, gu::to_string(version_));
    conf.set(Conf::EvsViewForgetTimeout, gu::to_string(view_forget_timeout_));
    conf.set(Conf::EvsSuspectTimeout, gu::to_string(suspect_timeout_));
    conf.set(Conf::EvsInactiveTimeout, gu::to_string(inactive_timeout_));
    conf.set(Conf::EvsKeepalivePeriod, gu::to_string(retrans_period_));
    conf.set(Conf::EvsInactiveCheckPeriod,
             gu::to_string(inactive_check_period_));
    conf.set(Conf::EvsJoinRetransPeriod, gu::to_string(join_retrans_period_));
    conf.set(Conf::EvsInstallTimeout, gu::to_string(install_timeout_));
    conf.set(Conf::EvsStatsReportPeriod, gu::to_string(stats_report_period_));
    conf.set(Conf::EvsCausalKeepalivePeriod,
             gu::to_string(causal_keepalive_period_));
    conf.set(Conf::EvsSendWindow, gu::to_string(send_window_));
    conf.set(Conf::EvsUserSendWindow, gu::to_string(user_send_window_));
    conf.set(Conf::EvsUseAggregate, gu::to_string(use_aggregate_));
    conf.set(Conf::EvsDebugLogMask, gu::to_string(debug_mask_, std::hex));
    conf.set(Conf::EvsInfoLogMask, gu::to_string(info_mask_, std::hex));
    conf.set(Conf::EvsMaxInstallTimeouts, gu::to_string(max_install_timeouts_));
    conf.set(Conf::EvsDelayMargin, gu::to_string(delay_margin_));
    conf.set(Conf::EvsDelayedKeepPeriod, gu::to_string(delayed_keep_period_));
    conf.set(Conf::EvsAutoEvict, gu::to_string(auto_evict_));

    // Register this node as the only entry of known_ (known_ was
    // default-constructed above, so begin() refers to the entry inserted
    // here) and remember the iterator to it.
    known_.insert_unique(
        std::make_pair(my_uuid_, Node(*this)));
    self_i_ = known_.begin();
    assert(NodeMap::value(self_i_).operational() == true);

    NodeMap::value(self_i_).set_index(0);
    input_map_->reset(1);
    current_view_.add_member(my_uuid_, segment_);
    // we don't need to store previous views, do we ?
    if (rst_view) {
        // Record the given view as previous view, stamped with the
        // current monotonic time
        previous_view_ = *rst_view;
        previous_views_.insert(
            std::make_pair(rst_view -> id(), gu::datetime::Date::monotonic()));
    }
    // The maximum size_t value is treated as "no MTU given": skip the
    // reservation in that case
    if (mtu_ != std::numeric_limits<size_t>::max())
    {
        send_buf_.reserve(mtu_);
    }
}
229 
230 
// Destructor: empties the output queue and releases the two raw-pointer
// members. input_map_ is allocated with new in the constructor;
// install_message_ starts out as 0 there, and deleting a null pointer is a
// no-op.
gcomm::evs::Proto::~Proto()
{
    output_.clear();
    delete install_message_;
    delete input_map_;
}
237 
238 
239 bool
set_param(const std::string & key,const std::string & val,Protolay::sync_param_cb_t & sync_param_cb)240 gcomm::evs::Proto::set_param(const std::string& key, const std::string& val,
241                             Protolay::sync_param_cb_t& sync_param_cb)
242 {
243     if (key == gcomm::Conf::EvsVersion)
244     {
245         version_ = check_range(Conf::EvsVersion,
246                                gu::from_string<int>(val),
247                                0, GCOMM_PROTOCOL_MAX_VERSION + 1);
248         conf_.set(Conf::EvsVersion, gu::to_string(version_));
249         // trigger configuration change to propagate version
250         shift_to(S_GATHER, true);
251         return true;
252     }
253     else if (key == gcomm::Conf::EvsSendWindow)
254     {
255         send_window_ = check_range(Conf::EvsSendWindow,
256                                    gu::from_string<seqno_t>(val),
257                                    user_send_window_,
258                                    std::numeric_limits<seqno_t>::max());
259         conf_.set(Conf::EvsSendWindow, gu::to_string(send_window_));
260         return true;
261     }
262     else if (key == gcomm::Conf::EvsUserSendWindow)
263     {
264         user_send_window_ = check_range(
265             Conf::EvsUserSendWindow,
266             gu::from_string<seqno_t>(val),
267             gu::from_string<seqno_t>(Defaults::EvsUserSendWindowMin),
268             send_window_ + 1);
269         conf_.set(Conf::EvsUserSendWindow, gu::to_string(user_send_window_));
270         return true;
271     }
272     else if (key == gcomm::Conf::EvsMaxInstallTimeouts)
273     {
274         max_install_timeouts_ = check_range(
275             Conf::EvsMaxInstallTimeouts,
276             gu::from_string<int>(val),
277             0, std::numeric_limits<int>::max());
278         conf_.set(Conf::EvsMaxInstallTimeouts, gu::to_string(max_install_timeouts_));
279         return true;
280     }
281     else if (key == Conf::EvsStatsReportPeriod)
282     {
283         stats_report_period_ = check_range(
284             Conf::EvsStatsReportPeriod,
285             gu::from_string<gu::datetime::Period>(val),
286             gu::from_string<gu::datetime::Period>(Defaults::EvsStatsReportPeriodMin),
287             gu::datetime::Period::max());
288         conf_.set(Conf::EvsStatsReportPeriod, gu::to_string(stats_report_period_));
289         reset_timer(T_STATS);
290         return true;
291     }
292     else if (key == Conf::EvsInfoLogMask)
293     {
294         info_mask_ = gu::from_string<int>(val, std::hex);
295         conf_.set(Conf::EvsInfoLogMask, gu::to_string<int>(info_mask_, std::hex));
296         return true;
297     }
298     else if (key == Conf::EvsDebugLogMask)
299     {
300         debug_mask_ = gu::from_string<int>(val, std::hex);
301         conf_.set(Conf::EvsDebugLogMask, gu::to_string<int>(debug_mask_, std::hex));
302         return true;
303     }
304     else if (key == Conf::EvsSuspectTimeout)
305     {
306         suspect_timeout_ = check_range(
307             Conf::EvsSuspectTimeout,
308             gu::from_string<gu::datetime::Period>(val),
309             gu::from_string<gu::datetime::Period>(Defaults::EvsSuspectTimeoutMin),
310             gu::datetime::Period::max());
311         conf_.set(Conf::EvsSuspectTimeout, gu::to_string(suspect_timeout_));
312         reset_timer(T_INACTIVITY);
313         return true;
314     }
315     else if (key == Conf::EvsInactiveTimeout)
316     {
317         inactive_timeout_ = check_range(
318             Conf::EvsInactiveTimeout,
319             gu::from_string<gu::datetime::Period>(val),
320             gu::from_string<gu::datetime::Period>(Defaults::EvsInactiveTimeoutMin),
321             gu::datetime::Period::max());
322         conf_.set(Conf::EvsInactiveTimeout, gu::to_string(inactive_timeout_));
323         reset_timer(T_INACTIVITY);
324         return true;
325     }
326     else if (key == Conf::EvsKeepalivePeriod)
327     {
328         retrans_period_ = check_range(
329             Conf::EvsKeepalivePeriod,
330             gu::from_string<gu::datetime::Period>(val),
331             gu::from_string<gu::datetime::Period>(Defaults::EvsRetransPeriodMin),
332             gu::datetime::Period::max());
333         conf_.set(Conf::EvsKeepalivePeriod, gu::to_string(retrans_period_));
334         reset_timer(T_RETRANS);
335         return true;
336     }
337     else if (key == Conf::EvsCausalKeepalivePeriod)
338     {
339         causal_keepalive_period_ = check_range(
340             Conf::EvsCausalKeepalivePeriod,
341             gu::from_string<gu::datetime::Period>(val),
342             gu::datetime::Period(0),
343             gu::datetime::Period::max());
344         conf_.set(Conf::EvsCausalKeepalivePeriod,
345                   gu::to_string(causal_keepalive_period_));
346         // no timer reset here, causal keepalives don't rely on timer
347         return true;
348     }
349     else if (key == Conf::EvsJoinRetransPeriod)
350     {
351         join_retrans_period_ = check_range(
352             Conf::EvsJoinRetransPeriod,
353             gu::from_string<gu::datetime::Period>(val),
354             gu::from_string<gu::datetime::Period>(Defaults::EvsRetransPeriodMin),
355             gu::datetime::Period::max());
356         conf_.set(Conf::EvsJoinRetransPeriod, gu::to_string(join_retrans_period_));
357         reset_timer(T_RETRANS);
358         return true;
359     }
360     else if (key == Conf::EvsInstallTimeout)
361     {
362         install_timeout_ = check_range(
363             Conf::EvsInstallTimeout,
364             gu::from_string<gu::datetime::Period>(val),
365             retrans_period_*2, inactive_timeout_ + 1);
366         conf_.set(Conf::EvsInstallTimeout, gu::to_string(install_timeout_));
367         reset_timer(T_INSTALL);
368         return true;
369     }
370     else if (key == Conf::EvsUseAggregate)
371     {
372         use_aggregate_ = gu::from_string<bool>(val);
373         conf_.set(Conf::EvsUseAggregate, gu::to_string(use_aggregate_));
374         return true;
375     }
376     else if (key == Conf::EvsDelayMargin)
377     {
378         delay_margin_ = gu::from_string<gu::datetime::Period>(val);
379         conf_.set(Conf::EvsDelayMargin, gu::to_string(delay_margin_));
380         return true;
381     }
382     else if (key == Conf::EvsDelayedKeepPeriod)
383     {
384         delayed_keep_period_ = gu::from_string<gu::datetime::Period>(val);
385         conf_.set(Conf::EvsDelayedKeepPeriod,
386                   gu::to_string(delayed_keep_period_));
387         return true;
388     }
389     else if (key == Conf::EvsEvict)
390     {
391         if (val.size())
392         {
393             UUID uuid;
394             std::istringstream is(val);
395             is >> uuid;
396             log_info << "Evicting node " << uuid << " permanently from cluster";
397             evict(uuid);
398             if (state() == S_OPERATIONAL && current_view_.is_member(uuid) == true)
399             {
400                 shift_to(S_GATHER, true);
401             }
402         }
403         else
404         {
405             Protolay::EvictList::const_iterator i, i_next;
406             for (i = evict_list().begin(); i != evict_list().end(); i = i_next)
407             {
408                 i_next = i, ++i_next;
409                 log_info << "unevicting " << Protolay::EvictList::key(i);
410                 unevict(Protolay::EvictList::key(i));
411             }
412         }
413         return true;
414     }
415     else if (key == Conf::EvsAutoEvict)
416     {
417         auto_evict_ = gu::from_string<size_t>(val);
418         conf_.set(Conf::EvsAutoEvict, gu::to_string(auto_evict_));
419         return true;
420     }
421     else if (key == Conf::EvsViewForgetTimeout ||
422              key == Conf::EvsInactiveCheckPeriod)
423     {
424         gu_throw_error(EPERM) << "can't change value for '"
425                               << key << "' during runtime";
426     }
427     return false;
428 }
429 
430 
handle_get_status(gu::Status & status) const431 void gcomm::evs::Proto::handle_get_status(gu::Status& status) const
432 {
433     status.insert("evs_state", to_string(state_));
434     status.insert("evs_repl_latency", safe_deliv_latency_.to_string());
435     std::string delayed_list_str;
436     for (DelayedList::const_iterator i(delayed_list_.begin());
437          i != delayed_list_.end(); ++i)
438     {
439         if (is_evicted(i->first)              == false ||
440             current_view_.is_member(i->first) == true)
441         {
442             delayed_list_str += i->first.full_str()
443                 + ":"
444                 + i->second.addr()
445                 + ":"
446                 + gu::to_string(i->second.state_change_cnt());
447             delayed_list_str += ",";
448         }
449     }
450     // Strip trailing comma
451     if (delayed_list_str.empty() == false)
452     {
453         delayed_list_str.resize(delayed_list_str.size() - 1);
454     }
455     status.insert("evs_delayed", delayed_list_str);
456 
457     std::string evict_list_str;
458     for (Protolay::EvictList::const_iterator i(evict_list().begin());
459          i != evict_list().end(); )
460     {
461         evict_list_str += EvictList::key(i).full_str();
462         if (++i != evict_list().end()) evict_list_str += ",";
463     }
464     status.insert("evs_evict_list", evict_list_str);
465 
466     if (info_mask_ & I_STATISTICS)
467     {
468         status.insert("evs_safe_hs", hs_safe_.to_string());
469         status.insert("evs_causal_hs", hs_local_causal_.to_string());
470         status.insert("evs_outq_avg",
471                       gu::to_string(std::fabs(double(send_queue_s_)/
472                                               double(n_send_queue_s_))));
473         status.insert("evs_sent_user",
474                       gu::to_string(sent_msgs_[Message::EVS_T_USER]));
475         status.insert("evs_sent_delegate",
476                       gu::to_string(sent_msgs_[Message::EVS_T_DELEGATE]));
477         status.insert("evs_sent_gap",
478                       gu::to_string(sent_msgs_[Message::EVS_T_GAP]));
479         status.insert("evs_sent_join",
480                       gu::to_string(sent_msgs_[Message::EVS_T_JOIN]));
481         status.insert("evs_sent_install",
482                       gu::to_string(sent_msgs_[Message::EVS_T_INSTALL]));
483         status.insert("evs_sent_leave",
484                       gu::to_string(sent_msgs_[Message::EVS_T_LEAVE]));
485         status.insert("evs_retransmitted", gu::to_string(retrans_msgs_));
486         status.insert("evs_recovered", gu::to_string(recovered_msgs_));
487         status.insert("evs_deliv_safe",
488                       gu::to_string(delivered_msgs_[O_SAFE]));
489     }
490 }
491 
492 
operator <<(std::ostream & os,const Proto & p)493 std::ostream& gcomm::evs::operator<<(std::ostream& os, const Proto& p)
494 {
495     os << "evs::proto("
496        << p.self_string() << ", "
497        << p.to_string(p.state()) << ") {\n";
498     os << "current_view=" << p.current_view_ << ",\n";
499     os << "input_map=" << *p.input_map_ << ",\n";
500     os << "fifo_seq=" << p.fifo_seq_ << ",\n";
501     os << "last_sent=" << p.last_sent_ << ",\n";
502     os << "known:\n";
503     for (NodeMap::const_iterator i(p.known_.begin()); i != p.known_.end(); ++i)
504     {
505         os << NodeMap::key(i) << " at "
506            << p.get_address(NodeMap::key(i)) << "\n";
507         os << NodeMap::value(i) << "\n";
508     }
509     if (p.install_message_ != 0)
510         os << "install msg=" << *p.install_message_ << "\n";
511     os << " }";
512     return os;
513 }
514 
stats() const515 std::string gcomm::evs::Proto::stats() const
516 {
517     std::ostringstream os;
518     os << "\n\tnodes " << current_view_.members().size();
519     os << "\n\tagreed deliv hist {" << hs_agreed_ << "} ";
520     os << "\n\tsafe deliv hist {" << hs_safe_ << "} ";
521     os << "\n\tcaus deliv hist {" << hs_local_causal_ << "} ";
522     os << "\n\toutq avg " << double(send_queue_s_)/double(n_send_queue_s_);
523     os << "\n\tsent {";
524     std::copy(sent_msgs_.begin(), sent_msgs_.end(),
525          std::ostream_iterator<long long int>(os, ","));
526     os << "}\n\tsent per sec {";
527     const double norm(double(gu::datetime::Date::monotonic().get_utc()
528                              - last_stats_report_.get_utc())/gu::datetime::Sec);
529     std::vector<double> result(7, norm);
530     std::transform(sent_msgs_.begin(), sent_msgs_.end(),
531                    result.begin(), result.begin(), std::divides<double>());
532     std::copy(result.begin(), result.end(),
533               std::ostream_iterator<double>(os, ","));
534     os << "}\n\trecvd { ";
535     std::copy(recvd_msgs_.begin(), recvd_msgs_.end(),
536               std::ostream_iterator<long long int>(os, ","));
537     os << "}\n\trecvd per sec {";
538     std::fill(result.begin(), result.end(), norm);
539     std::transform(recvd_msgs_.begin(), recvd_msgs_.end(),
540                    result.begin(), result.begin(), std::divides<double>());
541     std::copy(result.begin(), result.end(),
542               std::ostream_iterator<double>(os, ","));
543     os << "}\n\tretransmitted " << retrans_msgs_ << " ";
544     os << "\n\trecovered " << recovered_msgs_;
545     os << "\n\tdelivered {";
546     std::copy(delivered_msgs_.begin(), delivered_msgs_.end(),
547               std::ostream_iterator<long long int>(os, ", "));
548     os << "}\n\teff(delivered/sent) " <<
549         double(accumulate(delivered_msgs_.begin() + 1,
550                           delivered_msgs_.begin() + O_SAFE + 1, 0))
551         /double(accumulate(sent_msgs_.begin(), sent_msgs_.end(), 0));
552     return os.str();
553 }
554 
reset_stats()555 void gcomm::evs::Proto::reset_stats()
556 {
557     hs_agreed_.clear();
558     hs_safe_.clear();
559     hs_local_causal_.clear();
560     safe_deliv_latency_.clear();
561     send_queue_s_ = 0;
562     n_send_queue_s_ = 0;
563     last_stats_report_ = gu::datetime::Date::monotonic();
564 }
565 
566 
is_msg_from_previous_view(const Message & msg)567 bool gcomm::evs::Proto::is_msg_from_previous_view(const Message& msg)
568 {
569     ViewList::const_iterator i;
570     if ((i = previous_views_.find(msg.source_view_id()))
571         != previous_views_.end())
572     {
573         evs_log_debug(D_FOREIGN_MSGS) << " message " << msg
574                                       << " from previous view " << i->first;
575         return true;
576     }
577 
578     // If node is in current view, check message source view seq, if it is
579     // smaller than current view seq then the message is also from some
580     // previous (but unknown to us) view
581     NodeList::const_iterator ni(current_view_.members().find(msg.source()));
582     if (ni != current_view_.members().end())
583     {
584         if (msg.source_view_id().seq() <
585             current_view_.id().seq())
586         {
587             log_warn << "stale message from unknown origin " << msg;
588             return true;
589         }
590     }
591 
592     return false;
593 }
594 
595 
// Inactivity timer handler: runs check_inactive(), cleanup_views() and
// cleanup_evicted(), in that order. Each call is wrapped in gu_trace().
void gcomm::evs::Proto::handle_inactivity_timer()
{
    gu_trace(check_inactive());
    gu_trace(cleanup_views());
    gu_trace(cleanup_evicted());
}
602 
603 
handle_retrans_timer()604 void gcomm::evs::Proto::handle_retrans_timer()
605 {
606     evs_log_debug(D_TIMERS) << "retrans timer";
607     if (state() == S_GATHER)
608     {
609         if (install_message_ != 0)
610         {
611             // Retransmit install message if representative and all commit
612             // gaps have not been received yet.
613             if (is_all_committed()         == false &&
614                 install_message_->source() == uuid())
615             {
616                 evs_log_debug(D_INSTALL_MSGS) << "retrans install";
617                 gu::Buffer buf;
618                 install_message_->set_flags(
619                     install_message_->flags() | Message::F_RETRANS);
620                 (void)serialize(*install_message_, buf);
621                 Datagram dg(buf);
622                 // Must not be sent as delegate, newly joining node
623                 // will filter them out in handle_msg().
624                 gu_trace(send_down(dg, ProtoDownMeta()));
625             }
626             evs_log_debug(D_GAP_MSGS) << "resend commit gap";
627             // Resend commit gap
628             gu_trace(send_gap(EVS_CALLER, UUID::nil(),
629                               install_message_->install_view_id(),
630                               Range(), true));
631         }
632         else
633         {
634             evs_log_debug(D_JOIN_MSGS) << "retrans join";
635             gu_trace(send_join(true));
636         }
637     }
638     else if (state() == S_INSTALL)
639     {
640         gcomm_assert(install_message_ != 0);
641         gu_trace(send_gap(EVS_CALLER, UUID::nil(),
642                           install_message_->install_view_id(),
643                           Range(), true));
644         gu_trace(send_gap(EVS_CALLER, UUID::nil(),
645                           install_message_->install_view_id(),
646                           Range()));
647     }
648     else if (state() == S_OPERATIONAL)
649     {
650         const seqno_t prev_last_sent(last_sent_);
651         evs_log_debug(D_TIMERS) << "sending keepalive, last_sent=" << last_sent_;
652         Datagram dg;
653         gu_trace((void)send_user(dg, 0xff, O_DROP, -1, -1));
654         if (prev_last_sent == last_sent_)
655         {
656             log_warn << "could not send keepalive";
657         }
658     }
659     else if (state() == S_LEAVING)
660     {
661         evs_log_debug(D_TIMERS) << "send leave timer";
662         send_leave(false);
663         retrans_missing();
664     }
665 }
666 
isolate(gu::datetime::Period period)667 void gcomm::evs::Proto::isolate(gu::datetime::Period period)
668 {
669     isolation_end_ = gu::datetime::Date::monotonic() + period;
670 }
671 
672 
// Install timer handler, valid only in S_GATHER or S_INSTALL (asserted).
//
// Depending on how install_timeout_count_ compares to
// max_install_timeouts_ the handler
//   - marks the nodes without a consistent join message as inactive
//     (count below the maximum),
//   - marks all other nodes as inactive and isolates this node for
//     suspect_timeout_ + inactive_timeout_ (count equals the maximum), or
//   - throws through gu_throw_fatal (count above the maximum).
// Afterwards nodes which have not committed a pending install message are
// marked inactive, the state is shifted to S_GATHER, an install message is
// sent if this node has consensus and is the representative, and
// install_timeout_count_ is incremented.
void gcomm::evs::Proto::handle_install_timer()
{
    gcomm_assert(state() == S_GATHER || state() == S_INSTALL);
    log_warn << self_string() << " install timer expired";

    bool is_cons(consensus_.is_consensus());
    bool is_repr(is_representative(uuid()));
    evs_log_info(I_STATE) << "before inspection:";
    evs_log_info(I_STATE) << "consensus: " << is_cons;
    evs_log_info(I_STATE) << "repr     : " << is_repr;
    evs_log_info(I_STATE) << "state dump for diagnosis:";
    // NOTE: the dump itself goes to std::cerr unconditionally, it is not
    // gated by info_mask_ like the log lines above
    std::cerr << *this << std::endl;

    if (install_timeout_count_ < max_install_timeouts_ )
    {
        // before reaching max_install_timeouts, declare only inconsistent
        // nodes as inactive
        for (NodeMap::iterator i = known_.begin(); i != known_.end(); ++i)
        {
            const UUID& node_uuid(NodeMap::key(i));
            const Node& node(NodeMap::value(i));
            // Skip self; a node without a join message, or with a join
            // message that consensus_ does not consider consistent, is
            // set inactive
            if (node_uuid != uuid() &&
                (node.join_message() == 0 ||
                 consensus_.is_consistent(*node.join_message()) == false))
            {
                evs_log_info(I_STATE)
                    << " setting source " << NodeMap::key(i)
                    << " as inactive due to expired install timer";
                set_inactive(NodeMap::key(i));
            }
        }
    }
    else if (install_timeout_count_ == max_install_timeouts_)
    {
        // max install timeouts reached, declare all other nodes
        // as inactive
        for (NodeMap::iterator i = known_.begin(); i != known_.end(); ++i)
        {
            if (NodeMap::key(i) != uuid())
            {
                evs_log_info(I_STATE)
                    << " setting source " << NodeMap::key(i)
                    << " as inactive due to expired install timer";
                set_inactive(NodeMap::key(i));
            }
        }
        log_info << "max install timeouts reached, will isolate node "
                 << "for " << suspect_timeout_ + inactive_timeout_;
        isolate(suspect_timeout_ + inactive_timeout_);
    }
    else if (install_timeout_count_ > max_install_timeouts_)
    {
        // One more expiration after the isolation attempt above: give up
        log_info << "going to give up, state dump for diagnosis:";
        std::cerr << *this << std::endl;
        gu_throw_fatal << self_string()
                       << " failed to form singleton view after exceeding "
                       << "max_install_timeouts " << max_install_timeouts_
                       << ", giving up";
    }


    if (install_message_ != 0)
    {
        // An install message is pending: every node which has not
        // committed is logged, and all of them except self are set
        // inactive
        for (NodeMap::iterator i = known_.begin(); i != known_.end(); ++i)
        {
            if (NodeMap::value(i).committed() == false)
            {
                log_info << self_string() << " node " << NodeMap::key(i)
                         << " failed to commit for install message, "
                         << "declaring inactive";
                if (NodeMap::key(i) != uuid())
                {
                    set_inactive(NodeMap::key(i));
                }
            }
        }
    }
    else
    {
        log_info << "no install message received";
    }

    // Shift to S_GATHER, then re-evaluate consensus and representative
    // status, which were computed before the nodes were marked inactive
    shift_to(S_GATHER, true);
    is_cons = consensus_.is_consensus();
    is_repr = is_representative(uuid());
    evs_log_info(I_STATE) << "after inspection:";
    evs_log_info(I_STATE) << "consensus: " << is_cons;
    evs_log_info(I_STATE) << "repr     : " << is_repr;
    if (is_cons == true && is_repr == true)
    {
        send_install(EVS_CALLER);
    }
    // Counted after handling, so the comparisons at the top see the number
    // of previous expirations
    install_timeout_count_++;
}
767 
handle_stats_timer()768 void gcomm::evs::Proto::handle_stats_timer()
769 {
770     reset_stats();
771 }
772 
773 
774 
775 class TimerSelectOp
776 {
777 public:
TimerSelectOp(const gcomm::evs::Proto::Timer t_)778     TimerSelectOp(const gcomm::evs::Proto::Timer t_) : t(t_) { }
operator ()(const gcomm::evs::Proto::TimerList::value_type & vt) const779     bool operator()(const gcomm::evs::Proto::TimerList::value_type& vt) const
780     {
781         return (gcomm::evs::Proto::TimerList::value(vt) == t);
782     }
783 private:
784     gcomm::evs::Proto::Timer const t;
785 };
786 
787 
next_expiration(const Timer t) const788 gu::datetime::Date gcomm::evs::Proto::next_expiration(const Timer t) const
789 {
790     gcomm_assert(state() != S_CLOSED);
791     gu::datetime::Date now(gu::datetime::Date::monotonic());
792     switch (t)
793     {
794     case T_INACTIVITY:
795         return (now + inactive_check_period_);
796     case T_RETRANS:
797         switch (state())
798         {
799         case S_OPERATIONAL:
800         case S_LEAVING:
801             return (now + retrans_period_);
802         case S_GATHER:
803         case S_INSTALL:
804             return (now + join_retrans_period_);
805         default:
806             gu_throw_fatal;
807         }
808     case T_INSTALL:
809 
810         switch (state())
811         {
812         case S_GATHER:
813         case S_INSTALL:
814             return (now + install_timeout_);
815         default:
816             return gu::datetime::Date::max();
817         }
818     case T_STATS:
819         return (now + stats_report_period_);
820     }
821     gu_throw_fatal;
822 }
823 
824 
timer_list_erase_by_type(gcomm::evs::Proto::TimerList & timer_list,gcomm::evs::Proto::Timer timer)825 void timer_list_erase_by_type(gcomm::evs::Proto::TimerList& timer_list,
826                               gcomm::evs::Proto::Timer timer)
827 {
828     gcomm::evs::Proto::TimerList::iterator i, i_next;
829     for (i = timer_list.begin(); i != timer_list.end(); i = i_next)
830     {
831         i_next = i, ++i_next;
832         if (gcomm::evs::Proto::TimerList::value(i) == timer)
833         {
834             timer_list.erase(i);
835         }
836     }
837 }
838 
reset_timer(Timer t)839 void gcomm::evs::Proto::reset_timer(Timer t)
840 {
841     timer_list_erase_by_type(timers_, t);
842     timers_.insert(std::make_pair(next_expiration(t), t));
843 }
844 
cancel_timer(Timer t)845 void gcomm::evs::Proto::cancel_timer(Timer t)
846 {
847     timer_list_erase_by_type(timers_, t);
848 }
849 
handle_timers()850 gu::datetime::Date gcomm::evs::Proto::handle_timers()
851 {
852     gu::datetime::Date now(gu::datetime::Date::monotonic());
853 
854     while (timers_.empty() == false &&
855            TimerList::key(timers_.begin()) <= now)
856     {
857         Timer t(TimerList::value(timers_.begin()));
858         timers_.erase(timers_.begin());
859         switch (t)
860         {
861         case T_INACTIVITY:
862             handle_inactivity_timer();
863             break;
864         case T_RETRANS:
865             handle_retrans_timer();
866             break;
867         case T_INSTALL:
868             handle_install_timer();
869             break;
870         case T_STATS:
871             handle_stats_timer();
872             break;
873         }
874         if (state() == S_CLOSED)
875         {
876             return gu::datetime::Date::max();
877         }
878         reset_timer(t);
879     }
880 
881     if (timers_.empty() == true)
882     {
883         evs_log_debug(D_TIMERS) << "no timers set";
884         return gu::datetime::Date::max();
885     }
886     return TimerList::key(timers_.begin());
887 }
888 
889 
// Inspect all known nodes for inactivity, suspicion and delay.
//
// If more than three inactive_check_period_s have passed since the previous
// check, the check is skipped and only last_inactive_check_ is refreshed.
// Otherwise the following steps are taken:
//  - own timestamp is refreshed and InspectNode is applied to all known nodes
//  - nodes reporting is_inactive(), and suspected operational nodes without
//    join message, are set inactive
//  - if auto_evict_ is set, delayed_list_ entries are created/updated for
//    nodes which have not been seen within retrans_period_ + delay_margin_,
//    and retransmission is requested from delayed nodes having valid index
//  - delayed_list_ and expired delayed list messages are cleaned up, and
//    the delayed list is sent if required
//  - if all other known nodes were counted as suspected (and more than two
//    nodes are known), all other nodes are set inactive
//  - state is shifted to S_GATHER (from S_OPERATIONAL) or to S_CLOSED
//    (from S_LEAVING with one operational node) if inactive or suspected
//    nodes were found
//  - isolation is ended if isolation period has passed
void gcomm::evs::Proto::check_inactive()
{
    const gu::datetime::Date now(gu::datetime::Date::monotonic());
    // Too long time since the last check, skip this round and only
    // record the check time.
    if (last_inactive_check_ + inactive_check_period_*3 < now)
    {
        log_warn << "last inactive check more than " << inactive_check_period_*3
                 << " ago (" << (now - last_inactive_check_)
                 << "), skipping check";
        last_inactive_check_ = now;
        return;
    }

    // Refresh own timestamp before inspecting the nodes
    NodeMap::value(self_i_).set_tstamp(gu::datetime::Date::monotonic());
    std::for_each(known_.begin(), known_.end(), InspectNode());

    // Set if at least one other node was found inactive or suspected
    bool has_inactive(false);
    // Number of other nodes which are suspected but still operational
    size_t n_suspected(0);
    // Set if delayed list should be sent after this check
    bool do_send_delayed_list(false);

    // Iterate over known nodes and check inactive/suspected/delayed status
    for (NodeMap::iterator i(known_.begin()); i != known_.end(); ++i)
    {
        if (i == self_i_) continue; // No need to check self

        const UUID& node_uuid(NodeMap::key(i));
        Node& node(NodeMap::value(i));
        if (node_uuid                  != uuid()    &&
            (node.is_inactive()     == true      ||
             node.is_suspected()    == true           ))
        {
            if (node.operational() == true && node.is_inactive() == true)
            {
                log_info << self_string() << " detected inactive node: " << node_uuid;
            }
            else if (node.is_suspected() == true && node.is_inactive() == false)
            {
                log_info << self_string() << " suspecting node: " << node_uuid;
            }
            if (node.is_inactive() == true)
            {
                set_inactive(node_uuid);
            }
            // Note: set_inactive() above marks the node non-operational,
            // so only nodes which are still operational are counted here.
            if (node.is_suspected() == true && node.operational() == true)
            {
                ++n_suspected;
                if (node.join_message() == 0)
                {
                    log_info << self_string()
                             << " suspected node without join message, declaring inactive";
                    set_inactive(node_uuid);
                }
            }
            has_inactive = true;
        }

        // Delayed list bookkeeping: node is considered delayed if auto
        // eviction is enabled and it has not been seen within
        // retrans_period_ + delay_margin_.
        DelayedList::iterator dli(delayed_list_.find(node_uuid));
        if (auto_evict_ &&
            node.seen_tstamp() + retrans_period_ + delay_margin_ <= now)
        {
            if (node.index() != Node::invalid_index)
            {
                // Delayed node in group, check input map state and request
                // message recovery if necessary
                Range range(input_map_->range(node.index()));
                log_info << "delayed node: "
                         << node_uuid << ", requesting range "
                         << Range(range.lu(), last_sent_);
                if (last_sent_ >= range.lu())
                {
                    // Request missing message range from delayed node.
                    request_retrans(node_uuid, node_uuid,
                                    Range(range.lu(), last_sent_));
                }
            }

            if (dli == delayed_list_.end())
            {
                // First time the node is seen delayed, create an entry
                delayed_list_.insert(
                    std::make_pair(node_uuid,
                                   DelayedEntry(get_address(node_uuid))));
            }
            else
            {
                // Node already in delayed list, refresh the entry
                dli->second.set_tstamp(now);
                dli->second.set_state(DelayedEntry::S_DELAYED,
                                      delayed_keep_period_, now);
                evs_log_debug(D_STATE) << "set '" << dli->first <<
                        "' delayed state to S_DELAYED , cnt = " <<
                        dli->second.state_change_cnt();
                // todo(dirlt): make threshold as a configurable variable ?
                if (dli->second.state_change_cnt() > 0)
                {
                    do_send_delayed_list = true;
                }
            }
        }
        else if (dli != delayed_list_.end())
        {
            // Node is not delayed now but has an entry in delayed list:
            // move the entry to S_OK state and refresh the entry timestamp
            // only if the state change count was changed by set_state().
            const size_t prev_cnt(dli->second.state_change_cnt());
            dli->second.set_state(DelayedEntry::S_OK,
                                  delayed_keep_period_, now);
            if (prev_cnt != dli->second.state_change_cnt())
            {
                dli->second.set_tstamp(now);
            }
            evs_log_debug(D_STATE) << "set '" << dli->first <<
                    "' delayed state to S_OK. prev_cnt = " << prev_cnt <<
                    ", cur_cnt = " << dli->second.state_change_cnt();
            if (dli->second.state_change_cnt() > 0)
            {
                do_send_delayed_list = true;
            }
        }
    }

    // Clean up delayed list and evict list messages
    {
        DelayedList::iterator i, i_next;
        for (i = delayed_list_.begin(); i != delayed_list_.end(); i = i_next)
        {
            // Store the next position before a possible erase of i
            i_next = i, ++i_next;
            // State change count has decayed back to zero
            // or node is already evicted and not in the current view
            // anymore.
            if ((i->second.state_change_cnt() == 0 &&
                 i->second.state() == DelayedEntry::S_OK) ||
                (is_evicted(i->first) == true &&
                 current_view_.is_member(i->first) == false))
            {
                log_debug << "remove '" << i->first << "' from delayed_list";
                delayed_list_.erase(i);
            }
        }
        // Discard delayed list messages older than delayed_keep_period_
        for (NodeMap::iterator i(known_.begin()); i != known_.end(); ++i)
        {
            Node& node(NodeMap::value(i));
            const DelayedListMessage* const elm(node.delayed_list_message());
            if (elm != 0 && elm->tstamp() + delayed_keep_period_ < now)
            {
                log_debug << "discarding expired elm from " << elm->source();
                node.set_delayed_list_message(0);
            }
        }
    }

    // Delayed list is sent only if the current view version is greater
    // than zero and auto eviction is enabled.
    if (current_view_.version() > 0 &&
        do_send_delayed_list == true && auto_evict_ > 0)
    {
        send_delayed_list();
    }

    // All other nodes are under suspicion, set all others as inactive.
    // This will speed up recovery when this node has been isolated from
    // other group. Note that this should be done only if known size is
    // greater than 2 in order to avoid immediate split brain.
    if (known_.size() > 2 && n_suspected + 1 == known_.size())
    {
        for (NodeMap::iterator i = known_.begin(); i != known_.end(); ++i)
        {
            if (NodeMap::key(i) != uuid())
            {
                evs_log_info(I_STATE)
                    << " setting source " << NodeMap::key(i)
                    << " inactive (other nodes under suspicion)";
                set_inactive(NodeMap::key(i));
            }
        }
    }

    // Inactive or suspected nodes were found: start forming a new view if
    // operational, or close if leaving and only one operational node is
    // left.
    if (has_inactive == true && state() == S_OPERATIONAL)
    {
        gu_trace(shift_to(S_GATHER, true));
    }
    else if (has_inactive    == true &&
             state()     == S_LEAVING &&
             n_operational() == 1)
    {
        gu_trace(shift_to(S_CLOSED));
    }

    last_inactive_check_ = now;


    // Check if isolation period has ended
    if (isolation_end_ != gu::datetime::Date::zero() &&
        isolation_end_ <= now)
    {
        log_info << "ending isolation";
        isolation_end_ = gu::datetime::Date::zero();
    }
}
1081 
1082 
set_inactive(const UUID & node_uuid)1083 void gcomm::evs::Proto::set_inactive(const UUID& node_uuid)
1084 {
1085     NodeMap::iterator i;
1086     gcomm_assert(node_uuid != uuid());
1087     gu_trace(i = known_.find_checked(node_uuid));
1088     evs_log_debug(D_STATE) << "setting " << node_uuid << " inactive";
1089     Node& node(NodeMap::value(i));
1090     node.set_tstamp(gu::datetime::Date::zero());
1091     node.set_join_message(0);
1092     // node.set_leave_message(0);
1093     node.set_operational(false);
1094 }
1095 
1096 
is_inactive(const UUID & uuid) const1097 bool gcomm::evs::Proto::is_inactive(const UUID& uuid) const
1098 {
1099     NodeMap::const_iterator i;
1100     gu_trace(i = known_.find_checked(uuid));
1101     const Node& node(NodeMap::value(i));
1102     return (node.operational() == false);
1103 }
1104 
cleanup_foreign(const InstallMessage & im)1105 void gcomm::evs::Proto::cleanup_foreign(const InstallMessage& im)
1106 {
1107     NodeMap::iterator i, i_next;
1108     for (i = known_.begin(); i != known_.end(); i = i_next)
1109     {
1110         const UUID& uuid(NodeMap::key(i));
1111         i_next = i, ++i_next;
1112         const MessageNodeList::const_iterator mni(im.node_list().find(uuid));
1113         if (mni == im.node_list().end() ||
1114             MessageNodeList::value(mni).operational() == false)
1115         {
1116             known_.erase(i);
1117         }
1118     }
1119 }
1120 
cleanup_views()1121 void gcomm::evs::Proto::cleanup_views()
1122 {
1123     gu::datetime::Date now(gu::datetime::Date::monotonic());
1124 
1125     ViewList::iterator i, i_next;
1126     for (i = previous_views_.begin(); i != previous_views_.end(); i = i_next)
1127     {
1128         i_next = i, ++i_next;
1129         if (i->second + view_forget_timeout_ <= now)
1130         {
1131             evs_log_debug(D_STATE) << " erasing view: " << i->first;
1132             previous_views_.erase(i);
1133         }
1134     }
1135 }
1136 
cleanup_evicted()1137 void gcomm::evs::Proto::cleanup_evicted()
1138 {
1139     gu::datetime::Date now(gu::datetime::Date::monotonic());
1140     Protolay::EvictList::const_iterator i, i_next;
1141     for (i = evict_list().begin(); i != evict_list().end(); i = i_next)
1142     {
1143         i_next = i, ++i_next;
1144         if (Protolay::EvictList::value(i) + view_forget_timeout_ <= now)
1145         {
1146             log_info << "unevicting " << Protolay::EvictList::key(i);
1147             unevict(Protolay::EvictList::key(i));
1148         }
1149     }
1150 }
1151 
n_operational() const1152 size_t gcomm::evs::Proto::n_operational() const
1153 {
1154     NodeMap::const_iterator i;
1155     size_t ret = 0;
1156     for (i = known_.begin(); i != known_.end(); ++i) {
1157         if (i->second.operational() == true)
1158             ret++;
1159     }
1160     return ret;
1161 }
1162 
deliver_reg_view(const InstallMessage & im,const View & prev_view)1163 void gcomm::evs::Proto::deliver_reg_view(const InstallMessage& im,
1164                                          const View& prev_view)
1165 {
1166 
1167     View view(im.version(), im.install_view_id());
1168     for (MessageNodeList::const_iterator i(im.node_list().begin());
1169          i != im.node_list().end(); ++i)
1170     {
1171         const UUID& uuid(MessageNodeList::key(i));
1172         const MessageNode& mn(MessageNodeList::value(i));
1173 
1174         // 1) Operational nodes will be members of new view
1175         // 2) Operational nodes that were not present in previous
1176         //    view are going also to joined set
1177         // 3) Leaving nodes go to left set
1178         // 4) All other nodes present in previous view but not in
1179         //    member of left set are considered partitioned
1180         if (mn.operational() == true)
1181         {
1182             view.add_member(uuid, mn.segment());
1183             if (prev_view.is_member(uuid) == false)
1184             {
1185                 view.add_joined(uuid, mn.segment());
1186             }
1187         }
1188         else if (mn.leaving() == true)
1189         {
1190             view.add_left(uuid, mn.segment());
1191         }
1192         else
1193         {
1194             // Partitioned set is constructed after this loop
1195         }
1196 
1197         // If node has been evicted, it should have been added to
1198         // evicted list via JOIN messages.
1199         assert(mn.evicted() == false || is_evicted(uuid) == true);
1200     }
1201 
1202     // Loop over previous view and add each node not in new view
1203     // member of left set as partitioned.
1204     for (NodeList::const_iterator i(prev_view.members().begin());
1205          i != prev_view.members().end(); ++i)
1206     {
1207         const UUID& uuid(NodeList::key(i));
1208         const gcomm::Node& mn(NodeList::value(i));
1209         if (view.is_member(uuid)  == false &&
1210             view.is_leaving(uuid) == false)
1211         {
1212             view.add_partitioned(uuid, mn.segment());
1213         }
1214     }
1215 
1216     evs_log_info(I_VIEWS) << "delivering view " << view;
1217 
1218     // This node must be a member of the view it delivers and
1219     // view id UUID must be of one of the members.
1220     gcomm_assert(view.is_member(uuid()) == true);
1221     gcomm_assert(view.is_member(view.id().uuid()) == true)
1222         << "view id UUID " << view.id().uuid()
1223         << " not found from reg view members "
1224         << view.members()
1225         << " must abort to avoid possibility of two groups "
1226         << "with the same view id";
1227 
1228     set_stable_view(view);
1229     ProtoUpMeta up_meta(UUID::nil(), ViewId(), &view);
1230     send_up(Datagram(), up_meta);
1231 }
1232 
deliver_trans_view(const InstallMessage & im,const View & curr_view)1233 void gcomm::evs::Proto::deliver_trans_view(const InstallMessage& im,
1234                                            const View& curr_view)
1235 {
1236 
1237     // Trans view is intersection of members in curr_view
1238     // and members going to be in the next view that come from
1239     // curr_view according to install message
1240 
1241     View view(current_view_.version(),
1242               ViewId(V_TRANS,
1243                      curr_view.id().uuid(),
1244                      curr_view.id().seq()));
1245 
1246     for (MessageNodeList::const_iterator i(im.node_list().begin());
1247          i != im.node_list().end(); ++i)
1248     {
1249         const UUID& uuid(MessageNodeList::key(i));
1250         const MessageNode& mn(MessageNodeList::value(i));
1251 
1252         if (curr_view.id()            == mn.view_id() &&
1253             curr_view.is_member(uuid) == true)
1254         {
1255             // 1) Operational nodes go to next view
1256             // 2) Leaving nodes go to left set
1257             // 3) All other nodes present in previous view but not in
1258             //    member of left set are considered partitioned
1259             if (mn.operational() == true)
1260             {
1261                 view.add_member(uuid, mn.segment());
1262             }
1263             else if (mn.leaving() == true)
1264             {
1265                 view.add_left(uuid, mn.segment());
1266             }
1267             else
1268             {
1269                 // Partitioned set is constructed after this loop
1270             }
1271         }
1272     }
1273 
1274     // Loop over current view and add each node not in new view
1275     // member of left set as partitioned.
1276     for (NodeList::const_iterator i(curr_view.members().begin());
1277          i != curr_view.members().end(); ++i)
1278     {
1279         const UUID& uuid(NodeList::key(i));
1280         const gcomm::Node& mn(NodeList::value(i));
1281 
1282         if (view.is_member(uuid)  == false &&
1283             view.is_leaving(uuid) == false)
1284         {
1285             view.add_partitioned(uuid, mn.segment());
1286         }
1287     }
1288 
1289     // This node must be a member of the view it delivers and
1290     // if the view is the last transitional, view must have
1291     // exactly one member and no-one in left set.
1292     gcomm_assert(view.is_member(uuid()) == true);
1293 
1294     evs_log_info(I_VIEWS) << " delivering view " << view;
1295 
1296     ProtoUpMeta up_meta(UUID::nil(), ViewId(), &view);
1297     gu_trace(send_up(Datagram(), up_meta));
1298 }
1299 
1300 
deliver_empty_view()1301 void gcomm::evs::Proto::deliver_empty_view()
1302 {
1303     View view(0, V_REG);
1304 
1305     evs_log_info(I_VIEWS) << "delivering view " << view;
1306 
1307     ProtoUpMeta up_meta(UUID::nil(), ViewId(), &view);
1308     send_up(Datagram(), up_meta);
1309 }
1310 
1311 
setall_committed(bool val)1312 void gcomm::evs::Proto::setall_committed(bool val)
1313 {
1314     for (NodeMap::iterator i = known_.begin(); i != known_.end(); ++i)
1315     {
1316         NodeMap::value(i).set_committed(val);
1317     }
1318 }
1319 
1320 // Check if commit gaps from all known nodes found from install message have
1321 // been seen.
is_all_committed() const1322 bool gcomm::evs::Proto::is_all_committed() const
1323 {
1324     gcomm_assert(install_message_ != 0);
1325     for (NodeMap::const_iterator i(known_.begin()); i != known_.end(); ++i)
1326     {
1327         const UUID& uuid(NodeMap::key(i));
1328         const Node& inst(NodeMap::value(i));
1329         if (install_message_->node_list().find(uuid) !=
1330             install_message_->node_list().end()               &&
1331             inst.operational()                        == true &&
1332             inst.committed()                          == false)
1333         {
1334             return false;
1335         }
1336     }
1337     return true;
1338 }
1339 
setall_installed(bool val)1340 void gcomm::evs::Proto::setall_installed(bool val)
1341 {
1342     for (NodeMap::iterator i = known_.begin(); i != known_.end(); ++i)
1343     {
1344         NodeMap::value(i).set_installed(val);
1345     }
1346 }
1347 
1348 // Check if gaps from new view from all known nodes found from install
1349 // message have been seen.
is_all_installed() const1350 bool gcomm::evs::Proto::is_all_installed() const
1351 {
1352     gcomm_assert(install_message_ != 0);
1353     for (NodeMap::const_iterator i = known_.begin(); i != known_.end(); ++i)
1354     {
1355         const UUID& uuid(NodeMap::key(i));
1356         const Node& inst(NodeMap::value(i));
1357         if (install_message_->node_list().find(uuid) !=
1358             install_message_->node_list().end()              &&
1359             inst.operational()                       == true &&
1360             inst.installed()                         == false)
1361         {
1362             return false;
1363         }
1364     }
1365     return true;
1366 }
1367 
cleanup_joins()1368 void gcomm::evs::Proto::cleanup_joins()
1369 {
1370     for (NodeMap::iterator i = known_.begin(); i != known_.end(); ++i)
1371     {
1372         NodeMap::value(i).set_join_message(0);
1373     }
1374 }
1375 
is_representative(const UUID & uuid) const1376 bool gcomm::evs::Proto::is_representative(const UUID& uuid) const
1377 {
1378     for (NodeMap::const_iterator i = known_.begin(); i != known_.end(); ++i)
1379     {
1380         if (NodeMap::value(i).operational() == true &&
1381             NodeMap::value(i).is_inactive() == false)
1382         {
1383             assert(NodeMap::value(i).leave_message() == 0);
1384             if (NodeMap::value(i).leave_message() != 0)
1385             {
1386                 log_warn << "operational node " << NodeMap::key(i)
1387                          << " with leave message: " << NodeMap::value(i);
1388                 continue;
1389             }
1390             return (uuid == NodeMap::key(i));
1391         }
1392     }
1393 
1394     return false;
1395 }
1396 
is_all_suspected(const UUID & uuid) const1397 bool gcomm::evs::Proto::is_all_suspected(const UUID& uuid) const
1398 {
1399     for (NodeMap::const_iterator i = known_.begin(); i != known_.end(); ++i)
1400     {
1401         const Node& node(NodeMap::value(i));
1402         if (node.operational() == true) {
1403             const JoinMessage* jm(node.join_message());
1404             if (!jm) return false;
1405             const MessageNodeList::const_iterator j(jm->node_list().find(uuid));
1406             if (!(j != jm->node_list().end() &&
1407                   MessageNodeList::value(j).suspected()))
1408                 return false;
1409         }
1410     }
1411     return true;
1412 }
1413 
1414 
1415 
1416 /////////////////////////////////////////////////////////////////////////////
1417 // Message sending
1418 /////////////////////////////////////////////////////////////////////////////
1419 
1420 
1421 
1422 
is_flow_control(const seqno_t seq,const seqno_t win) const1423 bool gcomm::evs::Proto::is_flow_control(const seqno_t seq, const seqno_t win) const
1424 {
1425     gcomm_assert(seq != -1 && win != -1);
1426 
1427     const seqno_t base(input_map_->safe_seq());
1428     if (seq > base + win)
1429     {
1430         return true;
1431     }
1432     return false;
1433 }
1434 
request_user_msg_feedback(const gcomm::Datagram & dg) const1435 bool gcomm::evs::Proto::request_user_msg_feedback(const gcomm::Datagram& dg)
1436     const
1437 {
1438     // Request feedback from peers at least once per 128kB chunk. This will
1439     // force the nodes to complete their seqnos.
1440     if (bytes_since_request_user_msg_feedback_ + dg.len() >= (size_t(1) << 17))
1441     {
1442         evs_log_debug(D_USER_MSGS) << "bytes since request user msg feedback: "
1443                                    << bytes_since_request_user_msg_feedback_
1444                                    << " dg len: " << dg.len();
1445         return true;
1446     }
1447     return false;
1448 }
1449 
// Send a user message carrying datagram dg with seqno last_sent_ + 1.
//
// Parameters:
//   dg           - datagram to send, offset must be zero
//   user_type    - user type stored in the message
//   order        - order stored in the message
//   win          - send window for the flow control check, -1 to skip
//                  flow control
//   up_to_seqno  - if not -1, the message seqno range is extended towards
//                  this seqno (at most 0xff seqnos per message); must not
//                  be less than last_sent_ and requires win to be -1
//   n_aggregated - number of output queue messages carried by dg,
//                  F_AGGREGATE flag is set if greater than one
//
// Returns EAGAIN without side effects if flow control prevents sending,
// zero otherwise. Note that failure of send_down() is only logged and
// zero is returned also in that case.
//
// The message is inserted into the input map before it is sent, and unless
// delivery is already in progress, deliver() and deliver_local() are
// called after sending.
int gcomm::evs::Proto::send_user(Datagram& dg,
                                 uint8_t const user_type,
                                 Order  const order,
                                 seqno_t const win,
                                 seqno_t const up_to_seqno,
                                 size_t const n_aggregated)
{
    assert(state() == S_LEAVING ||
           state() == S_GATHER ||
           state() == S_OPERATIONAL);
    assert(dg.offset() == 0);
    assert(n_aggregated == 1 || output_.size() >= n_aggregated);

    gcomm_assert(up_to_seqno == -1 || up_to_seqno >= last_sent_);
    gcomm_assert(up_to_seqno == -1 || win == -1);

    int ret;
    const seqno_t seq(last_sent_ + 1);

    // Refuse to send if the seqno would fall outside of the send window
    if (win                       != -1   &&
        is_flow_control(seq, win) == true)
    {
        return EAGAIN;
    }

    // seq_range max 0xff because of Message seq_range_ field limitation
    seqno_t seq_range(
        std::min(up_to_seqno == -1 ? 0 : up_to_seqno - seq,
                 evs::seqno_t(0xff)));
    seqno_t last_msg_seq(seq + seq_range);
    uint8_t flags;

    // If output queue won't contain messages after this batch,
    // up_to_seqno is given (msg completion) or flow control would kick in
    // at next batch, don't set F_MSG_MORE. Also if the number of bytes
    // in send pipeline exceeds predefined value as reported by
    // request_user_msg_feedback(), the F_MSG_MORE will not get set.
    if (output_.size() <= n_aggregated ||
        up_to_seqno != -1 ||
        (win != -1 && (is_flow_control(last_msg_seq + 1, win) ||
                       request_user_msg_feedback(dg))))
    {
        flags = 0;
        bytes_since_request_user_msg_feedback_ = 0;
    }
    else
    {
        flags = Message::F_MSG_MORE;
        bytes_since_request_user_msg_feedback_ += dg.len();
    }
    if (n_aggregated > 1)
    {
        flags |= Message::F_AGGREGATE;
    }

    // Maximize seq range in the case next message batch won't be sent
    // immediately.
    if ((flags & Message::F_MSG_MORE) == 0 && up_to_seqno == -1)
    {
        // Extend the range up to input map max_hs(), clamped to [0, 0xff]
        seq_range = input_map_->max_hs() - seq;
        seq_range = std::max(static_cast<seqno_t>(0), seq_range);
        seq_range = std::min(static_cast<seqno_t>(0xff), seq_range);
        if (seq_range != 0)
        {
            log_debug << "adjusted seq range to: " << seq_range;
            last_msg_seq = seq + seq_range;
        }
    }

    gcomm_assert(last_msg_seq >= seq && last_msg_seq - seq <= 0xff);
    gcomm_assert(seq_range >= 0 && seq_range <= 0xff);

    UserMessage msg(version_,
                    uuid(),
                    current_view_.id(),
                    seq,
                    input_map_->aru_seq(),
                    seq_range,
                    order,
                    ++fifo_seq_,
                    user_type,
                    flags);

    // Insert first to input map to determine correct aru seq
    Range range;
    gu_trace(range = input_map_->insert(NodeMap::value(self_i_).index(),
                                       msg, dg));

    gcomm_assert(range.hs() == last_msg_seq)
        << msg << " " << *input_map_ << " " << *this;

    last_sent_ = last_msg_seq;
    assert(range.hs() == last_sent_);

    update_im_safe_seq(NodeMap::value(self_i_).index(),
                       input_map_->aru_seq());

    // Refresh aru seq in the message after the input map insert above
    msg.set_aru_seq(input_map_->aru_seq());
    evs_log_debug(D_USER_MSGS) << " sending " << msg;
    // Message header is pushed into dg only for the duration of send_down()
    gu_trace(push_header(msg, dg));
    if ((ret = send_down(dg, ProtoDownMeta())) != 0)
    {
        log_debug << "send failed: "  << strerror(ret);
    }
    gu_trace(pop_header(msg, dg));
    sent_msgs_[Message::EVS_T_USER]++;

    // Skip delivery if it is already in progress
    if (delivering_ == false)
    {
        gu_trace(deliver());
        gu_trace(deliver_local());
    }

    return 0;
}
1565 
aggregate_len() const1566 size_t gcomm::evs::Proto::aggregate_len() const
1567 {
1568     bool is_aggregate(false);
1569     size_t ret(0);
1570     AggregateMessage am;
1571     out_queue::const_iterator i(output_.begin());
1572     const Order ord(i->second.order());
1573     ret += i->first.len() + am.serial_size();
1574     for (++i; i != output_.end() && i->second.order() == ord; ++i)
1575     {
1576         if (ret + i->first.len() + am.serial_size() <= mtu())
1577         {
1578             ret += i->first.len() + am.serial_size();
1579             is_aggregate = true;
1580         }
1581         else
1582         {
1583             break;
1584         }
1585     }
1586     evs_log_debug(D_USER_MSGS) << "is aggregate " << is_aggregate << " ret " << ret;
1587     return (is_aggregate == true ? ret : 0);
1588 }
1589 
send_user(const seqno_t win)1590 int gcomm::evs::Proto::send_user(const seqno_t win)
1591 {
1592     gcomm_assert(output_.empty() == false);
1593     gcomm_assert(state() == S_OPERATIONAL);
1594     gcomm_assert(win <= send_window_);
1595     int ret;
1596     size_t alen;
1597     if (use_aggregate_ == true && (alen = aggregate_len()) > 0)
1598     {
1599         // Messages can be aggregated into single message
1600         send_buf_.resize(alen);
1601         size_t offset(0);
1602         size_t n(0);
1603 
1604         out_queue::const_iterator i(output_.begin());
1605         Order ord(i->second.order());
1606         while ((alen > 0 && i != output_.end()))
1607         {
1608             const Datagram& dg(i->first);
1609             const ProtoDownMeta dm(i->second);
1610             AggregateMessage am(0, dg.len(), dm.user_type());
1611             gcomm_assert(alen >= dg.len() + am.serial_size());
1612 
1613             gu_trace(offset = am.serialize(&send_buf_[0],
1614                                            send_buf_.size(), offset));
1615             std::copy(dg.header() + dg.header_offset(),
1616                       dg.header() + dg.header_size(),
1617                       &send_buf_[0] + offset);
1618             offset += (dg.header_len());
1619             std::copy(dg.payload().begin(), dg.payload().end(),
1620                       &send_buf_[0] + offset);
1621             offset += dg.payload().size();
1622             alen -= dg.len() + am.serial_size();
1623             ++n;
1624             ++i;
1625         }
1626         Datagram dg(gu::SharedBuffer(new gu::Buffer(send_buf_.begin(),
1627                                                         send_buf_.end())));
1628         if ((ret = send_user(dg, 0xff, ord, win, -1, n)) == 0)
1629         {
1630             while (n-- > 0)
1631             {
1632                 output_.pop_front();
1633             }
1634         }
1635     }
1636     else
1637     {
1638         std::pair<Datagram, ProtoDownMeta> wb(output_.front());
1639         if ((ret = send_user(wb.first,
1640                              wb.second.user_type(),
1641                              wb.second.order(),
1642                              win,
1643                              -1)) == 0)
1644         {
1645             output_.pop_front();
1646         }
1647     }
1648     return ret;
1649 }
1650 
1651 
complete_user(const seqno_t high_seq)1652 void gcomm::evs::Proto::complete_user(const seqno_t high_seq)
1653 {
1654     gcomm_assert(state() == S_OPERATIONAL || state() == S_GATHER);
1655 
1656     evs_log_debug(D_USER_MSGS) << "completing seqno to " << high_seq;;
1657 
1658     Datagram wb;
1659     int err;
1660     err = send_user(wb, 0xff, O_DROP, -1, high_seq);
1661     if (err != 0)
1662     {
1663         log_debug << "failed to send completing msg " << strerror(err)
1664                   << " seq=" << high_seq << " send_window=" << send_window_
1665                   << " last_sent=" << last_sent_;
1666     }
1667 
1668 }
1669 
send_delegate(Datagram & wb,const UUID & target)1670 int gcomm::evs::Proto::send_delegate(Datagram& wb, const UUID& target)
1671 {
1672     DelegateMessage dm(version_, uuid(), current_view_.id(),
1673                        ++fifo_seq_);
1674     push_header(dm, wb);
1675     int ret = send_down(wb, ProtoDownMeta(target));
1676     pop_header(dm, wb);
1677     sent_msgs_[Message::EVS_T_DELEGATE]++;
1678     return ret;
1679 }
1680 
gap_rate_limit(const UUID & target,const Range & range) const1681 bool gcomm::evs::Proto::gap_rate_limit(const UUID& target, const Range& range)
1682     const
1683 {
1684     NodeMap::const_iterator target_i(known_.find(target));
1685     // Sanity check: The target should always be in the set
1686     // of known nodes. If it is not, skip sending the gap message
1687     // in production.
1688     assert(target_i != known_.end());
1689     if (target_i == known_.end())
1690     {
1691         return true;
1692     }
1693     const Node& target_node(target_i->second);
1694     // Limit requesting ranges with the same highest seen within
1695     // 50msec period.
1696     gu::datetime::Date now(gu::datetime::Date::monotonic());
1697     if (now < target_node.last_requested_range_tstamp() + gu::datetime::MSec*100)
1698     {
1699         evs_log_debug(D_GAP_MSGS) << "Rate limiting gap: now " << now
1700                                   << " requested range tstamp: "
1701                                   << target_node.last_requested_range_tstamp()
1702                                   << " requested range: "
1703                                   << target_node.last_requested_range();
1704         return true;
1705     }
1706     return false;
1707 }
1708 
send_gap(EVS_CALLER_ARG,const UUID & range_uuid,const ViewId & source_view_id,const Range range,const bool commit)1709 void gcomm::evs::Proto::send_gap(EVS_CALLER_ARG,
1710                                  const UUID&   range_uuid,
1711                                  const ViewId& source_view_id,
1712                                  const Range   range,
1713                                  const bool    commit)
1714 {
1715     assert(range_uuid == UUID::nil());
1716     assert(range.is_empty());
1717     gcomm_assert((commit == false && source_view_id == current_view_.id())
1718                  || install_message_ != 0);
1719     uint8_t flags(0);
1720     if (commit == true) flags |= Message::F_COMMIT;
1721 
1722     GapMessage gm(version_,
1723                   uuid(),
1724                   source_view_id,
1725                   (source_view_id == current_view_.id() ? last_sent_ :
1726                    (commit == true ? install_message_->fifo_seq() : -1)),
1727                   (source_view_id == current_view_.id() ?
1728                    input_map_->aru_seq() : -1),
1729                   ++fifo_seq_,
1730                   range_uuid,
1731                   range,
1732                   flags);
1733 
1734     evs_log_debug(D_GAP_MSGS) << EVS_LOG_METHOD << gm;
1735     gu::Buffer buf;
1736     serialize(gm, buf);
1737     Datagram dg(buf);
1738     int err = send_down(dg, ProtoDownMeta(range_uuid));
1739     if (err != 0)
1740     {
1741         log_debug << "send failed: " << strerror(err);
1742     }
1743     sent_msgs_[Message::EVS_T_GAP]++;
1744     gu_trace(handle_gap(gm, self_i_));
1745 }
1746 
1747 
populate_node_list(MessageNodeList * node_list) const1748 void gcomm::evs::Proto::populate_node_list(MessageNodeList* node_list) const
1749 {
1750     for (NodeMap::const_iterator i = known_.begin(); i != known_.end(); ++i)
1751     {
1752         const UUID& node_uuid(NodeMap::key(i));
1753         const Node& node(NodeMap::value(i));
1754         MessageNode mnode(node.operational(), node.suspected(),
1755                           is_evicted(node_uuid));
1756         if (node_uuid != uuid())
1757         {
1758             const JoinMessage* jm(node.join_message());
1759             const LeaveMessage* lm(node.leave_message());
1760 
1761             //
1762             if (jm != 0)
1763             {
1764                 const ViewId& nsv(jm->source_view_id());
1765                 const MessageNode& mn(MessageNodeList::value(jm->node_list().find_checked(node_uuid)));
1766                 mnode = MessageNode(node.operational(),
1767                                     node.is_suspected(),
1768                                     node.segment(),
1769                                     is_evicted(node_uuid),
1770                                     -1,
1771                                     jm->source_view_id(),
1772                                     (nsv == current_view_.id() ?
1773                                      input_map_->safe_seq(node.index()) :
1774                                      mn.safe_seq()),
1775                                     (nsv == current_view_.id() ?
1776                                      input_map_->range(node.index()) :
1777                                      mn.im_range()));
1778             }
1779             else if (lm != 0)
1780             {
1781                 const ViewId& nsv(lm->source_view_id());
1782                 mnode = MessageNode(node.operational(),
1783                                     node.is_suspected(),
1784                                     node.segment(),
1785                                     is_evicted(node_uuid),
1786                                     lm->seq(),
1787                                     nsv,
1788                                     (nsv == current_view_.id() ?
1789                                      input_map_->safe_seq(node.index()) :
1790                                      -1),
1791                                     (nsv == current_view_.id() ?
1792                                      input_map_->range(node.index()) :
1793                                      Range()));
1794             }
1795             else if (current_view_.is_member(node_uuid) == true)
1796             {
1797                 mnode = MessageNode(node.operational(),
1798                                     node.is_suspected(),
1799                                     node.segment(),
1800                                     is_evicted(node_uuid),
1801                                     -1,
1802                                     current_view_.id(),
1803                                     input_map_->safe_seq(node.index()),
1804                                     input_map_->range(node.index()));
1805             }
1806         }
1807         else
1808         {
1809             mnode = MessageNode(true,
1810                                 false,
1811                                 node.segment(),
1812                                 is_evicted(node_uuid),
1813                                 -1,
1814                                 current_view_.id(),
1815                                 input_map_->safe_seq(node.index()),
1816                                 input_map_->range(node.index()));
1817         }
1818         gu_trace((void)node_list->insert_unique(std::make_pair(node_uuid, mnode)));
1819     }
1820 
1821     // Iterate over evicted_list and add evicted nodes not yet in node list.
1822     for (Protolay::EvictList::const_iterator i(evict_list().begin());
1823          i != evict_list().end(); ++i)
1824     {
1825         if (node_list->find(Protolay::EvictList::key(i)) == node_list->end())
1826         {
1827             // default arguments are evil.
1828             MessageNode mnode(false, false, 0, true);
1829             gu_trace((void)node_list->insert_unique(
1830                          std::make_pair(Protolay::EvictList::key(i), mnode)));
1831         }
1832     }
1833 
1834     evs_log_debug(D_CONSENSUS) << "populate node list:\n" << *node_list;
1835 }
1836 
create_join()1837 const gcomm::evs::JoinMessage& gcomm::evs::Proto::create_join()
1838 {
1839 
1840     MessageNodeList node_list;
1841 
1842     gu_trace(populate_node_list(&node_list));
1843     JoinMessage jm(version_,
1844                    uuid(),
1845                    current_view_.id(),
1846                    input_map_->safe_seq(),
1847                    input_map_->aru_seq(),
1848                    ++fifo_seq_,
1849                    node_list);
1850     NodeMap::value(self_i_).set_join_message(&jm);
1851 
1852     evs_log_debug(D_JOIN_MSGS) << " created join message " << jm;
1853 
1854     return *NodeMap::value(self_i_).join_message();
1855 }
1856 
1857 
set_join(const JoinMessage & jm,const UUID & source)1858 void gcomm::evs::Proto::set_join(const JoinMessage& jm, const UUID& source)
1859 {
1860     NodeMap::iterator i;
1861     gu_trace(i = known_.find_checked(source));
1862     NodeMap::value(i).set_join_message(&jm);;
1863 }
1864 
1865 
set_leave(const LeaveMessage & lm,const UUID & source)1866 void gcomm::evs::Proto::set_leave(const LeaveMessage& lm, const UUID& source)
1867 {
1868     NodeMap::iterator i;
1869     gu_trace(i = known_.find_checked(source));
1870     Node& inst(NodeMap::value(i));
1871 
1872     if (inst.leave_message())
1873     {
1874         evs_log_debug(D_LEAVE_MSGS) << "Duplicate leave:\told: "
1875                                     << *inst.leave_message()
1876                                     << "\tnew: " << lm;
1877     }
1878     else
1879     {
1880         inst.set_leave_message(&lm);
1881     }
1882 }
1883 
1884 
send_join(bool handle)1885 void gcomm::evs::Proto::send_join(bool handle)
1886 {
1887     assert(output_.empty() == true);
1888 
1889     JoinMessage jm(create_join());
1890 
1891     gu::Buffer buf;
1892     serialize(jm, buf);
1893     Datagram dg(buf);
1894     int err = send_down(dg, ProtoDownMeta());
1895 
1896     if (err != 0)
1897     {
1898         log_debug << "send failed: " << strerror(err);
1899     }
1900     else
1901     {
1902         last_sent_join_tstamp_ = gu::datetime::Date::monotonic();
1903     }
1904     sent_msgs_[Message::EVS_T_JOIN]++;
1905     if (handle == true)
1906     {
1907         handle_join(jm, self_i_);
1908     }
1909 }
1910 
1911 
send_leave(bool handle)1912 void gcomm::evs::Proto::send_leave(bool handle)
1913 {
1914     gcomm_assert(state() == S_LEAVING);
1915 
1916     // If no messages have been sent, generate one dummy to
1917     // trigger message acknowledgement mechanism
1918     if (last_sent_ == -1 && output_.empty() == true)
1919     {
1920         Datagram wb;
1921         gu_trace(send_user(wb, 0xff, O_DROP, -1, -1));
1922     }
1923 
1924     /* Move all pending messages from output to input map */
1925     while (output_.empty() == false)
1926     {
1927         std::pair<Datagram, ProtoDownMeta> wb = output_.front();
1928         if (send_user(wb.first,
1929                       wb.second.user_type(),
1930                       wb.second.order(),
1931                       -1, -1) != 0)
1932         {
1933             gu_throw_fatal << "send_user() failed";
1934         }
1935         output_.pop_front();
1936     }
1937 
1938 
1939     LeaveMessage lm(version_,
1940                     uuid(),
1941                     current_view_.id(),
1942                     last_sent_,
1943                     input_map_->aru_seq(),
1944                     ++fifo_seq_);
1945 
1946     evs_log_debug(D_LEAVE_MSGS) << "sending leave msg " << lm;
1947 
1948     gu::Buffer buf;
1949     serialize(lm, buf);
1950     Datagram dg(buf);
1951     int err = send_down(dg, ProtoDownMeta());
1952     if (err != 0)
1953     {
1954         log_debug << "send failed " << strerror(err);
1955     }
1956 
1957     sent_msgs_[Message::EVS_T_LEAVE]++;
1958 
1959     if (handle == true)
1960     {
1961         handle_leave(lm, self_i_);
1962     }
1963 }
1964 
1965 
1966 struct ViewIdCmp
1967 {
operator ()ViewIdCmp1968     bool operator()(const gcomm::evs::NodeMap::value_type& a,
1969                     const gcomm::evs::NodeMap::value_type& b) const
1970     {
1971         using gcomm::evs::NodeMap;
1972         gcomm_assert(NodeMap::value(a).join_message() != 0 &&
1973                      NodeMap::value(b).join_message() != 0);
1974         return (NodeMap::value(a).join_message()->source_view_id().seq() <
1975                 NodeMap::value(b).join_message()->source_view_id().seq());
1976 
1977     }
1978 };
1979 
1980 
1981 struct ProtoVerCmp
1982 {
operator ()ProtoVerCmp1983     bool operator()(const gcomm::evs::NodeMap::value_type& a,
1984                     const gcomm::evs::NodeMap::value_type& b) const
1985     {
1986         using gcomm::evs::NodeMap;
1987         gcomm_assert(NodeMap::value(a).join_message() != 0 &&
1988                      NodeMap::value(b).join_message() != 0);
1989         return (NodeMap::value(a).join_message()->version() <
1990                 NodeMap::value(b).join_message()->version());
1991 
1992     }
1993 };
1994 
send_install(EVS_CALLER_ARG)1995 void gcomm::evs::Proto::send_install(EVS_CALLER_ARG)
1996 {
1997     gcomm_assert(consensus_.is_consensus() == true &&
1998                  is_representative(uuid()) == true) << *this;
1999 
2000     // Select list of operational nodes from known
2001     NodeMap oper_list;
2002     for_each(known_.begin(), known_.end(), OperationalSelect(oper_list));
2003     NodeMap::const_iterator max_node =
2004         max_element(oper_list.begin(), oper_list.end(), ViewIdCmp());
2005 
2006     // Compute maximum known view id seq
2007     max_view_id_seq_ =
2008         std::max(max_view_id_seq_,
2009                  NodeMap::value(max_node).join_message()->source_view_id().seq());
2010 
2011     // Compute highest commonly supported protocol version.
2012     // Oper_list is non-empty, join message existence is asserted.
2013     const int version(
2014         NodeMap::value(
2015             std::min_element(oper_list.begin(), oper_list.end(),
2016                              ProtoVerCmp())).join_message()->version());
2017 
2018     MessageNodeList node_list;
2019     populate_node_list(&node_list);
2020 
2021     InstallMessage imsg(version,
2022                         uuid(),
2023                         current_view_.id(),
2024                         ViewId(V_REG, uuid(), max_view_id_seq_ + attempt_seq_),
2025                         input_map_->safe_seq(),
2026                         input_map_->aru_seq(),
2027                         ++fifo_seq_,
2028                         node_list);
2029     ++attempt_seq_;
2030     evs_log_debug(D_INSTALL_MSGS) << EVS_LOG_METHOD << imsg;
2031     evs_log_info(I_STATE) << "sending install message" << imsg;
2032     gcomm_assert(consensus_.is_consistent(imsg));
2033 
2034     gu::Buffer buf;
2035     serialize(imsg, buf);
2036     Datagram dg(buf);
2037     int err = send_down(dg, ProtoDownMeta());
2038     if (err != 0)
2039     {
2040         log_debug << "send failed: " << strerror(err);
2041     }
2042 
2043     sent_msgs_[Message::EVS_T_INSTALL]++;
2044     handle_install(imsg, self_i_);
2045 }
2046 
2047 
send_delayed_list()2048 void gcomm::evs::Proto::send_delayed_list()
2049 {
2050     DelayedListMessage elm(version_, uuid(), current_view_.id(), ++fifo_seq_);
2051     for (DelayedList::const_iterator i(delayed_list_.begin());
2052          i != delayed_list_.end(); ++i)
2053     {
2054         elm.add(i->first, i->second.state_change_cnt());
2055     }
2056     gu::Buffer buf;
2057     serialize(elm, buf);
2058     Datagram dg(buf);
2059     (void)send_down(dg, ProtoDownMeta());
2060     handle_delayed_list(elm, self_i_);
2061 }
2062 
resend(const UUID & gap_source,const Range range)2063 void gcomm::evs::Proto::resend(const UUID& gap_source, const Range range)
2064 {
2065     gcomm_assert(gap_source != uuid());
2066     gcomm_assert(range.lu() <= range.hs()) <<
2067         "lu (" << range.lu() << ") > hs(" << range.hs() << ")";
2068 
2069     if (range.lu() <= input_map_->safe_seq())
2070     {
2071         evs_log_debug(D_RETRANS) << self_string() << "lu (" << range.lu()
2072                                  << ") <= safe_seq("
2073                                  << input_map_->safe_seq()
2074                                  << "), can't recover message";
2075         return;
2076     }
2077 
2078     evs_log_debug(D_RETRANS) << " retrans requested by "
2079                              << gap_source
2080                              << " "
2081                              << range.lu() << " -> "
2082                              << range.hs();
2083 
2084     // All of the nodes have received all messages up to input_map_->safe_seq(),
2085     // therefore it does not make sense to retransmit anything below that.
2086     seqno_t seq(std::max(range.lu(), input_map_->safe_seq() + 1));
2087     evs_log_debug(D_RETRANS) << "retransmitting from " << seq;
2088     while (seq <= range.hs())
2089     {
2090         InputMap::iterator msg_i = input_map_->find(
2091             NodeMap::value(self_i_).index(), seq);
2092         if (msg_i == input_map_->end())
2093         {
2094             try
2095             {
2096                 gu_trace(msg_i = input_map_->recover(
2097                              NodeMap::value(self_i_).index(), seq));
2098             }
2099             catch (...)
2100             {
2101                 evs_log_debug(D_RETRANS) << "could not recover message "
2102                                          << gap_source << ":" << seq;
2103                 seq = seq + 1;
2104                 continue;
2105             }
2106         }
2107 
2108         const UserMessage& msg(InputMapMsgIndex::value(msg_i).msg());
2109         gcomm_assert(msg.source() == uuid());
2110         Datagram rb(InputMapMsgIndex::value(msg_i).rb());
2111         assert(rb.offset() == 0);
2112 
2113         UserMessage um(msg.version(),
2114                        msg.source(),
2115                        msg.source_view_id(),
2116                        msg.seq(),
2117                        input_map_->aru_seq(),
2118                        msg.seq_range(),
2119                        msg.order(),
2120                        msg.fifo_seq(),
2121                        msg.user_type(),
2122                        static_cast<uint8_t>(
2123                            Message::F_RETRANS |
2124                            (msg.flags() & Message::F_AGGREGATE)));
2125 
2126         push_header(um, rb);
2127 
2128         int err = send_down(rb, ProtoDownMeta(gap_source));
2129         if (err != 0)
2130         {
2131             log_debug << "send failed: " << strerror(err);
2132             break;
2133         }
2134         else
2135         {
2136             evs_log_debug(D_RETRANS) << "retransmitted " << um;
2137         }
2138         seq = seq + msg.seq_range() + 1;
2139         retrans_msgs_++;
2140     }
2141 }
2142 
2143 
recover(const UUID & gap_source,const UUID & range_uuid,const Range range)2144 void gcomm::evs::Proto::recover(const UUID& gap_source,
2145                                 const UUID& range_uuid,
2146                                 const Range range)
2147 {
2148     gcomm_assert(gap_source != uuid())
2149         << "gap_source (" << gap_source << ") == uuid() (" << uuid()
2150         << " state " << *this;
2151     gcomm_assert(range.lu() <= range.hs())
2152         << "lu (" << range.lu() << ") > hs (" << range.hs() << ")";
2153 
2154     if (range.lu() <= input_map_->safe_seq())
2155     {
2156         evs_log_debug(D_RETRANS) << "lu (" << range.lu()
2157                                  << ") <= safe_seq(" << input_map_->safe_seq()
2158                                  << "), can't recover message";
2159         return;
2160     }
2161 
2162     const Node& range_node(NodeMap::value(known_.find_checked(range_uuid)));
2163     const Range im_range(input_map_->range(range_node.index()));
2164 
2165     evs_log_debug(D_RETRANS) << " recovering message from "
2166                              << range_uuid
2167                              << " requested by "
2168                              << gap_source
2169                              << " requested range " << range
2170                              << " available " << im_range;
2171 
2172     // All of the nodes have received all messages up to input_map_->safe_seq(),
2173     // therefore it does not make sense to retransmit anything below that.
2174     seqno_t seq(std::max(range.lu(), input_map_->safe_seq() + 1));
2175     evs_log_debug(D_RETRANS) << "recovering from " << seq;
2176     size_t n_recovered(0);
2177     while (seq <= range.hs() && seq <= im_range.hs())
2178     {
2179         InputMap::iterator msg_i = input_map_->find(range_node.index(), seq);
2180         if (msg_i == input_map_->end())
2181         {
2182             try
2183             {
2184                 gu_trace(msg_i = input_map_->recover(range_node.index(), seq));
2185             }
2186             catch (...)
2187             {
2188                 seq = seq + 1;
2189                 continue;
2190             }
2191         }
2192 
2193         const UserMessage& msg(InputMapMsgIndex::value(msg_i).msg());
2194         assert(msg.source() == range_uuid);
2195 
2196         Datagram rb(InputMapMsgIndex::value(msg_i).rb());
2197         assert(rb.offset() == 0);
2198         UserMessage um(msg.version(),
2199                        msg.source(),
2200                        msg.source_view_id(),
2201                        msg.seq(),
2202                        msg.aru_seq(),
2203                        msg.seq_range(),
2204                        msg.order(),
2205                        msg.fifo_seq(),
2206                        msg.user_type(),
2207                        static_cast<uint8_t>(
2208                            Message::F_SOURCE |
2209                            Message::F_RETRANS |
2210                            (msg.flags() & Message::F_AGGREGATE)));
2211 
2212         push_header(um, rb);
2213 
2214         ++n_recovered;
2215         int err = send_delegate(rb, gap_source);
2216         if (err != 0)
2217         {
2218             log_debug << "send failed: " << strerror(err);
2219             break;
2220         }
2221         else
2222         {
2223             evs_log_debug(D_RETRANS) << "recover " << um;
2224         }
2225         seq = seq + msg.seq_range() + 1;
2226         recovered_msgs_++;
2227     }
2228     evs_log_debug(D_RETRANS) << "recovered: " << n_recovered;
2229 }
2230 
2231 
2232 class UUIDFixedPartCmp
2233 {
2234 public:
UUIDFixedPartCmp(const gcomm::UUID & uuid)2235     UUIDFixedPartCmp(const gcomm::UUID& uuid) : uuid_(uuid) { }
operator ()(const gcomm::evs::NodeMap::value_type & vt) const2236     bool operator()(const gcomm::evs::NodeMap::value_type& vt) const
2237     {
2238         return uuid_.fixed_part_matches(vt.first);
2239     }
2240 private:
2241     const gcomm::UUID& uuid_;
2242 };
2243 
handle_foreign(const Message & msg)2244 void gcomm::evs::Proto::handle_foreign(const Message& msg)
2245 {
2246     // no need to handle foreign LEAVE message
2247     if (msg.type() == Message::EVS_T_LEAVE)
2248     {
2249         return;
2250     }
2251 
2252     // Don't handle foreign messages in install phase.
2253     // This includes not only INSTALL state, but also
2254     // GATHER state after receiving install message.
2255     if (install_message_ != 0)
2256     {
2257         evs_log_debug(D_FOREIGN_MSGS)
2258             << " dropping foreign message from "
2259             << msg.source() << " in install state";
2260         return;
2261     }
2262 
2263     if (is_msg_from_previous_view(msg) == true)
2264     {
2265         return;
2266     }
2267 
2268     const UUID& source(msg.source());
2269 
2270     if (source == UUID::nil())
2271     {
2272         log_warn << "Received message with nil source UUDI, dropping";
2273         return;
2274     }
2275 
2276     NodeMap::iterator i;
2277     if ((i = std::find_if(known_.begin(), known_.end(), UUIDFixedPartCmp(source)))
2278         != known_.end())
2279     {
2280         // Keep the new incarnation out of the group until a new view has been
2281         // established.
2282         evs_log_debug(D_FOREIGN_MSGS)
2283             << "Dropping message from new incarnation of already known "
2284             "node in current view, old: " << i->first << " new: " << source;
2285         return;
2286     }
2287     evs_log_info(I_STATE) << " detected new message source "
2288                           << source;
2289 
2290     gu_trace(i = known_.insert_unique(
2291                  std::make_pair(source, Node(*this))));
2292     assert(NodeMap::value(i).operational() == true);
2293 
2294     if (state() == S_JOINING || state() == S_GATHER ||
2295         state() == S_OPERATIONAL)
2296     {
2297         evs_log_info(I_STATE)
2298             << " shift to GATHER due to foreign message from "
2299             << msg.source();
2300         gu_trace(shift_to(S_GATHER, false));
2301         // Reset install timer each time foreign message is seen to
2302         // synchronize install timers.
2303         reset_timer(T_INSTALL);
2304     }
2305 
2306     // Set join message after shift to recovery, shift may clean up
2307     // join messages
2308     if (msg.type() == Message::EVS_T_JOIN)
2309     {
2310         set_join(static_cast<const JoinMessage&>(msg), msg.source());
2311     }
2312     send_join(true);
2313 }
2314 
handle_msg(const Message & msg,const Datagram & rb,bool direct)2315 void gcomm::evs::Proto::handle_msg(const Message& msg,
2316                                    const Datagram& rb,
2317                                    bool direct)
2318 {
2319     assert(msg.type() <= Message::EVS_T_DELAYED_LIST);
2320     if (msg.type() > Message::EVS_T_DELAYED_LIST)
2321     {
2322         return;
2323     }
2324 
2325     if (state() == S_CLOSED)
2326     {
2327         return;
2328     }
2329 
2330     if (isolation_end_ != gu::datetime::Date::zero())
2331     {
2332         evs_log_debug(D_STATE) << " dropping message due to isolation";
2333         // Isolation period is on
2334         return;
2335     }
2336 
2337     if (msg.source() == uuid())
2338     {
2339         evs_log_debug(D_FOREIGN_MSGS) << " dropping own message";
2340         return;
2341     }
2342 
2343     if (msg.version() > GCOMM_PROTOCOL_MAX_VERSION)
2344     {
2345         log_info << "incompatible protocol version "
2346                  << static_cast<int>(msg.version());
2347         return;
2348     }
2349 
2350     gcomm_assert(msg.source() != UUID::nil());
2351 
2352     // Figure out if the message is from known source
2353     NodeMap::iterator ii = known_.find(msg.source());
2354 
2355     if (ii == known_.end())
2356     {
2357         gu_trace(handle_foreign(msg));
2358         return;
2359     }
2360 
2361     Node& node(NodeMap::value(ii));
2362     if (direct == true)
2363     {
2364         node.set_seen_tstamp(gu::datetime::Date::monotonic());
2365     }
2366 
2367     if (state() == S_LEAVING && msg.source_view_id() == current_view_.id())
2368     {
2369         // Allow messages in leaving state. This is needed for both
2370         // updating the join messages for retransmission and for handling
2371         // retransmitted messages.
2372         evs_log_debug(D_FOREIGN_MSGS) << "Allow message from current view "
2373                                       << "in leaving state" << msg;
2374     }
2375     else if (node.operational()                 == false &&
2376              node.leave_message()               == 0     &&
2377              (msg.flags() & Message::F_RETRANS) == 0)
2378     {
2379         // We have set this node unoperational and there was
2380         // probably good reason to do so. Don't accept messages
2381         // from it before new view has been formed.
2382         // Exceptions:
2383         // - Node that is leaving
2384         // - Retransmitted messages.
2385 
2386         // why we accept retransimted messages?
2387         // a node sends a message, some nodes(A) get it, but some(B) don't
2388         // then this node is non-operational(or unreachable)
2389         // so A need to send B the missing message(in envelope as delegate message)
2390         // otherwise the input map will not be consistent forever.
2391         // and user message in delegate message always comes with F_RETRANS flag.
2392         evs_log_debug(D_FOREIGN_MSGS)
2393             << " dropping message from unoperational source " << node;
2394         return;
2395     }
2396 
2397     // Filter out non-fifo messages
2398     if (msg.fifo_seq() != -1 && (msg.flags() & Message::F_RETRANS) == 0)
2399     {
2400 
2401         if (node.fifo_seq() >= msg.fifo_seq())
2402         {
2403             evs_log_debug(D_FOREIGN_MSGS)
2404                 << "droppoing non-fifo message " << msg
2405                 << " fifo seq " << node.fifo_seq();
2406             return;
2407         }
2408         else
2409         {
2410             node.set_fifo_seq(msg.fifo_seq());
2411         }
2412     }
2413 
2414     // Accept non-membership messages only from current view
2415     // or from view to be installed
2416     if (msg.is_membership()                  == false                    &&
2417         msg.source_view_id()                 != current_view_.id()    &&
2418         (install_message_                    == 0                     ||
2419          install_message_->install_view_id() != msg.source_view_id()))
2420     {
2421         // If source node seems to be operational but it has proceeded
2422         // into new view, mark it as unoperational in order to create
2423         // intermediate views before re-merge.
2424         if (node.installed()           == true      &&
2425             node.operational()         == true      &&
2426             is_msg_from_previous_view(msg) == false     &&
2427             state()                    != S_LEAVING)
2428         {
2429             if (new_view_logged_ == false)
2430             {
2431                 evs_log_info(I_STATE)
2432                     << " detected new view from operational source "
2433                     << msg.source() << ": "
2434                     << msg.source_view_id();
2435                 new_view_logged_ = true;
2436             }
2437             // Note: Commented out, this causes problems with
2438             // attempt_seq. Newly (remotely?) generated install message
2439             // followed by commit gap may cause undesired
2440             // node inactivation and shift to gather.
2441             //
2442             // set_inactive(msg.source());
2443             // gu_trace(shift_to(S_GATHER, true));
2444         }
2445         evs_log_debug(D_FOREIGN_MSGS)
2446             << "dropping non-membership message from foreign view";
2447         return;
2448     }
2449     else if (NodeMap::value(ii).index() == Node::invalid_index &&
2450              msg.source_view_id()       == current_view_.id())
2451     {
2452         log_warn << "Message from node that claims to come from same view but is not in current view " << msg;
2453         assert(0);
2454         return;
2455     }
2456 
2457     recvd_msgs_[msg.type()]++;
2458 
2459     switch (msg.type())
2460     {
2461     case Message::EVS_T_USER:
2462         gu_trace(handle_user(static_cast<const UserMessage&>(msg), ii, rb));
2463         break;
2464     case Message::EVS_T_DELEGATE:
2465         gu_trace(handle_delegate(static_cast<const DelegateMessage&>(msg), ii, rb));
2466         break;
2467     case Message::EVS_T_GAP:
2468         gu_trace(handle_gap(static_cast<const GapMessage&>(msg), ii));
2469         break;
2470     case Message::EVS_T_JOIN:
2471         gu_trace(handle_join(static_cast<const JoinMessage&>(msg), ii));
2472         break;
2473     case Message::EVS_T_LEAVE:
2474         gu_trace(handle_leave(static_cast<const LeaveMessage&>(msg), ii));
2475         break;
2476     case Message::EVS_T_INSTALL:
2477         gu_trace(handle_install(static_cast<const InstallMessage&>(msg), ii));
2478         break;
2479     case Message::EVS_T_DELAYED_LIST:
2480         gu_trace(handle_delayed_list(
2481                      static_cast<const DelayedListMessage&>(msg), ii));
2482         break;
2483     default:
2484         log_warn << "invalid message type " << msg.type();
2485     }
2486 }
2487 
2488 ////////////////////////////////////////////////////////////////////////
2489 // Protolay interface
2490 ////////////////////////////////////////////////////////////////////////
2491 
unserialize_message(const UUID & source,const Datagram & rb,Message * msg)2492 size_t gcomm::evs::Proto::unserialize_message(const UUID& source,
2493                                               const Datagram& rb,
2494                                               Message* msg)
2495 {
2496     size_t offset;
2497     const gu::byte_t* begin(gcomm::begin(rb));
2498     const size_t available(gcomm::available(rb));
2499     gu_trace(offset = msg->unserialize(begin,
2500                                        available,
2501                                        0));
2502     if ((msg->flags() & Message::F_SOURCE) == 0)
2503     {
2504         assert(source != UUID::nil());
2505         gcomm_assert(source != UUID::nil());
2506         msg->set_source(source);
2507     }
2508 
2509     switch (msg->type())
2510     {
2511     case Message::EVS_T_NONE:
2512         gu_throw_fatal;
2513         break;
2514     case Message::EVS_T_USER:
2515         gu_trace(offset = static_cast<UserMessage&>(*msg).unserialize(
2516                      begin, available, offset, true));
2517         break;
2518     case Message::EVS_T_DELEGATE:
2519         gu_trace(offset = static_cast<DelegateMessage&>(*msg).unserialize(
2520                      begin, available, offset, true));
2521         break;
2522     case Message::EVS_T_GAP:
2523         gu_trace(offset = static_cast<GapMessage&>(*msg).unserialize(
2524                      begin, available, offset, true));
2525         break;
2526     case Message::EVS_T_JOIN:
2527         gu_trace(offset = static_cast<JoinMessage&>(*msg).unserialize(
2528                      begin, available, offset, true));
2529         break;
2530     case Message::EVS_T_INSTALL:
2531         gu_trace(offset = static_cast<InstallMessage&>(*msg).unserialize(
2532                      begin, available, offset, true));
2533         break;
2534     case Message::EVS_T_LEAVE:
2535         gu_trace(offset = static_cast<LeaveMessage&>(*msg).unserialize(
2536                      begin, available, offset, true));
2537         break;
2538     case Message::EVS_T_DELAYED_LIST:
2539         gu_trace(offset = static_cast<DelayedListMessage&>(*msg).unserialize(
2540                      begin, available, offset, true));
2541         break;
2542     }
2543     return (offset + rb.offset());
2544 }
2545 
handle_up(const void * cid,const Datagram & rb,const ProtoUpMeta & um)2546 void gcomm::evs::Proto::handle_up(const void* cid,
2547                                   const Datagram& rb,
2548                                   const ProtoUpMeta& um)
2549 {
2550 
2551     Message msg;
2552 
2553     if (state() == S_CLOSED || um.source() == uuid() || is_evicted(um.source()))
2554     {
2555         // Silent drop
2556         return;
2557     }
2558 
2559     gcomm_assert(um.source() != UUID::nil());
2560 
2561     try
2562     {
2563         size_t offset;
2564         gu_trace(offset = unserialize_message(um.source(), rb, &msg));
2565         handle_msg(msg, Datagram(rb, offset),
2566                    (msg.flags() & Message::F_RETRANS) == 0);
2567     }
2568     catch (gu::Exception& e)
2569     {
2570         switch (e.get_errno())
2571         {
2572         case EPROTONOSUPPORT:
2573             log_warn << e.what();
2574             break;
2575 
2576         case EINVAL:
2577             log_warn << "invalid message: " << msg;
2578             break;
2579 
2580         default:
2581             log_fatal << "exception caused by message: " << msg;
2582             std::cerr << " state after handling message: " << *this;
2583             throw;
2584         }
2585     }
2586 }
2587 
2588 
handle_down(Datagram & wb,const ProtoDownMeta & dm)2589 int gcomm::evs::Proto::handle_down(Datagram& wb, const ProtoDownMeta& dm)
2590 {
2591     if (state() == S_GATHER || state() == S_INSTALL)
2592     {
2593         return EAGAIN;
2594     }
2595 
2596     else if (state() != S_OPERATIONAL)
2597     {
2598         log_warn << "user message in state " << to_string(state());
2599         return ENOTCONN;
2600     }
2601 
2602     if (dm.order() == O_LOCAL_CAUSAL)
2603     {
2604         gu::datetime::Date now(gu::datetime::Date::monotonic());
2605         if (causal_queue_.empty() == true &&
2606             last_sent_ == input_map_->safe_seq() &&
2607             causal_keepalive_period_ > gu::datetime::Period(0) &&
2608             last_causal_keepalive_ + causal_keepalive_period_ > now)
2609         {
2610 
2611             assert(last_sent_ == input_map_->aru_seq());
2612             // Input map should either be empty (all messages
2613             // delivered) or the undelivered messages have higher
2614             // seqno than safe_seq. Even if the delivry is
2615             // done below if needed, this assertion should stay
2616             // to catch errors in logic elsewhere in the code.
2617             assert(input_map_->begin() == input_map_->end() ||
2618                    input_map_->is_safe(input_map_->begin()) == false);
2619 
2620 
2621             if (input_map_->begin() != input_map_->end() &&
2622                 input_map_->is_safe(input_map_->begin()) == true)
2623             {
2624                 gu_trace(deliver());
2625                 if (input_map_->begin() != input_map_->end() &&
2626                     input_map_->is_safe(input_map_->begin()) == true)
2627                 {
2628                     // If the input map state is still not good for fast path,
2629                     // the situation is not likely to clear immediately. Return
2630                     // error to retry later.
2631                     return EAGAIN;
2632                 }
2633             }
2634 
2635             hs_local_causal_.insert(0.0);
2636             deliver_causal(dm.user_type(), last_sent_, wb);
2637         }
2638         else
2639         {
2640             seqno_t causal_seqno(input_map_->aru_seq());
2641             if (causal_keepalive_period_ == gu::datetime::Period(0) ||
2642                 last_causal_keepalive_ + causal_keepalive_period_ <= now)
2643             {
2644                 // generate traffic to make sure that group is live
2645                 Datagram dg;
2646                 int err(send_user(dg, 0xff, O_DROP, -1, -1));
2647                 if (err != 0)
2648                 {
2649                     return err;
2650                 }
2651                 // reassign causal_seqno to be last_sent:
2652                 // in order to make sure that the group is live,
2653                 // safe seqno must be advanced and in this case
2654                 // safe seqno equals to aru seqno.
2655                 causal_seqno = last_sent_;
2656                 last_causal_keepalive_ = now;
2657             }
2658             causal_queue_.push_back(CausalMessage(dm.user_type(),
2659                                                   causal_seqno, wb));
2660         }
2661         return 0;
2662     }
2663 
2664     // Limit outbound bytes to out_queue::max_outbound_bytes (1MB)
2665     // to limit the time it takes to transmit all outbound messages
2666     // during configuration change.
2667     if (output_.outbound_bytes() >= out_queue::max_outbound_bytes)
2668     {
2669         return EAGAIN;
2670     }
2671 
2672     send_queue_s_ += output_.size();
2673     ++n_send_queue_s_;
2674 
2675     int ret = 0;
2676 
2677     if (output_.empty() == true)
2678     {
2679         int err;
2680         err = send_user(wb,
2681                         dm.user_type(),
2682                         dm.order(),
2683                         user_send_window_,
2684                         -1);
2685 
2686         switch (err)
2687         {
2688         case EAGAIN:
2689             output_.push_back(std::make_pair(wb, dm));
2690             // fall through
2691         case 0:
2692             ret = 0;
2693             break;
2694         default:
2695             log_error << "send error: " << err;
2696             ret = err;
2697         }
2698     }
2699     else
2700     {
2701         output_.push_back(std::make_pair(wb, dm));
2702     }
2703 
2704     return ret;
2705 }
2706 
send_down(Datagram & dg,const ProtoDownMeta & dm)2707 int gcomm::evs::Proto::send_down(Datagram& dg, const ProtoDownMeta& dm)
2708 {
2709     if (isolation_end_ != gu::datetime::Date::zero())
2710     {
2711         // Node has isolated itself, don't emit any messages
2712         return 0;
2713     }
2714     else
2715     {
2716         return Protolay::send_down(dg, dm);
2717     }
2718 }
2719 
2720 
2721 /////////////////////////////////////////////////////////////////////////////
2722 // State handler
2723 /////////////////////////////////////////////////////////////////////////////
2724 
// Shift the protocol state machine into state s.
//
// The transition is checked against a static table of allowed
// transitions; a forbidden transition or a re-entrant call (detected via
// shift_to_rfcnt_) results in a fatal exception. The actions associated
// with the target state are then run and state_ is updated.
//
// Parameters:
//   s      - target state
//   send_j - used only when shifting to S_GATHER: if true, a join message
//            is sent via send_join(false) after the state has been changed
void gcomm::evs::Proto::shift_to(const State s, const bool send_j)
{
    // Re-entrance guard: the counter is raised here and lowered at the end
    // of the function.
    if (shift_to_rfcnt_ > 0) gu_throw_fatal << *this;

    shift_to_rfcnt_++;

    // Allowed transitions, indexed as allowed[current state][target state].
    static const bool allowed[S_MAX][S_MAX] = {
        // CLOSED JOINING LEAVING GATHER INSTALL OPERAT
        {  false,  true,   false, false, false,  false }, // CLOSED

        {  false,  false,  true,  true,  false,  false }, // JOINING

        {  true,   false,  false, false, false,  false }, // LEAVING

        {  false,  false,  true,  true,  true,   false }, // GATHER

        {  false,  false,  false, true,  false,  true  },  // INSTALL

        {  false,  false,  true,  true,  false,  false }  // OPERATIONAL
    };

    assert(s < S_MAX);

    if (allowed[state_][s] == false) {
        gu_throw_fatal << "Forbidden state transition: "
                       << to_string(state_) << " -> " << to_string(s);
    }

    // Log only real changes; GATHER -> GATHER is an allowed transition
    // which does not change the state.
    if (state() != s)
    {
        evs_log_info(I_STATE) << " state change: "
                              << to_string(state_) << " -> " << to_string(s);
    }
    switch (s) {
    case S_CLOSED:
    {
        gcomm_assert(state() == S_LEAVING);
        gu_trace(deliver());
        gu_trace(deliver_local());
        // Only this node is marked installed for the last view.
        setall_installed(false);
        NodeMap::value(self_i_).set_installed(true);
        // Construct install message containing only one node for
        // last trans view.
        MessageNodeList node_list;
        (void)node_list.insert_unique(
            std::make_pair(uuid(),
                           MessageNode(true,
                                       false,
                                       NodeMap::value(self_i_).segment(),
                                       false,
                                       -1,
                                       current_view_.id(),
                                       input_map_->safe_seq(
                                           NodeMap::value(self_i_).index()),
                                       input_map_->range(
                                           NodeMap::value(self_i_).index()))));
        InstallMessage im(0,
                          uuid(),
                          current_view_.id(),
                          ViewId(V_REG, uuid(), current_view_.id().seq() + 1),
                          input_map_->safe_seq(),
                          input_map_->aru_seq(),
                          ++fifo_seq_,
                          node_list);
        gu_trace(deliver_trans_view(im, current_view_));
        gu_trace(deliver_trans());
        gu_trace(deliver_local(true));
        gcomm_assert(causal_queue_.empty() == true);
        if (collect_stats_ == true)
        {
            handle_stats_timer();
        }
        gu_trace(deliver_empty_view());
        cleanup_foreign(im);
        cleanup_views();
        timers_.clear();
        state_ = S_CLOSED;
        break;
    }
    case S_JOINING:
        state_ = S_JOINING;
        reset_timer(T_STATS);
        break;
    case S_LEAVING:
        state_ = S_LEAVING;
        reset_timer(T_INACTIVITY);
        reset_timer(T_RETRANS);
        reset_timer(T_INSTALL);
        break;
    case S_GATHER:
    {
        // Discard commit/install progress and any pending install message.
        setall_committed(false);
        setall_installed(false);
        delete install_message_;
        install_message_ = 0;

        if (state() == S_OPERATIONAL)
        {
            // Flush the output queue before leaving the operational state;
            // a failing send is treated as fatal.
            while (output_.empty() == false)
            {
                int err;
                gu_trace(err = send_user(-1));
                if (err != 0)
                {
                    gu_throw_fatal << self_string()
                                   << "send_user() failed in shifto "
                                   << "to S_GATHER: "
                                   << strerror(err);
                }
            }
        }
        else
        {
            gcomm_assert(output_.empty() == true);
        }

        // Remember the state we came from: state_ is overwritten before
        // the timers are handled below.
        State prev_state(state_);
        state_ = S_GATHER;
        if (send_j == true)
        {
            gu_trace(send_join(false));
        }
        gcomm_assert(state() == S_GATHER);
        reset_timer(T_INACTIVITY);
        // Retrans and install timers are reset only when coming from
        // S_OPERATIONAL or S_JOINING.
        if (prev_state == S_OPERATIONAL || prev_state == S_JOINING)
        {
            reset_timer(T_RETRANS);
            reset_timer(T_INSTALL);
        }
        break;
    }
    case S_INSTALL:
    {
        gcomm_assert(install_message_ != 0);
        gcomm_assert(is_all_committed() == true);
        state_ = S_INSTALL;
        reset_timer(T_INACTIVITY);
        reset_timer(T_RETRANS);
        break;
    }
    case S_OPERATIONAL:
    {
        // Preconditions: nothing queued for sending, an install message is
        // present and consistent with own join message, and all nodes are
        // installed.
        gcomm_assert(output_.empty() == true);
        gcomm_assert(install_message_ != 0);
        gcomm_assert(NodeMap::value(self_i_).join_message() != 0 &&
                     consensus_.equal(
                         *NodeMap::value(self_i_).join_message(),
                         *install_message_))
            << "install message not consistent with own join, state: " << *this;
        gcomm_assert(is_all_installed() == true);
        gu_trace(deliver());
        gu_trace(deliver_local());
        gu_trace(deliver_trans_view(*install_message_, current_view_));
        gu_trace(deliver_trans());
        gu_trace(deliver_local(true));
        gcomm_assert(causal_queue_.empty() == true);
        input_map_->clear();
        if (collect_stats_ == true)
        {
            handle_stats_timer();
        }
        // End of previous view

        // Construct new view and shift to S_OPERATIONAL before calling
        // deliver_reg_view(). Reg view delivery may trigger message
        // exchange on upper layer and operating view is needed to
        // handle messages.

        previous_view_ = current_view_;
        std::copy(gather_views_.begin(), gather_views_.end(),
                  std::inserter(previous_views_, previous_views_.end()));
        gather_views_.clear();

        if (install_message_->version() > current_view_.version())
        {
            log_info << "EVS version upgrade " << current_view_.version()
                     << " -> " << static_cast<int>(install_message_->version());
        }
        else if (install_message_->version() < current_view_.version())
        {
            log_info << "EVS version downgrade " << current_view_.version()
                     << " -> " << static_cast<int>(install_message_->version());
        }

        current_view_ = View(install_message_->version(),
                             install_message_->install_view_id());
        // Next input map index to hand out to an operational node.
        size_t idx = 0;

        const MessageNodeList& imnl(install_message_->node_list());

        for (MessageNodeList::const_iterator i(imnl.begin());
             i != imnl.end(); ++i)
        {
            const UUID& uuid(MessageNodeList::key(i));
            const MessageNode& n(MessageNodeList::value(i));

            // Add operational nodes to new view, assign input map index
            NodeMap::iterator nmi(known_.find(uuid));
            gcomm_assert(nmi != known_.end()) << "node " << uuid
                                              << " not found from known map";
            if (n.operational() == true)
            {
                current_view_.add_member(uuid, NodeMap::value(nmi).segment());
                NodeMap::value(nmi).set_index(idx++);
            }
            else
            {
                NodeMap::value(nmi).set_index(
                    Node::invalid_index);
            }

        }

        if (previous_view_.id().type() == V_REG &&
            previous_view_.members() == current_view_.members())
        {
            evs_log_info(I_VIEWS)
                << "subsequent views have same members, prev view "
                << previous_view_ << " current view " << current_view_;
        }

        input_map_->reset(current_view_.members().size());
        last_sent_ = -1;
        state_ = S_OPERATIONAL;
        deliver_reg_view(*install_message_, previous_view_);

        cleanup_foreign(*install_message_);
        cleanup_views();
        cleanup_joins();

        // The install message has been fully consumed; release it and
        // reset the install attempt bookkeeping.
        delete install_message_;
        install_message_ = 0;
        attempt_seq_ = 1;
        install_timeout_count_ = 0;
        gu_trace(send_gap(EVS_CALLER, UUID::nil(), current_view_.id(), Range()));;
        gcomm_assert(state() == S_OPERATIONAL);
        reset_timer(T_INACTIVITY);
        reset_timer(T_RETRANS);
        cancel_timer(T_INSTALL);
        new_view_logged_ = false;
        break;
    }
    default:
        gu_throw_fatal << "invalid state";
    }
    shift_to_rfcnt_--;
}
2972 
2973 ////////////////////////////////////////////////////////////////////////////
2974 // Message delivery
2975 ////////////////////////////////////////////////////////////////////////////
2976 
deliver_causal(uint8_t user_type,seqno_t seqno,const Datagram & datagram)2977 void gcomm::evs::Proto::deliver_causal(uint8_t user_type,
2978                                        seqno_t seqno,
2979                                        const Datagram& datagram)
2980 {
2981     send_up(datagram, ProtoUpMeta(uuid(),
2982                                   current_view_.id(),
2983                                   0,
2984                                   user_type,
2985                                   O_LOCAL_CAUSAL,
2986                                   seqno));
2987     ++delivered_msgs_[O_LOCAL_CAUSAL];
2988 }
2989 
2990 
deliver_local(bool trans)2991 void gcomm::evs::Proto::deliver_local(bool trans)
2992 {
2993     // local causal
2994     const seqno_t causal_seq(trans == false ? input_map_->safe_seq() : last_sent_);
2995     gu::datetime::Date now(gu::datetime::Date::monotonic());
2996 
2997     assert(input_map_->begin() == input_map_->end() ||
2998            input_map_->is_safe(input_map_->begin()) == false);
2999 
3000     while (causal_queue_.empty() == false &&
3001            causal_queue_.front().seqno() <= causal_seq)
3002     {
3003         const CausalMessage& cm(causal_queue_.front());
3004         hs_local_causal_.insert(double(now.get_utc() - cm.tstamp().get_utc())/gu::datetime::Sec);
3005         deliver_causal(cm.user_type(), cm.seqno(), cm.datagram());
3006         causal_queue_.pop_front();
3007     }
3008 }
3009 
validate_reg_msg(const UserMessage & msg)3010 void gcomm::evs::Proto::validate_reg_msg(const UserMessage& msg)
3011 {
3012     if (msg.source_view_id() != current_view_.id())
3013     {
3014         // Note: This implementation should guarantee same view delivery,
3015         // this is sanity check for that.
3016         gu_throw_fatal << "reg validate: not current view";
3017     }
3018 
3019     // Update statistics for locally generated messages
3020     if (msg.source() == uuid())
3021     {
3022         if (msg.order() == O_SAFE)
3023         {
3024             gu::datetime::Date now(gu::datetime::Date::monotonic());
3025             double lat(double(now.get_utc() - msg.tstamp().get_utc())/
3026                        gu::datetime::Sec);
3027             if (info_mask_ & I_STATISTICS) hs_safe_.insert(lat);
3028             safe_deliv_latency_.insert(lat);
3029         }
3030         else if (msg.order() == O_AGREED)
3031         {
3032             if (info_mask_ & I_STATISTICS)
3033             {
3034                 gu::datetime::Date now(gu::datetime::Date::monotonic());
3035                 hs_agreed_.insert(double(now.get_utc() - msg.tstamp().get_utc())/gu::datetime::Sec);
3036             }
3037         }
3038     }
3039 }
3040 
3041 
deliver_finish(const InputMapMsg & msg)3042 void gcomm::evs::Proto::deliver_finish(const InputMapMsg& msg)
3043 {
3044     if ((msg.msg().flags() & Message::F_AGGREGATE) == 0)
3045     {
3046         ++delivered_msgs_[msg.msg().order()];
3047         if (msg.msg().order() != O_DROP)
3048         {
3049             gu_trace(validate_reg_msg(msg.msg()));
3050             ProtoUpMeta um(msg.msg().source(),
3051                            msg.msg().source_view_id(),
3052                            0,
3053                            msg.msg().user_type(),
3054                            msg.msg().order(),
3055                            msg.msg().seq());
3056             try
3057             {
3058                 send_up(msg.rb(), um);
3059             }
3060             catch (...)
3061             {
3062                 log_info << msg.msg() << " " << msg.rb().len();
3063                 throw;
3064             }
3065         }
3066     }
3067     else
3068     {
3069         gu_trace(validate_reg_msg(msg.msg()));
3070         size_t offset(0);
3071         while (offset < msg.rb().len())
3072         {
3073             ++delivered_msgs_[msg.msg().order()];
3074             AggregateMessage am;
3075             gu_trace(am.unserialize(msg.rb().payload().data(),
3076                                     msg.rb().payload().size(),
3077                                     offset));
3078             Datagram dg(
3079                 gu::SharedBuffer(
3080                     new gu::Buffer(
3081                         msg.rb().payload().data()
3082                         + offset
3083                         + am.serial_size(),
3084                         msg.rb().payload().data()
3085                         + offset
3086                         + am.serial_size()
3087                         + am.len())));
3088             ProtoUpMeta um(msg.msg().source(),
3089                            msg.msg().source_view_id(),
3090                            0,
3091                            am.user_type(),
3092                            msg.msg().order(),
3093                            msg.msg().seq());
3094             gu_trace(send_up(dg, um));
3095             offset += am.serial_size() + am.len();
3096         }
3097         gcomm_assert(offset == msg.rb().len());
3098     }
3099 }
3100 
deliver()3101 void gcomm::evs::Proto::deliver()
3102 {
3103     if (delivering_ == true)
3104     {
3105         gu_throw_fatal << "Recursive enter to delivery";
3106     }
3107 
3108     delivering_ = true;
3109 
3110     if (state() != S_OPERATIONAL &&
3111         state() != S_GATHER      &&
3112         state() != S_INSTALL     &&
3113         state() != S_LEAVING)
3114     {
3115         gu_throw_fatal << "invalid state: " << to_string(state());
3116     }
3117 
3118     evs_log_debug(D_DELIVERY)
3119         << " aru_seq="   << input_map_->aru_seq()
3120         << " safe_seq=" << input_map_->safe_seq();
3121 
3122     // Read input map head until a message which cannot be
3123     // delivered is enountered.
3124     InputMapMsgIndex::iterator i;
3125     while ((i = input_map_->begin()) != input_map_->end())
3126     {
3127         const InputMapMsg& msg(InputMapMsgIndex::value(i));
3128         if ((msg.msg().order() <= O_SAFE &&
3129              input_map_->is_safe(i) == true) ||
3130             (msg.msg().order() <= O_AGREED &&
3131              input_map_->is_agreed(i) == true) ||
3132             (msg.msg().order() <= O_FIFO &&
3133              input_map_->is_fifo(i) == true))
3134         {
3135             deliver_finish(msg);
3136             gu_trace(input_map_->erase(i));
3137         }
3138         else
3139         {
3140             if (msg.msg().order() > O_SAFE)
3141             {
3142                 gu_throw_fatal << "Message with order " << msg.msg().order()
3143                                << " in input map, cannot continue safely";
3144             }
3145             break;
3146         }
3147     }
3148     delivering_ = false;
3149 
3150     assert(input_map_->begin() == input_map_->end() ||
3151            input_map_->is_safe(input_map_->begin()) == false);
3152 
3153 }
3154 
3155 
deliver_trans()3156 void gcomm::evs::Proto::deliver_trans()
3157 {
3158     if (delivering_ == true)
3159     {
3160         gu_throw_fatal << "Recursive enter to delivery";
3161     }
3162 
3163     delivering_ = true;
3164 
3165     if (state() != S_INSTALL &&
3166         state() != S_LEAVING)
3167         gu_throw_fatal << "invalid state";
3168 
3169     evs_log_debug(D_DELIVERY)
3170         << " aru_seq="  << input_map_->aru_seq()
3171         << " safe_seq=" << input_map_->safe_seq();
3172 
3173     // In transitional configuration we must deliver all messages that
3174     // are fifo. This is because:
3175     // - We know that it is possible to deliver all fifo messages originated
3176     //   from partitioned component as safe in partitioned component
3177     // - Aru in this component is at least the max known fifo seq
3178     //   from partitioned component due to message recovery
3179     // - All FIFO messages originated from this component must be
3180     //   delivered to fulfill self delivery requirement and
3181     // - FIFO messages originated from this component qualify as AGREED
3182     //   in transitional configuration
3183 
3184     InputMap::iterator i, i_next;
3185     for (i = input_map_->begin(); i != input_map_->end(); i = i_next)
3186     {
3187         i_next = i;
3188         ++i_next;
3189         const InputMapMsg& msg(InputMapMsgIndex::value(i));
3190         bool deliver = false;
3191         switch (msg.msg().order())
3192         {
3193         case O_SAFE:
3194         case O_AGREED:
3195         case O_FIFO:
3196         case O_DROP:
3197             if (input_map_->is_fifo(i) == true)
3198             {
3199                 deliver = true;
3200             }
3201             break;
3202         default:
3203             gu_throw_fatal;
3204         }
3205 
3206         if (deliver == true)
3207         {
3208             if (install_message_ != 0)
3209             {
3210                 const MessageNode& mn(
3211                     MessageNodeList::value(
3212                         install_message_->node_list().find_checked(
3213                             msg.msg().source())));
3214                 if (msg.msg().seq() <= mn.im_range().hs())
3215                 {
3216                     deliver_finish(msg);
3217                 }
3218                 else
3219                 {
3220                     gcomm_assert(mn.operational() == false);
3221                     log_info << "filtering out trans message higher than "
3222                              << "install message hs "
3223                              << mn.im_range().hs()
3224                              << ": " << msg.msg();
3225                 }
3226             }
3227             else
3228             {
3229                 deliver_finish(msg);
3230             }
3231             gu_trace(input_map_->erase(i));
3232         }
3233     }
3234 
3235     // Sanity check:
3236     // There must not be any messages left that
3237     // - Are originated from outside of trans conf and are FIFO
3238     // - Are originated from trans conf
3239     for (i = input_map_->begin(); i != input_map_->end(); i = i_next)
3240     {
3241         i_next = i;
3242         ++i_next;
3243         const InputMapMsg& msg(InputMapMsgIndex::value(i));
3244         NodeMap::iterator ii;
3245         gu_trace(ii = known_.find_checked(msg.msg().source()));
3246 
3247         if (NodeMap::value(ii).installed() == true)
3248         {
3249             gu_throw_fatal << "Protocol error in transitional delivery "
3250                            << "(self delivery constraint)";
3251         }
3252         else if (input_map_->is_fifo(i) == true)
3253         {
3254             gu_throw_fatal << "Protocol error in transitional delivery "
3255                            << "(fifo from partitioned component)";
3256         }
3257         gu_trace(input_map_->erase(i));
3258     }
3259     delivering_ = false;
3260 }
3261 
3262 
3263 /////////////////////////////////////////////////////////////////////////////
3264 // Message handlers
3265 /////////////////////////////////////////////////////////////////////////////
3266 
3267 
3268 
update_im_safe_seq(const size_t uuid,const seqno_t seq)3269 gcomm::evs::seqno_t gcomm::evs::Proto::update_im_safe_seq(const size_t uuid,
3270                                                           const seqno_t seq)
3271 {
3272     const seqno_t im_safe_seq(input_map_->safe_seq(uuid));
3273     if (im_safe_seq  < seq)
3274     {
3275         input_map_->set_safe_seq(uuid, seq);
3276     }
3277     return im_safe_seq;
3278 }
3279 
send_request_retrans_gap(const UUID & target,const UUID & origin,const Range & range)3280 void gcomm::evs::Proto::send_request_retrans_gap(const UUID& target,
3281                                                  const UUID& origin,
3282                                                  const Range& range)
3283 {
3284     GapMessage gm(version_,
3285                   uuid(),
3286                   current_view_.id(),
3287                   last_sent_,
3288                   input_map_->aru_seq(),
3289                   ++fifo_seq_,
3290                   origin,
3291                   range,
3292                   Message::F_RETRANS);
3293     gu::Buffer buf;
3294     serialize(gm, buf);
3295     Datagram dg(buf);
3296     int err = send_down(dg, ProtoDownMeta(target));
3297     if (err != 0)
3298     {
3299         log_debug << "send failed: " << strerror(err);
3300     }
3301     sent_msgs_[Message::EVS_T_GAP]++;
3302 }
3303 
request_retrans(const UUID & target,const UUID & origin,const Range & range)3304 void gcomm::evs::Proto::request_retrans(const UUID& target, const UUID& origin,
3305                                         const Range& range)
3306 {
3307     NodeMap::const_iterator origin_node_i(known_.find(origin));
3308     assert(origin_node_i != known_.end());
3309     if (origin_node_i == known_.end())
3310     {
3311         log_warn << "Origin " << origin << " not found from known nodes";
3312         return;
3313     }
3314     const Node& origin_node(NodeMap::value(origin_node_i));
3315     if (origin_node.index() == Node::invalid_index)
3316     {
3317         log_warn << "Origin " << origin << " has no index";
3318         return;
3319     }
3320     if (not gap_rate_limit(target, range))
3321     {
3322         evs_log_debug(D_RETRANS) << self_string()
3323                                  << " requesting retrans from " << target
3324                                  << " origin " << origin
3325                                  << " range " << range
3326                                  << " due to input map gap, aru "
3327                                  << input_map_->aru_seq();
3328         std::vector<Range> gap_ranges(input_map_->gap_range_list(
3329                                           origin_node.index(), range));
3330         for (std::vector<Range>::const_iterator ri(gap_ranges.begin());
3331              ri != gap_ranges.end(); ++ri)
3332         {
3333             evs_log_debug(D_RETRANS)
3334                 << "Requesting retransmssion from " << target
3335                 << " origin: " << origin
3336                 << " range: " << *ri;
3337             send_request_retrans_gap(target, origin, *ri);
3338         }
3339         NodeMap::iterator target_i(known_.find(target));
3340         if (target_i != known_.end())
3341         {
3342             target_i->second.last_requested_range(range);
3343         }
3344     }
3345 }
3346 
// Select a suitable node for recovering missing messages. The chosen node
// is an operational one whose join message originates from the same view
// and reports the highest lowest-unseen seqno for origin.
3350 
3351 struct SelectRecoveryNodeForMissingResult
3352 {
3353     gcomm::evs::seqno_t lowest_unseen;
3354     gcomm::UUID target;
SelectRecoveryNodeForMissingResultSelectRecoveryNodeForMissingResult3355     SelectRecoveryNodeForMissingResult()
3356         : lowest_unseen(-1)
3357         , target()
3358     { }
3359 };
3360 
3361 class SelectRecoveryNodeForMissing
3362 {
3363 public:
SelectRecoveryNodeForMissing(const gcomm::evs::Proto & evs,const gcomm::UUID & origin,const gcomm::ViewId & view_id,SelectRecoveryNodeForMissingResult & result)3364     SelectRecoveryNodeForMissing(const gcomm::evs::Proto& evs,
3365                                  const gcomm::UUID& origin,
3366                                  const gcomm::ViewId& view_id,
3367                                  SelectRecoveryNodeForMissingResult&
3368                                  result /* Out parameter */)
3369         : evs_(evs)
3370         , origin_(origin)
3371         , view_id_(view_id)
3372         , result_(result)
3373     { }
3374 
operator ()(const gcomm::evs::NodeMap::value_type & node_v)3375     void operator()(const gcomm::evs::NodeMap::value_type& node_v)
3376     {
3377         // Do not try to recover from self.
3378         if (evs_.uuid() == node_v.first) return;
3379 
3380         if (node_v.second.operational())
3381         {
3382             gcomm::evs::seqno_t lu(get_lu_for(origin_, node_v.second));
3383             if (lu > result_.lowest_unseen)
3384             {
3385                 result_.lowest_unseen = lu;
3386                 result_.target = node_v.first;
3387             }
3388         }
3389     }
3390 
3391 private:
get_lu_from_join_for(const gcomm::UUID & origin,const gcomm::evs::JoinMessage & jm)3392     gcomm::evs::seqno_t get_lu_from_join_for(const gcomm::UUID& origin,
3393                                              const gcomm::evs::JoinMessage& jm)
3394     {
3395         gcomm::evs::MessageNodeList::const_iterator origin_i(
3396             jm.node_list().find(origin));
3397         if (origin_i != jm.node_list().end())
3398         {
3399             return origin_i->second.im_range().lu();
3400         }
3401         return -1;
3402     }
3403 
get_lu_for(const gcomm::UUID & origin,const gcomm::evs::Node & node)3404     gcomm::evs::seqno_t get_lu_for(const gcomm::UUID& origin,
3405                                    const gcomm::evs::Node& node)
3406     {
3407         const gcomm::evs::JoinMessage* jm(node.join_message());
3408         // No join message received
3409         if (not jm) return -1;
3410         // Not in the same view
3411         if (jm->source_view_id() != view_id_) return -1;
3412         return get_lu_from_join_for(origin, *jm);
3413     }
3414 
3415     const gcomm::evs::Proto& evs_;
3416     const gcomm::UUID& origin_;
3417     const gcomm::ViewId& view_id_;
3418     SelectRecoveryNodeForMissingResult& result_; // Reference to out parameter
3419 };
3420 
request_missing()3421 void gcomm::evs::Proto::request_missing()
3422 {
3423     // This method should be called only during configuration changes.
3424     // In operational state requests should be done based on
3425     // detected gaps and on delayed node checks.
3426     assert(state() != S_OPERATIONAL);
3427     for (NodeMap::const_iterator node_i(known_.begin()); node_i != known_.end();
3428          ++node_i)
3429     {
3430         const UUID& origin(node_i->first);
3431         if (origin == my_uuid_) continue; // No need to request from self.
3432         const Node& node(node_i->second);
3433         // Node has no index assigned, so it was not in the current group.
3434         if (node.index() == Node::invalid_index) continue;
3435 
3436         Range range(input_map_->range(node.index()));
3437         if ((not range.is_empty() || range.hs() < last_sent_) &&
3438             (node.leave_message() == 0 ||
3439              node.leave_message()->seq() > range.hs()))
3440         {
3441             // Missing messages from node. If it is still considerd operational,
3442             // send a retransimission request to it. Otherwise locate some
3443             // other node to recover the missing messages.
3444             if (node.operational())
3445             {
3446                 const Range request_range(range.lu(), last_sent_);
3447                 if (not request_range.is_empty())
3448                 {
3449                     request_retrans(origin, origin, request_range);
3450                 }
3451             }
3452             else
3453             {
3454                 // Try to find suitable node to recover the missing messages
3455                 // from origin.
3456                 SelectRecoveryNodeForMissingResult result;
3457                 std::for_each(known_.begin(), known_.end(),
3458                               SelectRecoveryNodeForMissing(
3459                                   *this, origin, current_view_.id(), result));
3460                 // If the target node was found, it has messages up to
3461                 // result.lowest_unseen - 1 from origin.
3462                 const Range request_range(range.lu(), result.lowest_unseen - 1);
3463                 if (result.target != UUID::nil() && not request_range.is_empty())
3464                 {
3465                     request_retrans(result.target, origin, request_range);
3466                 }
3467                 else
3468                 {
3469                     evs_log_debug(D_RETRANS)
3470                         << "Could not find a node to recover messages "
3471                         << "from, missing from " << origin
3472                         << " range: " << range
3473                         << " last_sent: " << last_sent_;
3474                 }
3475             }
3476         }
3477     }
3478 }
3479 
3480 class ResendMissingRanges
3481 {
3482 public:
ResendMissingRanges(gcomm::evs::Proto & evs,gcomm::evs::seqno_t last_sent,const gcomm::ViewId & view_id)3483     ResendMissingRanges(gcomm::evs::Proto& evs,
3484                         gcomm::evs::seqno_t last_sent,
3485                         const gcomm::ViewId& view_id)
3486         : evs_(evs)
3487         , last_sent_(last_sent)
3488         , view_id_(view_id)
3489     { }
3490 
operator ()(const gcomm::evs::NodeMap::value_type & node_v)3491     void operator()(const gcomm::evs::NodeMap::value_type& node_v)
3492     {
3493         if (node_v.first == evs_.uuid()) return; // No need to inspect self
3494 
3495         const gcomm::evs::JoinMessage* jm(node_v.second.join_message());
3496         if (jm && jm->source_view_id() == view_id_)
3497         {
3498             resend_missing_from_join_message(*jm);
3499         }
3500 
3501         const gcomm::evs::LeaveMessage* lm(node_v.second.leave_message());
3502         if (lm && lm->source_view_id() == view_id_)
3503         {
3504             resend_missing_from_leave_message(*lm);
3505         }
3506     }
3507 
3508 private:
resend_missing_from_join_message(const gcomm::evs::JoinMessage & jm)3509     void resend_missing_from_join_message(const gcomm::evs::JoinMessage& jm)
3510     {
3511         gcomm::evs::MessageNodeList::const_iterator self_i(
3512             jm.node_list().find(evs_.uuid()));
3513         if (self_i == jm.node_list().end())
3514         {
3515             log_warn << "Node join message claims to be from the same "
3516                      << "view but does not list this node, "
3517                      << "own uuid: " << evs_.uuid()
3518                      << " join message: " << jm;
3519             return;
3520         }
3521         if (self_i->second.im_range().lu() <= last_sent_)
3522         {
3523             evs_.resend(jm.source(),
3524                         gcomm::evs::Range(self_i->second.im_range().lu(),
3525                                           last_sent_));
3526         }
3527     }
3528 
resend_missing_from_leave_message(const gcomm::evs::LeaveMessage & lm)3529     void resend_missing_from_leave_message(const gcomm::evs::LeaveMessage& lm)
3530     {
3531         if (lm.aru_seq() < last_sent_)
3532         {
3533             evs_.resend(lm.source(),
3534                         gcomm::evs::Range(lm.aru_seq() + 1, last_sent_));
3535         }
3536     }
3537 
3538     gcomm::evs::Proto& evs_;
3539     const gcomm::evs::seqno_t last_sent_;
3540     const gcomm::ViewId& view_id_;
3541 };
3542 
retrans_missing()3543 void gcomm::evs::Proto::retrans_missing()
3544 {
3545     // This method should be called only during configuration changes.
3546     // In operational state retransmits should happen only by
3547     // responding to retrans request Gap messages.
3548     assert(state() != S_OPERATIONAL);
3549 
3550     // Iterate over join messages and retransmit is some nodes
3551     // have not received all messages.
3552     ResendMissingRanges resend_missing(*this, last_sent_, current_view_.id());
3553     std::for_each(known_.begin(), known_.end(), resend_missing);
3554 }
3555 
handle_user_from_different_view(const Node & source_node,const UserMessage & msg)3556 void gcomm::evs::Proto::handle_user_from_different_view(
3557     const Node& source_node, const UserMessage& msg)
3558 {
3559     if (state() == S_LEAVING)
3560     {
3561         // Silent drop
3562         return;
3563     }
3564 
3565     if (is_msg_from_previous_view(msg) == true)
3566     {
3567         evs_log_debug(D_FOREIGN_MSGS) << "user message "
3568                                       << msg
3569                                       << " from previous view";
3570         return;
3571     }
3572 
3573     if (source_node.operational() == false)
3574     {
3575         evs_log_debug(D_STATE)
3576             << "dropping message from unoperational source "
3577             << msg.source();
3578     }
3579     else if (source_node.installed() == false)
3580     {
3581         if (install_message_ != 0 &&
3582             msg.source_view_id() == install_message_->install_view_id())
3583         {
3584             assert(state() == S_GATHER || state() == S_INSTALL);
3585             evs_log_debug(D_STATE) << " recovery user message "
3586                                    << msg;
3587 
3588             // This is possible if install timer expires just before
3589             // new view is established on this source_node and retransmitted
3590             // install message is received just before user this message.
3591             if (state() == S_GATHER)
3592             {
3593                 // Sanity check
3594                 MessageNodeList::const_iterator self(
3595                     install_message_->node_list().find(uuid()));
3596                 gcomm_assert(self != install_message_->node_list().end()
3597                              && MessageNodeList::value(self).operational() == true);
3598                 // Mark all operational nodes in install message as
3599                 // committed
3600                 for (MessageNodeList::const_iterator
3601                          mi = install_message_->node_list().begin();
3602                      mi != install_message_->node_list().end(); ++mi)
3603                 {
3604                     if (MessageNodeList::value(mi).operational() == true)
3605                     {
3606                         NodeMap::iterator jj;
3607                         gu_trace(jj = known_.find_checked(
3608                                      MessageNodeList::key(mi)));
3609                         NodeMap::value(jj).set_committed(true);
3610                     }
3611                 }
3612                 shift_to(S_INSTALL);
3613             }
3614 
3615             // Other instances installed view before this one, so it is
3616             // safe to shift to S_OPERATIONAL
3617 
3618             // Mark all operational nodes in install message as installed
3619             for (MessageNodeList::const_iterator
3620                      mi = install_message_->node_list().begin();
3621                  mi != install_message_->node_list().end(); ++mi)
3622             {
3623                 if (MessageNodeList::value(mi).operational() == true)
3624                 {
3625                     NodeMap::iterator jj;
3626                     gu_trace(jj = known_.find_checked(
3627                                  MessageNodeList::key(mi)));
3628                     NodeMap::value(jj).set_installed(true);
3629                 }
3630             }
3631 
3632             gu_trace(shift_to(S_OPERATIONAL));
3633             if (pending_leave_ == true)
3634             {
3635                 close();
3636             }
3637         }
3638     }
3639     else
3640     {
3641         log_debug << self_string() << " unhandled user message " << msg;
3642     }
3643 }
3644 
// Handle a user message received from the node pointed to by ii.
//
// Processing steps, in order:
// 1. A message from a view other than the current one is passed to
//    handle_user_from_different_view(). Processing continues only if the
//    message view matches the current view after that call.
// 2. The message is ignored if an install message is present.
// 3. The message is inserted into the input map unless its seq is below
//    the lowest unseen seqno of the source.
// 4. Input map safe seqs are updated for self and for the source.
// 5. Retransmission is requested if the resulting range has a gap and
//    the message itself is not a retransmission.
// 6. The seqno range is completed with complete_user(), or an empty gap
//    message is sent if aru changed.
// 7. Queued messages are sent (S_OPERATIONAL only) and messages are
//    delivered.
// 8. In S_GATHER a join message may be sent, see the comment at the end.
//
// @param msg The user message.
// @param ii  Iterator to the source node in known_, must not be end().
// @param rb  Datagram the message was read from. The datagram content
//            starting from rb.offset() is stored into the input map.
void gcomm::evs::Proto::handle_user(const UserMessage& msg,
                                    NodeMap::iterator ii,
                                    const Datagram& rb)

{
    assert(ii != known_.end());
    assert(state() != S_CLOSED && state() != S_JOINING);
    Node& inst(NodeMap::value(ii));

    evs_log_debug(D_USER_MSGS) << "received " << msg;

    if (msg.source_view_id() != current_view_.id())
    {
        handle_user_from_different_view(inst, msg);
        // Handling user message from different view may cause shift
        // to operational or leaving state. Check the view ID again and if it
        // matches to current view proceed to handling the message.
        if (msg.source_view_id() != current_view_.id())
        {
            return;
        }
        assert(state() == S_OPERATIONAL || state() == S_LEAVING);
    }

    if (install_message_)
    {
        // Install message has been received, which means that the
        // members of the group already got into agreement about the
        // set of delivered messages.
        return;
    }

    Range range;
    Range prev_range;
    seqno_t prev_aru;
    seqno_t prev_safe;

    // Snapshot of the input map state before this message is applied,
    // used below to detect changes.
    prev_aru = input_map_->aru_seq();
    prev_range = input_map_->range(inst.index());

    // Insert only if msg seq is greater or equal than current lowest unseen
    if (msg.seq() >= prev_range.lu())
    {
        Datagram im_dgram(rb, rb.offset());
        im_dgram.normalize();
        gu_trace(range = input_map_->insert(inst.index(), msg, im_dgram));
        // The source is timestamped only if the message advanced the
        // lowest unseen seqno of the source.
        if (range.lu() > prev_range.lu())
        {
            inst.set_tstamp(gu::datetime::Date::monotonic());
        }
        else
        {
            evs_log_debug(D_USER_MSGS)
                << "Not timestamping due to user msg: range.lu: "
                << range.lu()
                << " prev_range.lu(): "
                << prev_range.lu();
        }
    }
    else
    {
        evs_log_debug(D_USER_MSGS)
            << "Not timestamping due to user msg: msg.seq: "
            << msg.seq()
            << " prev_range.lu(): "
            << prev_range.lu();
        // Message was not inserted, the range of the source is unchanged.
        range = prev_range;
    }

    // Update im safe seq for self
    update_im_safe_seq(NodeMap::value(self_i_).index(),
                       input_map_->aru_seq());

    // Update safe seq for message source
    prev_safe = update_im_safe_seq(inst.index(), msg.aru_seq());

    // Check for missing messages. Retransmitted messages do not trigger
    // new retransmission requests.
    if (range.hs() >  range.lu() &&
        (msg.flags() & Message::F_RETRANS) == 0)
    {
        request_retrans(msg.source(), msg.source(), range);
    }

    // Seqno range completion and acknowledgement
    const seqno_t max_hs(input_map_->max_hs());
    if (output_.empty()                          == true            &&
        (state() == S_OPERATIONAL || state() == S_GATHER)  &&
        (msg.flags() & Message::F_MSG_MORE) == 0               &&
        (last_sent_                              <  max_hs))
    {
        // Message not originated from this instance, output queue is empty
        // and last_sent seqno should be advanced
        gu_trace(complete_user(max_hs));
    }
    else if (output_.empty()           == true  &&
             input_map_->aru_seq() != prev_aru)
    {
        // Output queue empty and aru changed, send gap to inform others
        evs_log_debug(D_GAP_MSGS) << "sending empty gap";
        gu_trace(send_gap(EVS_CALLER, UUID::nil(), current_view_.id(), Range()));
    }

    // Send messages
    if (state() == S_OPERATIONAL)
    {
        // Number of successful send_user() calls in this round.
        size_t n_sent(0);
        while (output_.empty() == false)
        {
            int err;
            gu_trace(err = send_user(send_window_));
            if (err != 0)
            {
                if (err == EAGAIN && n_sent == 0)
                {
                    // If the send window was exhausted, send a gap
                    // message to advance aru_seq/safe_seq on peers.
                    gu_trace(send_gap(EVS_CALLER, UUID::nil(),
                                      current_view_.id(), Range()));
                }
                // Any error stops sending for this round.
                break;
            }
            else
            {
                ++n_sent;
            }
        }
    }

    // Deliver messages
    gu_trace(deliver());
    gu_trace(deliver_local());

    // If in recovery state, send join each time input map aru seq reaches
    // the highest reachable safe seq reported by consensus_ and either
    // input map aru or safe seq has changed, unless the message was
    // a retransmission or consensus has already been reached.
    // NOTE(review): prev_safe is the previous safe seq of the message
    // source while safe_seq() is called without a node index here -
    // confirm that comparing the two is intended.
    if (state()                  == S_GATHER &&
        consensus_.highest_reachable_safe_seq() == input_map_->aru_seq() &&
        (prev_aru                    != input_map_->aru_seq() ||
         prev_safe                   != input_map_->safe_seq()) &&
        (msg.flags() & Message::F_RETRANS) == 0)
    {
        gcomm_assert(output_.empty() == true);
        if (consensus_.is_consensus() == false)
        {
            gu_trace(send_join());
        }
    }
}
3792 
3793 
handle_delegate(const DelegateMessage & msg,NodeMap::iterator ii,const Datagram & rb)3794 void gcomm::evs::Proto::handle_delegate(const DelegateMessage& msg,
3795                                         NodeMap::iterator ii,
3796                                         const Datagram& rb)
3797 {
3798     gcomm_assert(ii != known_.end());
3799     evs_log_debug(D_DELEGATE_MSGS) << "delegate message " << msg;
3800     Message umsg;
3801     size_t offset;
3802     gu_trace(offset = unserialize_message(UUID::nil(), rb, &umsg));
3803     gu_trace(handle_msg(umsg, Datagram(rb, offset), false));
3804 }
3805 
3806 
// Handle a gap message received from the node pointed to by ii.
//
// The message is first classified, each of the first three cases returns
// without further processing:
// - Commit gap (F_COMMIT set): in S_GATHER with a matching install
//   message the source is marked committed, and once all nodes have
//   committed the state shifts to S_INSTALL and a gap message with the
//   install view id is sent. A commit gap with a fifo seq higher than
//   the one of the install message restarts S_GATHER.
// - Install gap (S_INSTALL and message view equals install view): the
//   source is marked installed, and once all nodes have installed the
//   state shifts to S_OPERATIONAL.
// - Gap from a view other than the current one: dropped, with logging
//   depending on the state of the source.
// - Gap from the current view: the safe seq of the source is updated,
//   requested messages are resent or recovered, pending output is sent
//   and messages are delivered.
//
// @param msg The gap message.
// @param ii  Iterator to the source node in known_, must not be end().
void gcomm::evs::Proto::handle_gap(const GapMessage& msg, NodeMap::iterator ii)
{
    assert(ii != known_.end());
    assert(state() != S_CLOSED && state() != S_JOINING);

    Node& inst(NodeMap::value(ii));
    evs_log_debug(D_GAP_MSGS) << "gap message " << msg;


    if ((msg.flags() & Message::F_COMMIT) != 0)
    {
        log_debug << self_string() << " commit gap from " << msg.source();
        // The commit gap must refer to the install message held by this
        // node: same install view and same fifo seq.
        if (state()                             == S_GATHER             &&
            install_message_                    != 0                    &&
            install_message_->install_view_id() == msg.source_view_id() &&
            install_message_->fifo_seq()        == msg.seq())
        {
            inst.set_committed(true);
            inst.set_tstamp(gu::datetime::Date::monotonic());
            if (is_all_committed() == true)
            {
                shift_to(S_INSTALL);
                gu_trace(send_gap(EVS_CALLER, UUID::nil(),
                                  install_message_->install_view_id(),
                                  Range()));;
            }
        }
        else if (state()                             == S_GATHER             &&
                 install_message_                    != 0                    &&
                 install_message_->install_view_id() == msg.source_view_id() &&
                 install_message_->fifo_seq()        < msg.seq())
        {
            // new install message has been generated
            shift_to(S_GATHER, true);
        }
        else
        {
            evs_log_debug(D_GAP_MSGS) << " unhandled commit gap " << msg;
        }
        return;
    }
    else if (state()                           == S_INSTALL  &&
             install_message_                       != 0          &&
             install_message_->install_view_id() == msg.source_view_id())
    {
        evs_log_debug(D_STATE) << "install gap " << msg;
        inst.set_installed(true);
        inst.set_tstamp(gu::datetime::Date::monotonic());
        if (is_all_installed() == true)
        {
            gu_trace(shift_to(S_OPERATIONAL));
            // Leave which was requested during the view change is
            // carried out once the view is operational.
            if (pending_leave_ == true)
            {
                close();
            }
        }
        return;
    }
    else if (msg.source_view_id() != current_view_.id())
    {
        if (state() == S_LEAVING)
        {
            // Silently drop
            return;
        }

        if (is_msg_from_previous_view(msg) == true)
        {
            evs_log_debug(D_FOREIGN_MSGS) << "gap message from previous view";
            return;
        }

        // Messages from other views are never processed further, the
        // branches below differ only in what is logged.
        if (inst.operational() == false)
        {
            evs_log_debug(D_STATE)
                << "dropping message from unoperational source "
                << msg.source();
        }
        else if (inst.installed() == false)
        {
            evs_log_debug(D_STATE)
                << "dropping message from uninstalled source "
                << msg.source();
        }
        else
        {
            log_debug << "unhandled gap message " << msg;
        }
        return;
    }

    gcomm_assert(msg.source_view_id() == current_view_.id());

    // Update safe seq for the message source, remembering the previous
    // value for change detection below.
    seqno_t prev_safe;

    prev_safe = update_im_safe_seq(inst.index(), msg.aru_seq());

    // Update tstamp only if safe_seq changed for the source. Messages
    // are delivered unconditionally further below.
    if (prev_safe != input_map_->safe_seq(inst.index()))
    {
        inst.set_tstamp(gu::datetime::Date::monotonic());
    }

    // Serve the range carried by the message: a range for messages of this
    // node is resent by this node, a retrans request for messages of some
    // other node is served via recover().
    if (msg.range_uuid() == uuid())
    {
        if (msg.range().hs() > last_sent_ &&
            (state() == S_OPERATIONAL || state() == S_GATHER))
        {
            // This could be leaving node requesting messages up to
            // its last sent.
            gu_trace(complete_user(msg.range().hs()));
        }
        // Never resend past what has actually been sent.
        const seqno_t upper_bound(
            std::min(msg.range().hs(), last_sent_));
        if (msg.range().lu() <= upper_bound)
        {
            gu_trace(resend(msg.source(),
                            Range(msg.range().lu(), upper_bound)));
        }
    }
    else if ((msg.flags() & Message::F_RETRANS) != 0 &&
             msg.source() != uuid())
    {
        gu_trace(recover(msg.source(), msg.range_uuid(), msg.range()));
    }

    // In operational state either send queued messages until the queue is
    // empty or send_user() fails, or with an empty queue complete the seqno
    // range up to the input map max hs.
    if (state() == S_OPERATIONAL)
    {
        if (output_.empty() == false)
        {
            while (output_.empty() == false)
            {
                int err;
                gu_trace(err = send_user(send_window_));
                if (err != 0)
                    break;
            }
        }
        else
        {
            const seqno_t max_hs(input_map_->max_hs());
            if (last_sent_ <  max_hs)
            {
                gu_trace(complete_user(max_hs));
            }
        }
    }

    gu_trace(deliver());
    gu_trace(deliver_local());

    // If in recovery state, send join when input map aru seq equals the
    // highest reachable safe seq reported by consensus_ and safe seq has
    // changed, unless consensus has already been reached.
    // NOTE(review): prev_safe is the previous safe seq of the message
    // source while safe_seq() is called without a node index here -
    // confirm that comparing the two is intended.
    if (state()                            == S_GATHER                  &&
        consensus_.highest_reachable_safe_seq() == input_map_->aru_seq()  &&
        prev_safe                              != input_map_->safe_seq()   )
    {
        gcomm_assert(output_.empty() == true);
        if (consensus_.is_consensus() == false)
        {
            gu_trace(send_join());
        }
    }
}
3974 
3975 
update_im_safe_seqs(const MessageNodeList & node_list)3976 bool gcomm::evs::Proto::update_im_safe_seqs(const MessageNodeList& node_list)
3977 {
3978     bool updated = false;
3979     // Update input map state
3980     for (MessageNodeList::const_iterator i = node_list.begin();
3981          i != node_list.end(); ++i)
3982     {
3983         const UUID& node_uuid(MessageNodeList::key(i));
3984         const Node& local_node(NodeMap::value(known_.find_checked(node_uuid)));
3985         const MessageNode& node(MessageNodeList::value(i));
3986         gcomm_assert(node.view_id() == current_view_.id());
3987         const seqno_t safe_seq(node.safe_seq());
3988         seqno_t prev_safe_seq;
3989         gu_trace(prev_safe_seq = update_im_safe_seq(local_node.index(), safe_seq));
3990         if (prev_safe_seq                 != safe_seq &&
3991             input_map_->safe_seq(local_node.index()) == safe_seq)
3992         {
3993             updated = true;
3994         }
3995     }
3996     return updated;
3997 }
3998 
retrans_leaves(const MessageNodeList & node_list)3999 void gcomm::evs::Proto::retrans_leaves(const MessageNodeList& node_list)
4000 {
4001     for (NodeMap::const_iterator li = known_.begin(); li != known_.end(); ++li)
4002     {
4003         const Node& local_node(NodeMap::value(li));
4004         if (local_node.leave_message() != 0 &&
4005             local_node.is_inactive()       == false)
4006         {
4007             MessageNodeList::const_iterator msg_li(
4008                 node_list.find(NodeMap::key(li)));
4009 
4010             if (msg_li == node_list.end() ||
4011                 MessageNodeList::value(msg_li).leaving() == false)
4012             {
4013                 const LeaveMessage& lm(*NodeMap::value(li).leave_message());
4014                 LeaveMessage send_lm(lm.version(),
4015                                      lm.source(),
4016                                      lm.source_view_id(),
4017                                      lm.seq(),
4018                                      lm.aru_seq(),
4019                                      lm.fifo_seq(),
4020                                      Message::F_RETRANS | Message::F_SOURCE);
4021 
4022                 gu::Buffer buf;
4023                 serialize(send_lm, buf);
4024                 Datagram dg(buf);
4025                 gu_trace(send_delegate(dg, UUID::nil()));
4026             }
4027         }
4028     }
4029 }
4030 
4031 
4032 class SelectSuspectsOp
4033 {
4034 public:
SelectSuspectsOp(gcomm::evs::MessageNodeList & nl)4035     SelectSuspectsOp(gcomm::evs::MessageNodeList& nl) : nl_(nl) { }
4036 
operator ()(const gcomm::evs::MessageNodeList::value_type & vt) const4037     void operator()(const gcomm::evs::MessageNodeList::value_type& vt) const
4038     {
4039         if (gcomm::evs::MessageNodeList::value(vt).suspected() == true)
4040         {
4041             nl_.insert_unique(vt);
4042         }
4043     }
4044 private:
4045     gcomm::evs::MessageNodeList& nl_;
4046 };
4047 
check_suspects(const UUID & source,const MessageNodeList & nl)4048 void gcomm::evs::Proto::check_suspects(const UUID& source,
4049                                        const MessageNodeList& nl)
4050 {
4051     assert(source != uuid());
4052     MessageNodeList suspected;
4053     for_each(nl.begin(), nl.end(), SelectSuspectsOp(suspected));
4054 
4055     for (MessageNodeList::const_iterator i(suspected.begin());
4056          i != suspected.end(); ++i)
4057     {
4058         const UUID& node_uuid(MessageNodeList::key(i));
4059         const MessageNode& node(MessageNodeList::value(i));
4060         if (node.suspected() == true)
4061         {
4062             if (node_uuid != uuid())
4063             {
4064                 size_t s_cnt(0);
4065                 // Iterate over join messages to see if majority of current
4066                 // view agrees with the suspicion
4067                 for (NodeMap::const_iterator j(known_.begin());
4068                      j != known_.end(); ++j)
4069                 {
4070                     const JoinMessage* jm(NodeMap::value(j).join_message());
4071                     if (jm != 0 && jm->source() != node_uuid &&
4072                         current_view_.is_member(jm->source()) == true)
4073                     {
4074                         MessageNodeList::const_iterator mni(jm->node_list().find(node_uuid));
4075                         if (mni != jm->node_list().end())
4076                         {
4077                             const MessageNode& mn(MessageNodeList::value(mni));
4078                             if (mn.suspected() == true)
4079                             {
4080                                 ++s_cnt;
4081                             }
4082                         }
4083                     }
4084                 }
4085                 const Node& kn(NodeMap::value(known_.find_checked(node_uuid)));
4086                 if (kn.operational() == true &&
4087                     s_cnt > current_view_.members().size()/2)
4088                 {
4089                     evs_log_info(I_STATE)
4090                         << " declaring suspected "
4091                         << node_uuid << " as inactive";
4092                     set_inactive(node_uuid);
4093                 }
4094             }
4095         }
4096     }
4097 }
4098 
4099 
cross_check_inactives(const UUID & source,const MessageNodeList & nl)4100 void gcomm::evs::Proto::cross_check_inactives(const UUID& source,
4101                                               const MessageNodeList& nl)
4102 {
4103     assert(source != uuid());
4104 
4105     // Do elimination by suspect status
4106     NodeMap::const_iterator source_i(known_.find_checked(source));
4107 
4108     for (MessageNodeList::const_iterator i(nl.begin()); i != nl.end(); ++i)
4109     {
4110         const UUID& node_uuid(MessageNodeList::key(i));
4111         const MessageNode& node(MessageNodeList::value(i));
4112         if (node.operational() == false)
4113         {
4114             NodeMap::iterator local_i(known_.find(node_uuid));
4115             if (local_i != known_.end() && node_uuid != uuid())
4116             {
4117                 const Node& local_node(NodeMap::value(local_i));
4118                 if (local_node.suspected())
4119                 {
4120                     // This node is suspecting and the source node has
4121                     // already set inactve, mark also locally inactive.
4122                     set_inactive(node_uuid);
4123                 }
4124             }
4125         }
4126     }
4127 }
4128 
4129 
4130 // Asymmetry elimination:
4131 // 1a) Find all joins that has this node marked as operational and which
4132 //     this node considers operational
4133 // 1b) Mark all operational nodes without join message unoperational
4134 // 2) Iterate over join messages gathered in 1a, find all
4135 //    unoperational entries and mark them unoperational too
asymmetry_elimination()4136 void gcomm::evs::Proto::asymmetry_elimination()
4137 {
4138     // Allow some time to pass from setting install timers to get
4139     // join messages accumulated.
4140     const gu::datetime::Date now(gu::datetime::Date::monotonic());
4141     TimerList::const_iterator ti(
4142         find_if(timers_.begin(), timers_.end(), TimerSelectOp(T_INSTALL)));
4143 
4144     assert(ti != timers_.end());
4145     if (ti == timers_.end())
4146     {
4147         log_warn << "install timer not set in asymmetry_elimination()";
4148         return;
4149     }
4150 
4151     if (install_timeout_ - suspect_timeout_ < TimerList::key(ti) - now)
4152     {
4153         // No check yet
4154         return;
4155     }
4156 
4157     // Record initial operational state for logging
4158     std::vector<int> oparr_before(known_.size());
4159     size_t index(0);
4160     for (NodeMap::const_iterator i(known_.begin()); i != known_.end(); ++i)
4161     {
4162         oparr_before[index] = (NodeMap::value(i).operational() == true);
4163         index++;
4164     }
4165     std::list<const JoinMessage*> joins;
4166 
4167     // Compose list of join messages
4168     for (NodeMap::const_iterator i(known_.begin()); i != known_.end(); ++i)
4169     {
4170         const UUID& node_uuid(NodeMap::key(i));
4171         const Node& node(NodeMap::value(i));
4172         const JoinMessage* jm(node.join_message());
4173         if (jm != 0)
4174         {
4175             MessageNodeList::const_iterator self_ref(
4176                 jm->node_list().find(uuid()));
4177             if (node.operational() == true                           &&
4178                 self_ref           != jm->node_list().end()          &&
4179                 MessageNodeList::value(self_ref).operational() == true)
4180             {
4181                 joins.push_back(NodeMap::value(i).join_message());
4182             }
4183         }
4184         else if (node.operational() == true)
4185         {
4186             evs_log_info(I_STATE)
4187                 << "marking operational node "
4188                 << node_uuid << " without "
4189                 << "join message inactive in asymmetry elimination";
4190             set_inactive(node_uuid);
4191         }
4192     }
4193 
4194     // Setting node inactive may remove join message and so invalidate
4195     // pointer in joins list, so collect set of UUIDs to set inactive
4196     // and do inactivation in separate loop.
4197     std::set<UUID> to_inactive;
4198     // Iterate over join messages and collect nodes to be set inactive
4199     for (std::list<const JoinMessage*>::const_iterator i(joins.begin());
4200          i != joins.end(); ++i)
4201     {
4202         for (MessageNodeList::const_iterator j((*i)->node_list().begin());
4203              j != (*i)->node_list().end(); ++j)
4204         {
4205             if (MessageNodeList::value(j).operational() == false)
4206             {
4207                 to_inactive.insert(MessageNodeList::key(j));
4208             }
4209         }
4210     }
4211     joins.clear();
4212     for (std::set<UUID>::const_iterator i(to_inactive.begin());
4213          i != to_inactive.end(); ++i)
4214     {
4215         NodeMap::const_iterator ni(known_.find(*i));
4216         if (ni != known_.end())
4217         {
4218             if (NodeMap::value(ni).operational() == true)
4219             {
4220                 evs_log_info(I_STATE) << "setting " << *i
4221                                       << " inactive in asymmetry elimination";
4222                 set_inactive(*i);
4223             }
4224         }
4225         else
4226         {
4227             log_warn << "node " << *i << " not found from known list in ae";
4228         }
4229     }
4230 
4231     // Compute final state and log if it has changed
4232     std::vector<int> oparr_after(known_.size());
4233     index = 0;
4234     for (NodeMap::const_iterator i(known_.begin()); i != known_.end(); ++i)
4235     {
4236         oparr_after[index] = (NodeMap::value(i).operational() == true);
4237         index++;
4238     }
4239 
4240     if (oparr_before != oparr_after)
4241     {
4242         evs_log_info(I_STATE) << "before asym elimination";
4243         if (info_mask_ & I_STATE)
4244         {
4245             std::copy(oparr_before.begin(), oparr_before.end(),
4246                       std::ostream_iterator<int>(std::cerr, " "));
4247             std::cerr << "\n";
4248         }
4249 
4250         evs_log_info(I_STATE) << "after asym elimination";
4251         if (info_mask_ & I_STATE)
4252         {
4253             std::copy(oparr_after.begin(), oparr_after.end(),
4254                       std::ostream_iterator<int>(std::cerr, " "));
4255             std::cerr << "\n";
4256         }
4257     }
4258 }
4259 
4260 // For each node thas has no join message associated, iterate over other
4261 // known nodes' join messages to find out if the node without join message
4262 // should be declared inactive.
check_unseen()4263 void gcomm::evs::Proto::check_unseen()
4264 {
4265     for (NodeMap::iterator i(known_.begin()); i != known_.end(); ++i)
4266     {
4267 
4268         const UUID& node_uuid(NodeMap::key(i));
4269         Node& node(NodeMap::value(i));
4270 
4271         if (node_uuid                          != uuid() &&
4272             current_view_.is_member(node_uuid) == false  &&
4273             node.join_message()                == 0      &&
4274             node.operational()                 == true)
4275         {
4276             evs_log_debug(D_STATE) << "checking operational unseen "
4277                                    << node_uuid;
4278             size_t cnt(0), inact_cnt(0);
4279             for (NodeMap::iterator j(known_.begin()); j != known_.end(); ++j)
4280             {
4281                 const JoinMessage* jm(NodeMap::value(j).join_message());
4282                 if (jm == 0 || NodeMap::key(j) == uuid())
4283                 {
4284                     continue;
4285                 }
4286                 MessageNodeList::const_iterator mn_i;
4287                 for (mn_i = jm->node_list().begin();
4288                      mn_i != jm->node_list().end(); ++mn_i)
4289                 {
4290                     NodeMap::const_iterator known_i(
4291                         known_.find(MessageNodeList::key(mn_i)));
4292                     if (known_i == known_.end() ||
4293                         (MessageNodeList::value(mn_i).operational() == true &&
4294                          NodeMap::value(known_i).join_message() == 0))
4295                     {
4296                         evs_log_debug(D_STATE)
4297                             << "all joins not locally present for "
4298                             << NodeMap::key(j)
4299                             << " join message node list";
4300                         return;
4301                     }
4302                 }
4303 
4304                 if ((mn_i = jm->node_list().find(node_uuid))
4305                     != jm->node_list().end())
4306                 {
4307                     const MessageNode& mn(MessageNodeList::value(mn_i));
4308                     evs_log_debug(D_STATE)
4309                         << "found " << node_uuid << " from " <<  NodeMap::key(j)
4310                         << " join message: "
4311                         << mn.view_id() << " "
4312                         << mn.operational();
4313                     if (mn.view_id() != ViewId(V_REG))
4314                     {
4315                         ++cnt;
4316                         if (mn.operational() == false) ++inact_cnt;
4317                     }
4318                 }
4319             }
4320             if (cnt > 0 && cnt == inact_cnt)
4321             {
4322                 evs_log_info(I_STATE)
4323                     << "unseen node marked inactive by others (cnt="
4324                     << cnt
4325                     << ", inact_cnt="
4326                     << inact_cnt
4327                     << ")";
4328                 set_inactive(node_uuid);
4329             }
4330         }
4331     }
4332 }
4333 
4334 
4335 // Iterate over all join messages. If some node has nil view id and suspected
4336 // flag true in all present join messages, declare it inactive.
check_nil_view_id()4337 void gcomm::evs::Proto::check_nil_view_id()
4338 {
4339     size_t join_counts(0);
4340     std::map<UUID, size_t > nil_counts;
4341     for (NodeMap::const_iterator i(known_.begin()); i != known_.end(); ++i)
4342     {
4343         const JoinMessage* jm(NodeMap::value(i).join_message());
4344         if (jm == 0)
4345         {
4346             continue;
4347         }
4348         ++join_counts;
4349         for (MessageNodeList::const_iterator j(jm->node_list().begin());
4350              j != jm->node_list().end(); ++j)
4351         {
4352             const MessageNode& mn(MessageNodeList::value(j));
4353             if (mn.view_id() == ViewId(V_REG))
4354             {
4355                 // todo: investigate why removing mn.suspected() == true
4356                 // condition causes some unit tests to fail
4357                 if (mn.suspected() == true)
4358                 {
4359                     const UUID& uuid(MessageNodeList::key(j));
4360                     ++nil_counts[uuid];
4361                 }
4362             }
4363         }
4364     }
4365     for (std::map<UUID, size_t>::const_iterator
4366              i(nil_counts.begin()); i != nil_counts.end(); ++i)
4367     {
4368         if (i->second == join_counts && is_inactive(i->first) == false)
4369         {
4370             log_info << "node " << i->first
4371                      << " marked with nil view id and suspected in all present"
4372                      << " join messages, declaring inactive";
4373             set_inactive(i->first);
4374         }
4375     }
4376 }
4377 
join_rate_limit() const4378 bool gcomm::evs::Proto::join_rate_limit() const
4379 {
4380     gu::datetime::Date now(gu::datetime::Date::monotonic());
4381     // Limit join message sending. It is likely that
4382     // the transfer of user messages which were flushed into network
4383     // in shift to GATHER state takes some time. Too frequent join message
4384     // send will cause unwanted retransmits which will pile up in the
4385     // socket send queue.
4386     if (now < last_sent_join_tstamp_ + 100*gu::datetime::MSec)
4387     {
4388         evs_log_debug(D_JOIN_MSGS) << "join rate limit";
4389         return true;
4390     }
4391     return false;
4392 }
4393 
// Handles a join message.
//
// Early exits, in order of evaluation:
//  - S_LEAVING: a join from the current view is stored and used to update
//    input map safe seqs (the leave message is sent again if they changed)
//    and to request missing messages; everything else is skipped.
//  - Join from a previous view: dropped.
//  - Install message pending: depending on the join source and on
//    consensus state the join is dropped or causes a shift to S_GATHER
//    (see branch comments).
//  - Otherwise, unless already in S_GATHER, shift to S_GATHER.
//
// The rest of the method records the join message, merges the node
// information carried in it into local state, runs the inactivity checks
// and finally creates/sends a new join message and, if consensus is reached
// and this node is the representative, an install message.
//
// @param msg join message to handle
// @param ii  iterator into known_, must not be known_.end(); presumably
//            the entry of msg.source() (verify against caller)
void gcomm::evs::Proto::handle_join(const JoinMessage& msg, NodeMap::iterator ii)
{
    assert(ii != known_.end());
    assert(state() != S_CLOSED);

    Node& inst(NodeMap::value(ii));

    evs_log_debug(D_JOIN_MSGS) << " " << msg;

    if (state() == S_LEAVING)
    {
        if (msg.source_view_id() == current_view_.id())
        {
            inst.set_tstamp(gu::datetime::Date::monotonic());
            // Join messages are needed for detecting gaps in message
            // sequences on other nodes.
            inst.set_join_message(&msg);
            MessageNodeList same_view;
            for_each(msg.node_list().begin(), msg.node_list().end(),
                     SelectNodesOp(same_view, current_view_.id(),
                                   true, true));
            // Send the leave message again if safe seqs were updated
            if (update_im_safe_seqs(same_view) == true)
            {
                gu_trace(send_leave(false));
            }
            request_missing();
        }
        return;
    }
    else if (is_msg_from_previous_view(msg) == true)
    {
        return;
    }
    else if (install_message_ != 0)
    {
        // Note: don't send join from this branch yet, join is
        // sent at the end of this method
        if (install_message_->source() == msg.source())
        {
            evs_log_info(I_STATE)
                << "shift to gather due to representative "
                << msg.source() << " join";
            if (msg.source_view_id() == install_message_->install_view_id())
            {
                // Representative reached operational state, we follow
                // Other instances installed view before this one, so it is
                // safe to shift to S_OPERATIONAL

                // Mark all operational nodes in install message as installed
                for (MessageNodeList::const_iterator
                         mi = install_message_->node_list().begin();
                     mi != install_message_->node_list().end(); ++mi)
                {
                    if (MessageNodeList::value(mi).operational() == true)
                    {
                        NodeMap::iterator jj;
                        gu_trace(jj = known_.find_checked(
                                     MessageNodeList::key(mi)));
                        NodeMap::value(jj).set_installed(true);
                    }
                }
                inst.set_tstamp(gu::datetime::Date::monotonic());
                if (state() == S_INSTALL)
                {
                    gu_trace(shift_to(S_OPERATIONAL));
                    // A leave was requested while the view was being
                    // installed: close instead of processing the join.
                    if (pending_leave_ == true)
                    {
                        close();
                        return;
                    }
                    // proceed to process actual join message
                }
                else
                {
                    log_warn << self_string()
                             << "received join message from new "
                             << "view while in GATHER, dropping";
                    return;
                }
            }
            gu_trace(shift_to(S_GATHER, false));
        }
        else if (consensus_.is_consistent(*install_message_) == true)
        {
            // Install message is still consistent with local state: ignore
            // the join from a node other than the install message source.
            return;
            // Commented out: It seems to be better strategy to
            // just wait source of inconsistent join to time out
            // instead of shifting to gather. #443

            // if (consensus_.is_consistent(msg) == true)
            // {
            //   return;
            // }
            // else
            // {
            //   log_warn << "join message not consistent " << msg;
            //   log_info << "state (stderr): ";
            //   std::cerr << *this << std::endl;
            //
            // gu_trace(shift_to(S_GATHER, false));
            // }
        }
        else
        {
            evs_log_info(I_STATE)
                << "shift to GATHER, install message is "
                << "inconsistent when handling join from "
                << msg.source() << " " << msg.source_view_id();
            evs_log_info(I_STATE) << "state: " << *this;
            gu_trace(shift_to(S_GATHER, false));
        }
    }
    else if (state() != S_GATHER)
    {
        evs_log_info(I_STATE)
            << " shift to GATHER while handling join message from "
            << msg.source() << " " << msg.source_view_id();
        gu_trace(shift_to(S_GATHER, false));
    }

    // The output queue is expected to be empty from here on; failure of
    // this check is fatal.
    gcomm_assert(output_.empty() == true);

    // If source node is member of current view but has already
    // formed new view, mark it unoperational
    if (current_view_.is_member(msg.source()) == true &&
        msg.source_view_id().seq() > current_view_.id().seq())
    {
        evs_log_info(I_STATE)
            << " join source has already formed new view, marking inactive";
        set_inactive(msg.source());
        return;
    }

    // Collect view ids to gather_views_ list.
    // Add unseen nodes to known list and evicted nodes to evicted list.
    // Evicted nodes must also be added to known list for GATHER time
    // bookkeeping.
    // No need to adjust node state here, it is done later on in
    // check_suspects()/cross_check_inactives().
    for (MessageNodeList::const_iterator i(msg.node_list().begin());
         i != msg.node_list().end(); ++i)
    {
        NodeMap::iterator ni(known_.find(MessageNodeList::key(i)));
        const UUID mn_uuid(MessageNodeList::key(i));
        const MessageNode& mn(MessageNodeList::value(i));
        gather_views_.insert(std::make_pair(mn.view_id(),
                                            gu::datetime::Date::monotonic()));
        if (ni == known_.end())
        {
            known_.insert_unique(
                std::make_pair(mn_uuid, Node(*this)));
        }

        // Evict nodes according to join message
        if (mn_uuid != uuid() && mn.evicted() == true)
        {
            set_inactive(mn_uuid);
            if (is_evicted(mn_uuid) == false)
            {
                evict(mn_uuid);
            }
        }
    }

    // Timestamp source if it sees processing node as operational.
    // Adjust local entry operational status.
    MessageNodeList::const_iterator self(msg.node_list().find(uuid()));
    if (msg.node_list().end()                      != self)
    {
        if(MessageNodeList::value(self).operational() == true)
        {
            inst.set_tstamp(gu::datetime::Date::monotonic());
        }
        else
        {
            // Source considers this node unoperational, so consider the
            // source inactive in return.
            evs_log_info(I_STATE)
                << " declaring source " << msg.source()
                << " as inactive (mutual exclusion)";
            set_inactive(msg.source());
        }
    }
    inst.set_join_message(&msg);

    // Select nodes that are coming from the same view as seen by
    // message source
    MessageNodeList same_view;
    for_each(msg.node_list().begin(), msg.node_list().end(),
             SelectNodesOp(same_view, current_view_.id(), true, true));
    // Find out self from node list
    MessageNodeList::const_iterator nlself_i(same_view.find(uuid()));

    // Other node coming from the same view
    if (msg.source()         != uuid() &&
        msg.source_view_id() == current_view_.id())
    {
        // This node must be listed among the same view nodes
        gcomm_assert(nlself_i != same_view.end());
        // Update input map state
        (void)update_im_safe_seqs(same_view);

        // Find out max hs and complete up to that if needed
        MessageNodeList::const_iterator max_hs_i(
            max_element(same_view.begin(), same_view.end(), RangeHsCmp()));
        const seqno_t max_hs(MessageNodeList::value(max_hs_i).im_range().hs());
        if (last_sent_ < max_hs)
        {
            gu_trace(complete_user(max_hs));
        }
    }

    // Request missing messages from other nodes.
    request_missing();
    // Retrans leave messages that others are missing
    gu_trace(retrans_leaves(same_view));

    // Make cross check to resolve conflict if two nodes
    // declare each other inactive. There is no need to make
    // this for own messages.
    if (msg.source() != uuid())
    {
        gu_trace(check_suspects(msg.source(), same_view));
        gu_trace(cross_check_inactives(msg.source(), same_view));
        gu_trace(check_unseen());
        gu_trace(check_nil_view_id());
    }

    // Eliminate asymmetry according to operational status flags in
    // join messages
    gu_trace(asymmetry_elimination());

    // If current join message differs from current state, send new join
    const JoinMessage* curr_join(NodeMap::value(self_i_).join_message());
    MessageNodeList new_nl;
    populate_node_list(&new_nl);

    if (curr_join == 0 ||
        (curr_join->aru_seq()   != input_map_->aru_seq()  ||
         curr_join->seq()       != input_map_->safe_seq() ||
         curr_join->node_list() != new_nl))
    {
        gu_trace(create_join());
        // The new join is sent only if consensus has not been reached
        // and the join rate limit allows it.
        if (consensus_.is_consensus() == false && not join_rate_limit())
        {
            send_join(false);
        }
    }

    // On consensus the representative sends the install message
    if (consensus_.is_consensus() == true)
    {
        if (is_representative(uuid()) == true)
        {
            gu_trace(send_install(EVS_CALLER));
        }
    }
}
4648 
4649 
handle_leave(const LeaveMessage & msg,NodeMap::iterator ii)4650 void gcomm::evs::Proto::handle_leave(const LeaveMessage& msg,
4651                                      NodeMap::iterator ii)
4652 {
4653     assert(ii != known_.end());
4654     assert(state() != S_CLOSED && state() != S_JOINING);
4655 
4656     Node& node(NodeMap::value(ii));
4657     evs_log_debug(D_LEAVE_MSGS) << "leave message " << msg;
4658 
4659     // Leave messages must be always handled. They carry aru_seq information
4660     // which is used to retrasmit missing messages.
4661 
4662     node.set_leave_message(&msg);
4663     if (msg.source() == uuid())
4664     {
4665         // The last one to live, instant close. Otherwise continue
4666         // serving until it becomes apparent that others have
4667         // leave message.
4668         if (current_view_.members().size() == 1)
4669         {
4670             gu_trace(shift_to(S_CLOSED));
4671         }
4672     }
4673     else
4674     {
4675         // Always set node nonoperational if leave message is seen
4676         node.set_operational(false);
4677         if (msg.source_view_id()       != current_view_.id() ||
4678             is_msg_from_previous_view(msg) == true)
4679         {
4680             // Silent drop
4681             return;
4682         }
4683 
4684         const seqno_t prev_safe_seq(update_im_safe_seq(node.index(), msg.aru_seq()));
4685         if (prev_safe_seq != input_map_->safe_seq(node.index()))
4686         {
4687             node.set_tstamp(gu::datetime::Date::monotonic());
4688         }
4689         if (state() == S_OPERATIONAL)
4690         {
4691             evs_log_info(I_STATE)
4692                 << " shift to GATHER when handling leave from "
4693                 << msg.source() << " " << msg.source_view_id();
4694             gu_trace(shift_to(S_GATHER, true));
4695         }
4696         else if (state() == S_GATHER &&
4697                  prev_safe_seq != input_map_->safe_seq(node.index()))
4698         {
4699             gu_trace(send_join());
4700         }
4701     }
4702 }
4703 
4704 
handle_install(const InstallMessage & msg,NodeMap::iterator ii)4705 void gcomm::evs::Proto::handle_install(const InstallMessage& msg,
4706                                        NodeMap::iterator ii)
4707 {
4708 
4709     assert(ii != known_.end());
4710     assert(state() != S_CLOSED && state() != S_JOINING);
4711 
4712     Node& inst(NodeMap::value(ii));
4713 
4714     evs_log_debug(D_INSTALL_MSGS) << "install msg " << msg;
4715 
4716     if (state() == S_LEAVING)
4717     {
4718         // Check if others have receievd leave message or declared
4719         // as unoperational before shifting to closed.
4720         MessageNodeList::const_iterator mn_i(msg.node_list().find(uuid()));
4721         if (mn_i != msg.node_list().end())
4722         {
4723             const MessageNode& mn(MessageNodeList::value(mn_i));
4724             if (mn.operational() == false || mn.leaving() == true)
4725             {
4726                 gu_trace(shift_to(S_CLOSED));
4727             }
4728         }
4729         return;
4730     }
4731     else if (state() == S_OPERATIONAL)
4732     {
4733         // Drop install messages in operational state.
4734         evs_log_debug(D_INSTALL_MSGS)
4735             << "dropping install message in already installed view";
4736         return;
4737     }
4738     else if (inst.operational() == false)
4739     {
4740         // Message source is not seen as operational, must not accept
4741         // anything from it.
4742         evs_log_debug(D_INSTALL_MSGS)
4743             << "install message source " << msg.source()
4744             << " is not operational, discarding message";
4745         return;
4746     }
4747     else if (is_msg_from_previous_view(msg) == true)
4748     {
4749         // Delayed install message
4750         evs_log_debug(D_FOREIGN_MSGS)
4751             << " dropping install message from previous view";
4752         return;
4753     }
4754     else if (install_message_ != 0)
4755     {
4756         if (msg.source() == install_message_->source() &&
4757             msg.install_view_id().seq() > install_message_->install_view_id().seq())
4758         {
4759             // Representative regenerated install message
4760             evs_log_debug(D_INSTALL_MSGS)
4761                 << "regenerated install message";
4762             setall_committed(false);
4763             setall_installed(false);
4764             delete install_message_;
4765             install_message_ = 0;
4766             // Fall through to process new install message
4767         }
4768         else if (msg.source() == install_message_->source())
4769         {
4770             // Duplicate or delayed install message
4771             evs_log_debug(D_INSTALL_MSGS)
4772                 << "duplicate or delayed install message";
4773             return;
4774         }
4775         else
4776         {
4777             MessageNodeList::const_iterator self(msg.node_list().find(uuid()));
4778             if (msg.node_list().end()                      == self ||
4779                 MessageNodeList::value(self).operational() == false)
4780             {
4781                 evs_log_debug(D_INSTALL_MSGS)
4782                     << "dropping install message, processing node not in "
4783                     << "new view";
4784             }
4785             else
4786             {
4787                 // Two nodes decided to generate install message simultaneously,
4788                 // shift to gather to combine groups in install messages.
4789                 log_warn << self_string()
4790                          << " shift to GATHER due to conflicting install "
4791                          << "messages";
4792                 gu_trace(shift_to(S_GATHER));
4793             }
4794             return;
4795         }
4796     }
4797     else if (inst.installed() == true)
4798     {
4799         log_warn << self_string()
4800                  << " shift to GATHER due to inconsistent state";
4801         gu_trace(shift_to(S_GATHER));
4802         return;
4803     }
4804 
4805     // Construct join from install message so that the most recent
4806     // information from representative is updated to local state.
4807     if (msg.source() != uuid())
4808     {
4809         const MessageNode& mn(
4810             MessageNodeList::value(
4811                 msg.node_list().find_checked(msg.source())));
4812         JoinMessage jm(msg.version(),
4813                        msg.source(),
4814                        mn.view_id(),
4815                        msg.seq(),
4816                        msg.aru_seq(),
4817                        msg.fifo_seq(),
4818                        msg.node_list());
4819         handle_join(jm, ii);
4820     }
4821 
4822     // Drop install message if processing node won't be part of the
4823     // view to be installed.
4824     // Don't set nodes that are forming another view inactive here,
4825     // they should enter new view shortly after install message
4826     // delivery and should be ready to restart GATHER round.
4827     MessageNodeList::const_iterator self(msg.node_list().find(uuid()));
4828     if (msg.node_list().end()                      == self ||
4829         MessageNodeList::value(self).operational() == false)
4830     {
4831         evs_log_debug(D_INSTALL_MSGS)
4832             << "dropping install message, processing node not in new view";
4833         return;
4834     }
4835 
4836     // Proceed to install phase
4837     assert(install_message_ == 0);
4838 
4839     // Run through known nodes and remove each entry that is
4840     // not member of current view or present in install message.
4841     // This is to prevent inconsistent view of group when first message(s)
4842     // from new node are received after install message on representative
4843     // and before install message on other nodes.
4844     bool changed(false);
4845     NodeMap::iterator i, i_next;
4846     for (NodeMap::iterator i(known_.begin()); i != known_.end(); i = i_next)
4847     {
4848         i_next = i, ++i_next;
4849         const UUID& uuid(NodeMap::key(i));
4850         if (msg.node_list().find(uuid)         == msg.node_list().end() &&
4851             current_view_.members().find(uuid) == current_view_.members().end())
4852         {
4853             log_info << self_string() << " temporarily discarding known "
4854                      << uuid << " due to received install message";
4855             known_.erase(i);
4856             changed = true;
4857         }
4858     }
4859 
4860     // Recreate join message to match current state, otherwise is_consistent()
4861     // below will fail.
4862     if (changed == true)
4863     {
4864         (void)create_join();
4865     }
4866 
4867     // See if install message is consistent with local state.
4868     // Is_consistent() checks only local state and local join
4869     // message in case other nodes have already been seen and reported
4870     // nodes that will not be in the next view.
4871     if (consensus_.is_consistent(msg) == true)
4872     {
4873         inst.set_tstamp(gu::datetime::Date::monotonic());
4874         install_message_ = new InstallMessage(msg);
4875         assert(install_message_->source() != UUID::nil());
4876         assert(install_message_->flags() != 0);
4877         // Send commit gap
4878         gu_trace(send_gap(EVS_CALLER, UUID::nil(), install_message_->install_view_id(),
4879                           Range(), true));
4880     }
4881     else
4882     {
4883         evs_log_debug(D_INSTALL_MSGS)
4884             << "install message " << msg
4885             << " not consistent with state " << *this;
4886         gu_trace(shift_to(S_GATHER, true));
4887     }
4888 }
4889 
4890 
handle_delayed_list(const DelayedListMessage & msg,NodeMap::iterator ii)4891 void gcomm::evs::Proto::handle_delayed_list(const DelayedListMessage& msg,
4892                                           NodeMap::iterator ii)
4893 {
4894     if (auto_evict_ == 0)
4895     {
4896         // Ignore evict list messages if auto_evict_ is disabled.
4897         return;
4898     }
4899 
4900     Node& node(NodeMap::value(ii));
4901     node.set_delayed_list_message(&msg);
4902     gu::datetime::Date now(gu::datetime::Date::monotonic());
4903 
4904     // Construct a list of evict candidates that appear in evict list messages
4905     // with cnt greater than local auto_evict_. If evict candidate is reported
4906     // by majority of the current group, evict process is triggered.
4907 
4908     // UUID -> over auto_evict_, total count
4909     typedef std::map<UUID, std::pair<size_t, size_t> > Evicts;
4910     Evicts evicts;
4911     bool found(false);
4912 
4913     for (NodeMap::const_iterator i(known_.begin()); i != known_.end(); ++i)
4914     {
4915         const DelayedListMessage* const dlm(
4916             NodeMap::value(i).delayed_list_message());
4917         if (dlm == 0)
4918         {
4919             continue;
4920         }
4921         else if (dlm->delayed_list().find(uuid()) != dlm->delayed_list().end())
4922         {
4923             evs_log_debug(D_STATE)
4924                 << "found self " << uuid() << " from evict list from "
4925                 << msg.source() << " at " << get_address(msg.source());
4926             continue;
4927         }
4928         else if (dlm->tstamp() + delayed_keep_period_ < now)
4929         {
4930             evs_log_debug(D_STATE) << "ignoring expired evict message";
4931             continue;
4932         }
4933 
4934         for (DelayedListMessage::DelayedList::const_iterator
4935                  dlm_i(dlm->delayed_list().begin());
4936              dlm_i != dlm->delayed_list().end();
4937              ++dlm_i)
4938         {
4939             if (dlm_i->second <= 1)
4940             {
4941                 // Don't consider entries with single delayed event as
4942                 // evict candidates.
4943                 continue;
4944             }
4945 
4946             std::pair<Evicts::iterator, bool> eir(
4947                 evicts.insert(
4948                     std::make_pair(
4949                         dlm_i->first, std::make_pair(0, 0))));
4950             evs_log_debug(D_STATE) << "eir " << eir.first->first
4951                                    << " " << eir.first->second.first
4952                                    << " " << eir.first->second.second;
4953             ++eir.first->second.second; // total count
4954             if (dlm_i->second >= auto_evict_)
4955             {
4956                 ++eir.first->second.first; // over threshold count
4957                 found = true;
4958             }
4959         }
4960     }
4961 
4962     // Evict candidates that have reached threshold count
4963     for (Evicts::const_iterator i(evicts.begin());
4964          found == true && i != evicts.end(); ++i)
4965     {
4966         if (is_evicted(i->first) == true)
4967         {
4968             // Already evicted, avoid spamming
4969             continue;
4970         }
4971         evs_log_info(I_STATE) << "evict candidate "
4972                               << i->first << " " << i->second.first
4973                               << " " << i->second.second;
4974         // If the candidate is in the current view, require majority
4975         // of the view to agree. If the candidate is not in the current
4976         // view, require majority of known nodes to agree. Ability to
4977         // evict nodes outside of the group (even while in non-PC) is
4978         // needed to stabilize cluster also in the case that nodes
4979         // have already partitioned.
4980 
4981         // TODO: Record stable views from PC and use weights from there
4982         // accordingly (need to be added to view).
4983         if (i->second.first != 0 &&
4984             ((current_view_.is_member(i->first) &&
4985               i->second.second > current_view_.members().size()/2) ||
4986              i->second.second > known_.size()/2))
4987         {
4988             log_warn << "evicting member " << i->first
4989                      << " at " << get_address(i->first)
4990                      << " permanently from group";
4991             evict(i->first);
4992             if (state() == S_OPERATIONAL)
4993             {
4994                 shift_to(S_GATHER, true);
4995             }
4996         }
4997     }
4998 }
4999