1 /*
2 * Copyright (C) 2009-2014 Codership Oy <info@codership.com>
3 */
4
5 #include "evs_consensus.hpp"
6 #include "evs_message2.hpp"
7 #include "evs_input_map2.hpp"
8 #include "evs_node.hpp"
9 #include "evs_proto.hpp"
10 #include "gcomm/view.hpp"
11
12 #include "gu_logger.hpp"
13
14 #include <vector>
15
// evs_log_debug(mask): begin a debug log line, prefixed with the local node
// UUID, that is emitted only when bit gcomm::evs::Proto::<mask> is set in
// proto_.debug_mask_ (so it can only be used where a `proto_` is in scope).
// The argument names the Proto debug flag to test; previously it was ignored
// and D_CONSENSUS was always tested, which is what every caller passes.
// The `if (...) {} else <stream>` shape keeps the expansion safe when used as
// the body of an unbraced if/else at the call site (the inner `if` already
// owns an `else`, so a caller's `else` cannot be captured by it).
#define evs_log_debug(mask)                                             \
    if ((proto_.debug_mask_ & gcomm::evs::Proto::mask) == 0)            \
    {} else log_debug << proto_.uuid() << " "
19
20 //
21 // Helpers
22 //
23
24 class LeaveSeqCmpOp
25 {
26 public:
27
operator ()(const gcomm::evs::MessageNodeList::value_type & a,const gcomm::evs::MessageNodeList::value_type & b) const28 bool operator()(const gcomm::evs::MessageNodeList::value_type& a,
29 const gcomm::evs::MessageNodeList::value_type& b) const
30 {
31 using gcomm::evs::MessageNode;
32 using gcomm::evs::MessageNodeList;
33 const MessageNode& aval(MessageNodeList::value(a));
34 const MessageNode& bval(MessageNodeList::value(b));
35 gcomm_assert(aval.leaving() != false &&
36 bval.leaving() != false);
37 const gcomm::evs::seqno_t asec(aval.leave_seq());
38 const gcomm::evs::seqno_t bsec(bval.leave_seq());
39 gcomm_assert(asec != -1 && bsec != -1);
40 return (asec < bsec);
41 }
42 };
43
44
45 class RangeLuCmp
46 {
47 public:
operator ()(const gcomm::evs::MessageNodeList::value_type & a,const gcomm::evs::MessageNodeList::value_type & b) const48 bool operator()(const gcomm::evs::MessageNodeList::value_type& a,
49 const gcomm::evs::MessageNodeList::value_type& b) const
50 {
51 return (gcomm::evs::MessageNodeList::value(a).im_range().lu() <
52 gcomm::evs::MessageNodeList::value(b).im_range().lu());
53 }
54 };
55
56
57 class SafeSeqCmp
58 {
59 public:
operator ()(const gcomm::evs::MessageNodeList::value_type & a,const gcomm::evs::MessageNodeList::value_type & b) const60 bool operator()(const gcomm::evs::MessageNodeList::value_type& a,
61 const gcomm::evs::MessageNodeList::value_type& b) const
62 {
63 return (gcomm::evs::MessageNodeList::value(a).safe_seq() <
64 gcomm::evs::MessageNodeList::value(b).safe_seq());
65 }
66 };
67
68
69 //
70 //
71 //
72
73
equal(const Message & m1,const Message & m2) const74 bool gcomm::evs::Consensus::equal(const Message& m1, const Message& m2) const
75 {
76 gcomm_assert(m1.type() == Message::EVS_T_JOIN ||
77 m1.type() == Message::EVS_T_INSTALL);
78 gcomm_assert(m2.type() == Message::EVS_T_JOIN ||
79 m2.type() == Message::EVS_T_INSTALL);
80
81 // Seq and aru seq are comparable only if coming from same view
82 if (m1.source_view_id() == m2.source_view_id())
83 {
84 if (m1.seq() != m2.seq())
85 {
86 evs_log_debug(D_CONSENSUS) << "seq not equal " <<
87 m1.seq() << " " << m2.seq();
88 return false;
89 }
90 if (m1.aru_seq() != m2.aru_seq())
91 {
92 evs_log_debug(D_CONSENSUS) << "aruseq not equal " <<
93 m1.aru_seq() << " " << m2.aru_seq();
94 return false;
95 }
96 }
97
98 MessageNodeList nl1, nl2;
99
100 // When comparing messages from same source whole node list is comparable,
101 // otherwise only operational part of it.
102 if (m1.source() == m2.source())
103 {
104 for_each(m1.node_list().begin(), m1.node_list().end(),
105 SelectNodesOp(nl1, m1.source_view_id(), true, true));
106 for_each(m2.node_list().begin(), m2.node_list().end(),
107 SelectNodesOp(nl2, m2.source_view_id(), true, true));
108 }
109 else
110 {
111 for_each(m1.node_list().begin(), m1.node_list().end(),
112 SelectNodesOp(nl1, ViewId(), true, false));
113 for_each(m2.node_list().begin(), m2.node_list().end(),
114 SelectNodesOp(nl2, ViewId(), true, false));
115 }
116
117 evs_log_debug(D_CONSENSUS) << "nl1: " << nl1 << " nl2: " << nl2;
118
119 return (nl1 == nl2);
120 }
121
122
highest_reachable_safe_seq() const123 gcomm::evs::seqno_t gcomm::evs::Consensus::highest_reachable_safe_seq() const
124 {
125 std::vector<seqno_t> seq_list;
126 seq_list.reserve(known_.size());
127
128 for (NodeMap::const_iterator i = known_.begin(); i != known_.end();
129 ++i)
130 {
131 const UUID& uuid(NodeMap::key(i));
132 const Node& node(NodeMap::value(i));
133 const JoinMessage* jm(node.join_message());
134 const LeaveMessage* lm(node.leave_message());
135
136 if ((jm == 0 && current_view_.is_member(NodeMap::key(i)) == true) ||
137 (jm != 0 && jm->source_view_id() == current_view_.id()) ||
138 (lm != 0 && lm->source_view_id() == current_view_.id()))
139 {
140 if (lm != 0)
141 {
142 if (proto_.is_all_suspected(uuid) == false)
143 {
144 seq_list.push_back(lm->seq());
145 }
146 }
147 else if (node.operational() == false)
148 {
149 seq_list.push_back(
150 std::min(
151 input_map_.safe_seq(node.index()),
152 input_map_.range(node.index()).lu() - 1));
153 }
154 else
155 {
156 seq_list.push_back(input_map_.range(node.index()).hs());
157 }
158 }
159 }
160
161 return *std::min_element(seq_list.begin(), seq_list.end());
162 }
163
164 gcomm::evs::seqno_t
safe_seq_wo_all_susupected_leaving_nodes() const165 gcomm::evs::Consensus::safe_seq_wo_all_susupected_leaving_nodes() const
166 {
167 seqno_t safe_seq(-2);
168 for(NodeMap::const_iterator i = proto_.known_.begin();
169 i != proto_.known_.end(); ++i)
170 {
171 const UUID& uuid(NodeMap::key(i));
172 const Node& node(NodeMap::value(i));
173 if (node.index() != std::numeric_limits<size_t>::max()) {
174 if (node.operational() == false &&
175 node.leave_message() &&
176 proto_.is_all_suspected(uuid)) {
177 continue;
178 }
179 seqno_t ss = input_map_.safe_seq(node.index());
180 if (safe_seq == -2 ||
181 ss < safe_seq) {
182 safe_seq = ss;
183 }
184 }
185 }
186 return safe_seq;
187 }
188
189 namespace gcomm {
190 namespace evs {
191
192 class FilterAllSuspectedOp
193 {
194 public:
FilterAllSuspectedOp(MessageNodeList & nl,const Proto & proto)195 FilterAllSuspectedOp(MessageNodeList& nl,
196 const Proto& proto)
197 :
198 nl_(nl), proto_(proto) {}
operator ()(const MessageNodeList::value_type & vt) const199 void operator()(const MessageNodeList::value_type& vt) const
200 {
201 const UUID& uuid(MessageNodeList::key(vt));
202 if (!proto_.is_all_suspected(uuid)) {
203 nl_.insert_unique(vt);
204 }
205 }
206 private:
207 MessageNodeList& nl_;
208 const Proto& proto_;
209 };
210
211 } // evs
212 } // gcomm
213
is_consistent_highest_reachable_safe_seq(const Message & msg) const214 bool gcomm::evs::Consensus::is_consistent_highest_reachable_safe_seq(
215 const Message& msg) const
216 {
217 gcomm_assert(msg.type() == Message::EVS_T_JOIN ||
218 msg.type() == Message::EVS_T_INSTALL);
219 gcomm_assert(msg.source_view_id() == current_view_.id());
220
221 const MessageNodeList& node_list(msg.node_list());
222
223 // Same view
224 MessageNodeList same_view;
225 for_each(node_list.begin(), node_list.end(),
226 SelectNodesOp(same_view, current_view_.id(), true, false));
227 MessageNodeList::const_iterator max_hs_i(max_element(same_view.begin(),
228 same_view.end(),
229 RangeHsCmp()));
230 gcomm_assert(max_hs_i != same_view.end());
231
232 // Max highest seen
233 const seqno_t max_hs(
234 MessageNodeList::value(max_hs_i).im_range().hs());
235
236 seqno_t max_reachable_safe_seq(max_hs);
237
238 // Leaving Nodes
239 MessageNodeList t_leaving;
240 for_each(node_list.begin(), node_list.end(),
241 SelectNodesOp(t_leaving, current_view_.id(), false, true));
242 MessageNodeList leaving;
243 for_each(t_leaving.begin(), t_leaving.end(),
244 FilterAllSuspectedOp(leaving, proto_));
245
246 if (leaving.empty() == false)
247 {
248 const MessageNodeList::const_iterator min_leave_seq_i(
249 std::min_element(leaving.begin(), leaving.end(),
250 LeaveSeqCmpOp()));
251 gcomm_assert(min_leave_seq_i != leaving.end());
252 const seqno_t min_leave_seq(
253 MessageNodeList::value(min_leave_seq_i).leave_seq());
254 max_reachable_safe_seq = std::min(max_reachable_safe_seq, min_leave_seq);
255 }
256
257 // Partitioning nodes
258 MessageNodeList partitioning;
259 for_each(node_list.begin(), node_list.end(),
260 SelectNodesOp(partitioning, current_view_.id(), false, false));
261
262 if (partitioning.empty() == false)
263 {
264 MessageNodeList::const_iterator min_part_safe_seq_i(
265 std::min_element(partitioning.begin(), partitioning.end(),
266 SafeSeqCmp()));
267 gcomm_assert(min_part_safe_seq_i != partitioning.end());
268 const seqno_t min_part_safe_seq(
269 MessageNodeList::value(min_part_safe_seq_i).safe_seq());
270 max_reachable_safe_seq = std::min(max_reachable_safe_seq,
271 min_part_safe_seq);
272
273 MessageNodeList::const_iterator min_part_lu_i(
274 std::min_element(partitioning.begin(), partitioning.end(),
275 RangeLuCmp()));
276 gcomm_assert(min_part_lu_i != partitioning.end());
277 const seqno_t min_part_lu(MessageNodeList::value(min_part_lu_i).im_range().lu() - 1);
278 max_reachable_safe_seq = std::min(max_reachable_safe_seq,
279 min_part_lu);
280 }
281
282 evs_log_debug(D_CONSENSUS)
283 << " max reachable safe seq " << max_reachable_safe_seq
284 << " highest reachable safe seq " << highest_reachable_safe_seq()
285 << " max_hs " << max_hs
286 << " input map max hs " << input_map_.max_hs()
287 << " input map safe_seq " << input_map_.safe_seq()
288 << " safe seq wo suspected leaving nodes " << safe_seq_wo_all_susupected_leaving_nodes();
289
290 return (input_map_.max_hs() == max_hs &&
291 highest_reachable_safe_seq() == max_reachable_safe_seq &&
292 // input_map_.safe_seq() == max_reachable_safe_seq);
293 safe_seq_wo_all_susupected_leaving_nodes() == max_reachable_safe_seq);
294 }
295
296
is_consistent_input_map(const Message & msg) const297 bool gcomm::evs::Consensus::is_consistent_input_map(const Message& msg) const
298 {
299 gcomm_assert(msg.type() == Message::EVS_T_JOIN ||
300 msg.type() == Message::EVS_T_INSTALL);
301 gcomm_assert(msg.source_view_id() == current_view_.id());
302
303
304 if (msg.aru_seq() != input_map_.aru_seq())
305 {
306 evs_log_debug(D_CONSENSUS) << "message aru seq "
307 << msg.aru_seq()
308 << " not consistent with input map aru seq "
309 << input_map_.aru_seq();
310 return false;
311 }
312
313 if (msg.seq() != input_map_.safe_seq())
314 {
315 evs_log_debug(D_CONSENSUS) << "message safe seq "
316 << msg.seq()
317 << " not consistent with input map safe seq "
318 << input_map_.safe_seq();
319 return false;
320 }
321
322 Map<const UUID, Range> local_insts, msg_insts;
323
324 for (NodeMap::const_iterator i = known_.begin(); i != known_.end(); ++i)
325 {
326 const UUID& uuid(NodeMap::key(i));
327 const Node& node(NodeMap::value(i));
328 if (current_view_.is_member(uuid) == true)
329 {
330 gu_trace((void)local_insts.insert_unique(
331 std::make_pair(uuid, input_map_.range(node.index()))));
332 }
333 }
334
335 const MessageNodeList& m_insts(msg.node_list());
336
337 for (MessageNodeList::const_iterator i = m_insts.begin();
338 i != m_insts.end(); ++i)
339 {
340 const UUID& msg_uuid(MessageNodeList::key(i));
341 const MessageNode& msg_inst(MessageNodeList::value(i));
342 if (msg_inst.view_id() == current_view_.id())
343 {
344 gu_trace((void)msg_insts.insert_unique(
345 std::make_pair(msg_uuid, msg_inst.im_range())));
346 }
347 }
348
349 evs_log_debug(D_CONSENSUS) << " msg_insts " << msg_insts
350 << " local_insts " << local_insts;
351
352 return (msg_insts == local_insts);
353 }
354
355
is_consistent_partitioning(const Message & msg) const356 bool gcomm::evs::Consensus::is_consistent_partitioning(const Message& msg) const
357 {
358 gcomm_assert(msg.type() == Message::EVS_T_JOIN ||
359 msg.type() == Message::EVS_T_INSTALL);
360 gcomm_assert(msg.source_view_id() == current_view_.id());
361
362
363 // Compare instances that were present in the current view but are
364 // not proceeding in the next view.
365
366 Map<const UUID, Range> local_insts, msg_insts;
367
368 for (NodeMap::const_iterator i = known_.begin(); i != known_.end(); ++i)
369 {
370 const UUID& uuid(NodeMap::key(i));
371 const Node& node(NodeMap::value(i));
372 if (node.operational() == false &&
373 node.leave_message() == 0 &&
374 current_view_.is_member(uuid) == true)
375 {
376 gu_trace((void)local_insts.insert_unique(
377 std::make_pair(uuid,
378 input_map_.range(node.index()))));
379 }
380 }
381
382 const MessageNodeList& m_insts = msg.node_list();
383
384 for (MessageNodeList::const_iterator i = m_insts.begin();
385 i != m_insts.end(); ++i)
386 {
387 const UUID& m_uuid(MessageNodeList::key(i));
388 const MessageNode& m_inst(MessageNodeList::value(i));
389 if (m_inst.operational() == false &&
390 m_inst.leaving() == false &&
391 m_inst.view_id() == current_view_.id())
392 {
393 gu_trace((void)msg_insts.insert_unique(
394 std::make_pair(m_uuid, m_inst.im_range())));
395 }
396 }
397
398
399 evs_log_debug(D_CONSENSUS) << " msg insts:\n" << msg_insts
400 << " local insts:\n" << local_insts;
401 return (msg_insts == local_insts);
402 }
403
404
is_consistent_leaving(const Message & msg) const405 bool gcomm::evs::Consensus::is_consistent_leaving(const Message& msg) const
406 {
407 gcomm_assert(msg.type() == Message::EVS_T_JOIN ||
408 msg.type() == Message::EVS_T_INSTALL);
409 gcomm_assert(msg.source_view_id() == current_view_.id());
410
411 // Compare instances that were present in the current view but are
412 // not proceeding in the next view.
413
414 Map<const UUID, Range> local_insts, msg_insts;
415
416 for (NodeMap::const_iterator i = known_.begin(); i != known_.end(); ++i)
417 {
418 const UUID& uuid(NodeMap::key(i));
419 const Node& inst(NodeMap::value(i));
420 const LeaveMessage* lm(inst.leave_message());
421
422 if (inst.operational() == false &&
423 lm != 0 &&
424 lm->source_view_id() == current_view_.id())
425 {
426 gu_trace((void)local_insts.insert_unique(
427 std::make_pair(uuid, input_map_.range(inst.index()))));
428 }
429 }
430
431 const MessageNodeList& m_insts = msg.node_list();
432
433 for (MessageNodeList::const_iterator i = m_insts.begin();
434 i != m_insts.end(); ++i)
435 {
436 const UUID& m_uuid(MessageNodeList::key(i));
437 const MessageNode& m_inst(MessageNodeList::value(i));
438 if (m_inst.operational() == false &&
439 m_inst.leaving() == true &&
440 m_inst.view_id() == current_view_.id())
441 {
442 gu_trace((void)msg_insts.insert_unique(
443 std::make_pair(m_uuid, m_inst.im_range())));
444 }
445 }
446
447 evs_log_debug(D_CONSENSUS) << " msg insts " << msg_insts
448 << " local insts " << local_insts;
449 return (local_insts == msg_insts);
450 }
451
452
is_consistent_same_view(const Message & msg) const453 bool gcomm::evs::Consensus::is_consistent_same_view(const Message& msg) const
454 {
455 gcomm_assert(msg.type() == Message::EVS_T_JOIN ||
456 msg.type() == Message::EVS_T_INSTALL);
457 gcomm_assert(msg.source_view_id() == current_view_.id());
458
459 if (is_consistent_highest_reachable_safe_seq(msg) == false)
460 {
461 evs_log_debug(D_CONSENSUS)
462 << "highest reachable safe seq not consistent";
463 return false;
464 }
465
466 if (is_consistent_input_map(msg) == false)
467 {
468 evs_log_debug(D_CONSENSUS) << "input map not consistent with " << msg;
469 return false;
470 }
471
472 if (is_consistent_partitioning(msg) == false)
473 {
474 evs_log_debug(D_CONSENSUS) << "partitioning not consistent with " << msg;
475 return false;
476 }
477
478 if (is_consistent_leaving(msg) == false)
479 {
480 evs_log_debug(D_CONSENSUS) << "leaving not consistent with " << msg;
481 return false;
482 }
483
484 return true;
485 }
486
487
is_consistent(const Message & msg) const488 bool gcomm::evs::Consensus::is_consistent(const Message& msg) const
489 {
490 gcomm_assert(msg.type() == Message::EVS_T_JOIN ||
491 msg.type() == Message::EVS_T_INSTALL);
492
493 const JoinMessage* my_jm =
494 NodeMap::value(known_.find_checked(proto_.uuid())).join_message();
495 if (my_jm == 0)
496 {
497 return false;
498 }
499 if (msg.source_view_id() == current_view_.id())
500 {
501 return (is_consistent_same_view(msg) == true &&
502 equal(msg, *my_jm) == true);
503 }
504 else
505 {
506 return equal(msg, *my_jm);
507 }
508 }
509
is_consensus() const510 bool gcomm::evs::Consensus::is_consensus() const
511 {
512 const JoinMessage* my_jm =
513 NodeMap::value(known_.find_checked(proto_.uuid())).join_message();
514
515 if (my_jm == 0)
516 {
517 evs_log_debug(D_CONSENSUS) << "no own join message";
518 return false;
519 }
520
521 if (is_consistent_same_view(*my_jm) == false)
522 {
523 evs_log_debug(D_CONSENSUS) << "own join message not consistent";
524 return false;
525 }
526
527 for (NodeMap::const_iterator i = known_.begin(); i != known_.end(); ++i)
528 {
529 const Node& inst(NodeMap::value(i));
530 if (inst.operational() == true)
531 {
532 const JoinMessage* jm = inst.join_message();
533 if (jm == 0)
534 {
535 evs_log_debug(D_CONSENSUS)
536 << "no join message for " << NodeMap::key(i);
537 return false;
538 }
539 // call is_consistent() instead of equal() to enforce strict
540 // check for messages originating from the same view (#541)
541 if (is_consistent(*jm) == false)
542 {
543 evs_log_debug(D_CONSENSUS)
544 << "join message " << *jm
545 << " not consistent with my join " << *my_jm;
546 return false;
547 }
548 }
549 }
550
551 return true;
552 }
553