1 /*
2  * Copyright (C) 2009-2014 Codership Oy <info@codership.com>
3  */
4 
5 #include "evs_consensus.hpp"
6 #include "evs_message2.hpp"
7 #include "evs_input_map2.hpp"
8 #include "evs_node.hpp"
9 #include "evs_proto.hpp"
10 #include "gcomm/view.hpp"
11 
12 #include "gu_logger.hpp"
13 
14 #include <vector>
15 
// Emit consensus debug log lines only when the D_CONSENSUS bit is set in the protocol debug mask (the macro argument is ignored)
17 #define evs_log_debug(i) if ((proto_.debug_mask_ & gcomm::evs::Proto::D_CONSENSUS) == 0) \
18     {} else log_debug << proto_.uuid() << " "
19 
20 //
21 // Helpers
22 //
23 
24 class LeaveSeqCmpOp
25 {
26 public:
27 
operator ()(const gcomm::evs::MessageNodeList::value_type & a,const gcomm::evs::MessageNodeList::value_type & b) const28     bool operator()(const gcomm::evs::MessageNodeList::value_type& a,
29                     const gcomm::evs::MessageNodeList::value_type& b) const
30     {
31         using gcomm::evs::MessageNode;
32         using gcomm::evs::MessageNodeList;
33         const MessageNode& aval(MessageNodeList::value(a));
34         const MessageNode& bval(MessageNodeList::value(b));
35         gcomm_assert(aval.leaving() != false &&
36                      bval.leaving() != false);
37         const gcomm::evs::seqno_t asec(aval.leave_seq());
38         const gcomm::evs::seqno_t bsec(bval.leave_seq());
39         gcomm_assert(asec != -1 && bsec != -1);
40         return (asec < bsec);
41     }
42 };
43 
44 
45 class RangeLuCmp
46 {
47 public:
operator ()(const gcomm::evs::MessageNodeList::value_type & a,const gcomm::evs::MessageNodeList::value_type & b) const48     bool operator()(const gcomm::evs::MessageNodeList::value_type& a,
49                     const gcomm::evs::MessageNodeList::value_type& b) const
50     {
51         return (gcomm::evs::MessageNodeList::value(a).im_range().lu() <
52                 gcomm::evs::MessageNodeList::value(b).im_range().lu());
53     }
54 };
55 
56 
57 class SafeSeqCmp
58 {
59 public:
operator ()(const gcomm::evs::MessageNodeList::value_type & a,const gcomm::evs::MessageNodeList::value_type & b) const60     bool operator()(const gcomm::evs::MessageNodeList::value_type& a,
61                     const gcomm::evs::MessageNodeList::value_type& b) const
62     {
63         return (gcomm::evs::MessageNodeList::value(a).safe_seq() <
64                 gcomm::evs::MessageNodeList::value(b).safe_seq());
65     }
66 };
67 
68 
69 //
70 //
71 //
72 
73 
equal(const Message & m1,const Message & m2) const74 bool gcomm::evs::Consensus::equal(const Message& m1, const Message& m2) const
75 {
76     gcomm_assert(m1.type() == Message::EVS_T_JOIN ||
77                  m1.type() == Message::EVS_T_INSTALL);
78     gcomm_assert(m2.type() == Message::EVS_T_JOIN ||
79                  m2.type() == Message::EVS_T_INSTALL);
80 
81     // Seq and aru seq are comparable only if coming from same view
82     if (m1.source_view_id() == m2.source_view_id())
83     {
84         if (m1.seq() != m2.seq())
85         {
86             evs_log_debug(D_CONSENSUS) << "seq not equal " <<
87                 m1.seq() << " " << m2.seq();
88             return false;
89         }
90         if (m1.aru_seq() != m2.aru_seq())
91         {
92             evs_log_debug(D_CONSENSUS) << "aruseq not equal " <<
93                 m1.aru_seq() << " " << m2.aru_seq();
94             return false;
95         }
96     }
97 
98     MessageNodeList nl1, nl2;
99 
100     // When comparing messages from same source whole node list is comparable,
101     // otherwise only operational part of it.
102     if (m1.source() == m2.source())
103     {
104         for_each(m1.node_list().begin(), m1.node_list().end(),
105                  SelectNodesOp(nl1, m1.source_view_id(), true, true));
106         for_each(m2.node_list().begin(), m2.node_list().end(),
107                  SelectNodesOp(nl2, m2.source_view_id(), true, true));
108     }
109     else
110     {
111         for_each(m1.node_list().begin(), m1.node_list().end(),
112                  SelectNodesOp(nl1, ViewId(), true, false));
113         for_each(m2.node_list().begin(), m2.node_list().end(),
114                  SelectNodesOp(nl2, ViewId(), true, false));
115     }
116 
117     evs_log_debug(D_CONSENSUS) << "nl1: " << nl1 << " nl2: " << nl2;
118 
119     return (nl1 == nl2);
120 }
121 
122 
highest_reachable_safe_seq() const123 gcomm::evs::seqno_t gcomm::evs::Consensus::highest_reachable_safe_seq() const
124 {
125     std::vector<seqno_t> seq_list;
126     seq_list.reserve(known_.size());
127 
128     for (NodeMap::const_iterator i = known_.begin(); i != known_.end();
129          ++i)
130     {
131         const UUID& uuid(NodeMap::key(i));
132         const Node& node(NodeMap::value(i));
133         const JoinMessage* jm(node.join_message());
134         const LeaveMessage* lm(node.leave_message());
135 
136         if ((jm == 0 && current_view_.is_member(NodeMap::key(i)) == true) ||
137             (jm != 0 && jm->source_view_id() == current_view_.id()) ||
138             (lm != 0 && lm->source_view_id() == current_view_.id()))
139         {
140             if (lm != 0)
141             {
142                 if (proto_.is_all_suspected(uuid) == false)
143                 {
144                     seq_list.push_back(lm->seq());
145                 }
146             }
147             else if (node.operational() == false)
148             {
149                 seq_list.push_back(
150                     std::min(
151                         input_map_.safe_seq(node.index()),
152                         input_map_.range(node.index()).lu() - 1));
153             }
154             else
155             {
156                 seq_list.push_back(input_map_.range(node.index()).hs());
157             }
158         }
159     }
160 
161     return *std::min_element(seq_list.begin(), seq_list.end());
162 }
163 
164 gcomm::evs::seqno_t
safe_seq_wo_all_susupected_leaving_nodes() const165 gcomm::evs::Consensus::safe_seq_wo_all_susupected_leaving_nodes() const
166 {
167     seqno_t safe_seq(-2);
168     for(NodeMap::const_iterator i = proto_.known_.begin();
169         i != proto_.known_.end(); ++i)
170     {
171         const UUID& uuid(NodeMap::key(i));
172         const Node& node(NodeMap::value(i));
173         if (node.index() != std::numeric_limits<size_t>::max()) {
174             if (node.operational() == false &&
175                 node.leave_message() &&
176                 proto_.is_all_suspected(uuid)) {
177                 continue;
178             }
179             seqno_t ss = input_map_.safe_seq(node.index());
180             if (safe_seq == -2 ||
181                 ss < safe_seq) {
182                 safe_seq = ss;
183             }
184         }
185     }
186     return safe_seq;
187 }
188 
189 namespace gcomm {
190 namespace evs {
191 
192 class FilterAllSuspectedOp
193 {
194 public:
FilterAllSuspectedOp(MessageNodeList & nl,const Proto & proto)195     FilterAllSuspectedOp(MessageNodeList& nl,
196                          const Proto& proto)
197             :
198             nl_(nl), proto_(proto) {}
operator ()(const MessageNodeList::value_type & vt) const199     void operator()(const MessageNodeList::value_type& vt) const
200     {
201         const UUID& uuid(MessageNodeList::key(vt));
202         if (!proto_.is_all_suspected(uuid)) {
203             nl_.insert_unique(vt);
204         }
205     }
206 private:
207     MessageNodeList& nl_;
208     const Proto& proto_;
209 };
210 
211 } // evs
212 } // gcomm
213 
is_consistent_highest_reachable_safe_seq(const Message & msg) const214 bool gcomm::evs::Consensus::is_consistent_highest_reachable_safe_seq(
215     const Message& msg) const
216 {
217     gcomm_assert(msg.type() == Message::EVS_T_JOIN ||
218                  msg.type() == Message::EVS_T_INSTALL);
219     gcomm_assert(msg.source_view_id() == current_view_.id());
220 
221     const MessageNodeList& node_list(msg.node_list());
222 
223     // Same view
224     MessageNodeList same_view;
225     for_each(node_list.begin(), node_list.end(),
226              SelectNodesOp(same_view, current_view_.id(), true, false));
227     MessageNodeList::const_iterator max_hs_i(max_element(same_view.begin(),
228                                                          same_view.end(),
229                                                          RangeHsCmp()));
230     gcomm_assert(max_hs_i != same_view.end());
231 
232     // Max highest seen
233     const seqno_t max_hs(
234         MessageNodeList::value(max_hs_i).im_range().hs());
235 
236     seqno_t max_reachable_safe_seq(max_hs);
237 
238     // Leaving Nodes
239     MessageNodeList t_leaving;
240     for_each(node_list.begin(), node_list.end(),
241              SelectNodesOp(t_leaving, current_view_.id(), false, true));
242     MessageNodeList leaving;
243     for_each(t_leaving.begin(), t_leaving.end(),
244              FilterAllSuspectedOp(leaving, proto_));
245 
246     if (leaving.empty() == false)
247     {
248         const MessageNodeList::const_iterator min_leave_seq_i(
249             std::min_element(leaving.begin(), leaving.end(),
250                         LeaveSeqCmpOp()));
251         gcomm_assert(min_leave_seq_i != leaving.end());
252         const seqno_t min_leave_seq(
253             MessageNodeList::value(min_leave_seq_i).leave_seq());
254         max_reachable_safe_seq = std::min(max_reachable_safe_seq, min_leave_seq);
255     }
256 
257     // Partitioning nodes
258     MessageNodeList partitioning;
259     for_each(node_list.begin(), node_list.end(),
260              SelectNodesOp(partitioning, current_view_.id(), false, false));
261 
262     if (partitioning.empty() == false)
263     {
264         MessageNodeList::const_iterator min_part_safe_seq_i(
265             std::min_element(partitioning.begin(), partitioning.end(),
266                         SafeSeqCmp()));
267         gcomm_assert(min_part_safe_seq_i != partitioning.end());
268         const seqno_t min_part_safe_seq(
269             MessageNodeList::value(min_part_safe_seq_i).safe_seq());
270         max_reachable_safe_seq = std::min(max_reachable_safe_seq,
271                                           min_part_safe_seq);
272 
273         MessageNodeList::const_iterator min_part_lu_i(
274             std::min_element(partitioning.begin(), partitioning.end(),
275                              RangeLuCmp()));
276         gcomm_assert(min_part_lu_i != partitioning.end());
277         const seqno_t min_part_lu(MessageNodeList::value(min_part_lu_i).im_range().lu() - 1);
278         max_reachable_safe_seq = std::min(max_reachable_safe_seq,
279                                           min_part_lu);
280     }
281 
282     evs_log_debug(D_CONSENSUS)
283         << " max reachable safe seq " << max_reachable_safe_seq
284         << " highest reachable safe seq " << highest_reachable_safe_seq()
285         << " max_hs " << max_hs
286         << " input map max hs " << input_map_.max_hs()
287         << " input map safe_seq " << input_map_.safe_seq()
288         << " safe seq wo suspected leaving nodes " << safe_seq_wo_all_susupected_leaving_nodes();
289 
290     return (input_map_.max_hs()       == max_hs                 &&
291             highest_reachable_safe_seq() == max_reachable_safe_seq &&
292             // input_map_.safe_seq()     == max_reachable_safe_seq);
293             safe_seq_wo_all_susupected_leaving_nodes() == max_reachable_safe_seq);
294 }
295 
296 
is_consistent_input_map(const Message & msg) const297 bool gcomm::evs::Consensus::is_consistent_input_map(const Message& msg) const
298 {
299     gcomm_assert(msg.type() == Message::EVS_T_JOIN ||
300                  msg.type() == Message::EVS_T_INSTALL);
301     gcomm_assert(msg.source_view_id() == current_view_.id());
302 
303 
304     if (msg.aru_seq() != input_map_.aru_seq())
305     {
306         evs_log_debug(D_CONSENSUS) << "message aru seq "
307                                    << msg.aru_seq()
308                                    << " not consistent with input map aru seq "
309                                    << input_map_.aru_seq();
310         return false;
311     }
312 
313     if (msg.seq() != input_map_.safe_seq())
314     {
315         evs_log_debug(D_CONSENSUS) << "message safe seq "
316                                    << msg.seq()
317                                    << " not consistent with input map safe seq "
318                                    << input_map_.safe_seq();
319         return false;
320     }
321 
322     Map<const UUID, Range> local_insts, msg_insts;
323 
324     for (NodeMap::const_iterator i = known_.begin(); i != known_.end(); ++i)
325     {
326         const UUID& uuid(NodeMap::key(i));
327         const Node& node(NodeMap::value(i));
328         if (current_view_.is_member(uuid) == true)
329         {
330             gu_trace((void)local_insts.insert_unique(
331                          std::make_pair(uuid, input_map_.range(node.index()))));
332         }
333     }
334 
335     const MessageNodeList& m_insts(msg.node_list());
336 
337     for (MessageNodeList::const_iterator i = m_insts.begin();
338          i != m_insts.end(); ++i)
339     {
340         const UUID& msg_uuid(MessageNodeList::key(i));
341         const MessageNode& msg_inst(MessageNodeList::value(i));
342         if (msg_inst.view_id() == current_view_.id())
343         {
344             gu_trace((void)msg_insts.insert_unique(
345                          std::make_pair(msg_uuid, msg_inst.im_range())));
346         }
347     }
348 
349     evs_log_debug(D_CONSENSUS) << " msg_insts " << msg_insts
350                                << " local_insts " << local_insts;
351 
352     return (msg_insts == local_insts);
353 }
354 
355 
is_consistent_partitioning(const Message & msg) const356 bool gcomm::evs::Consensus::is_consistent_partitioning(const Message& msg) const
357 {
358     gcomm_assert(msg.type() == Message::EVS_T_JOIN ||
359                  msg.type() == Message::EVS_T_INSTALL);
360     gcomm_assert(msg.source_view_id() == current_view_.id());
361 
362 
363     // Compare instances that were present in the current view but are
364     // not proceeding in the next view.
365 
366     Map<const UUID, Range> local_insts, msg_insts;
367 
368     for (NodeMap::const_iterator i = known_.begin(); i != known_.end(); ++i)
369     {
370         const UUID& uuid(NodeMap::key(i));
371         const Node& node(NodeMap::value(i));
372         if (node.operational()       == false &&
373             node.leave_message()     == 0     &&
374             current_view_.is_member(uuid) == true)
375         {
376             gu_trace((void)local_insts.insert_unique(
377                          std::make_pair(uuid,
378                                         input_map_.range(node.index()))));
379         }
380     }
381 
382     const MessageNodeList& m_insts = msg.node_list();
383 
384     for (MessageNodeList::const_iterator i = m_insts.begin();
385          i != m_insts.end(); ++i)
386     {
387         const UUID& m_uuid(MessageNodeList::key(i));
388         const MessageNode& m_inst(MessageNodeList::value(i));
389         if (m_inst.operational() == false &&
390             m_inst.leaving()     == false &&
391             m_inst.view_id()     == current_view_.id())
392         {
393             gu_trace((void)msg_insts.insert_unique(
394                          std::make_pair(m_uuid, m_inst.im_range())));
395         }
396     }
397 
398 
399     evs_log_debug(D_CONSENSUS) << " msg insts:\n" << msg_insts
400                                << " local insts:\n" << local_insts;
401     return (msg_insts == local_insts);
402 }
403 
404 
is_consistent_leaving(const Message & msg) const405 bool gcomm::evs::Consensus::is_consistent_leaving(const Message& msg) const
406 {
407     gcomm_assert(msg.type() == Message::EVS_T_JOIN ||
408                  msg.type() == Message::EVS_T_INSTALL);
409     gcomm_assert(msg.source_view_id() == current_view_.id());
410 
411     // Compare instances that were present in the current view but are
412     // not proceeding in the next view.
413 
414     Map<const UUID, Range> local_insts, msg_insts;
415 
416     for (NodeMap::const_iterator i = known_.begin(); i != known_.end(); ++i)
417     {
418         const UUID& uuid(NodeMap::key(i));
419         const Node& inst(NodeMap::value(i));
420         const LeaveMessage* lm(inst.leave_message());
421 
422         if (inst.operational()   == false &&
423             lm                       != 0  &&
424             lm->source_view_id() == current_view_.id())
425         {
426             gu_trace((void)local_insts.insert_unique(
427                          std::make_pair(uuid, input_map_.range(inst.index()))));
428         }
429     }
430 
431     const MessageNodeList& m_insts = msg.node_list();
432 
433     for (MessageNodeList::const_iterator i = m_insts.begin();
434          i != m_insts.end(); ++i)
435     {
436         const UUID& m_uuid(MessageNodeList::key(i));
437         const MessageNode& m_inst(MessageNodeList::value(i));
438         if (m_inst.operational() == false &&
439             m_inst.leaving()     == true &&
440             m_inst.view_id()     == current_view_.id())
441         {
442             gu_trace((void)msg_insts.insert_unique(
443                          std::make_pair(m_uuid, m_inst.im_range())));
444         }
445     }
446 
447     evs_log_debug(D_CONSENSUS) << " msg insts " << msg_insts
448                                << " local insts " << local_insts;
449     return (local_insts == msg_insts);
450 }
451 
452 
is_consistent_same_view(const Message & msg) const453 bool gcomm::evs::Consensus::is_consistent_same_view(const Message& msg) const
454 {
455     gcomm_assert(msg.type() == Message::EVS_T_JOIN ||
456                  msg.type() == Message::EVS_T_INSTALL);
457     gcomm_assert(msg.source_view_id() == current_view_.id());
458 
459     if (is_consistent_highest_reachable_safe_seq(msg) == false)
460     {
461         evs_log_debug(D_CONSENSUS)
462             << "highest reachable safe seq not consistent";
463         return false;
464     }
465 
466     if (is_consistent_input_map(msg) == false)
467     {
468         evs_log_debug(D_CONSENSUS) << "input map not consistent with " << msg;
469         return false;
470     }
471 
472     if (is_consistent_partitioning(msg) == false)
473     {
474         evs_log_debug(D_CONSENSUS) << "partitioning not consistent with " << msg;
475         return false;
476     }
477 
478     if (is_consistent_leaving(msg) == false)
479     {
480         evs_log_debug(D_CONSENSUS) << "leaving not consistent with " << msg;
481         return false;
482     }
483 
484     return true;
485 }
486 
487 
is_consistent(const Message & msg) const488 bool gcomm::evs::Consensus::is_consistent(const Message& msg) const
489 {
490     gcomm_assert(msg.type() == Message::EVS_T_JOIN ||
491                  msg.type() == Message::EVS_T_INSTALL);
492 
493     const JoinMessage* my_jm =
494         NodeMap::value(known_.find_checked(proto_.uuid())).join_message();
495     if (my_jm == 0)
496     {
497         return false;
498     }
499     if (msg.source_view_id() == current_view_.id())
500     {
501         return (is_consistent_same_view(msg) == true &&
502                 equal(msg, *my_jm) == true);
503     }
504     else
505     {
506         return equal(msg, *my_jm);
507     }
508 }
509 
is_consensus() const510 bool gcomm::evs::Consensus::is_consensus() const
511 {
512     const JoinMessage* my_jm =
513         NodeMap::value(known_.find_checked(proto_.uuid())).join_message();
514 
515     if (my_jm == 0)
516     {
517         evs_log_debug(D_CONSENSUS) << "no own join message";
518         return false;
519     }
520 
521     if (is_consistent_same_view(*my_jm) == false)
522     {
523         evs_log_debug(D_CONSENSUS) << "own join message not consistent";
524         return false;
525     }
526 
527     for (NodeMap::const_iterator i = known_.begin(); i != known_.end(); ++i)
528     {
529         const Node& inst(NodeMap::value(i));
530         if (inst.operational() == true)
531         {
532             const JoinMessage* jm = inst.join_message();
533             if (jm == 0)
534             {
535                 evs_log_debug(D_CONSENSUS)
536                     << "no join message for " << NodeMap::key(i);
537                 return false;
538             }
539             // call is_consistent() instead of equal() to enforce strict
540             // check for messages originating from the same view (#541)
541             if (is_consistent(*jm) == false)
542             {
543                 evs_log_debug(D_CONSENSUS)
544                     << "join message " << *jm
545                     << " not consistent with my join " << *my_jm;
546                 return false;
547             }
548         }
549     }
550 
551     return true;
552 }
553