1 /*
2  * Copyright (C) 2009-2019 Codership Oy <info@codership.com>
3  */
4 
5 /*!
6  * @file evs_proto.hpp
7  *
8  * @brief EVS protocol implementation header.
9  */
10 
11 #ifndef GCOMM_EVS_PROTO_HPP
12 #define GCOMM_EVS_PROTO_HPP
13 
14 #include "gcomm/protolay.hpp"
15 #include "gcomm/view.hpp"
16 #include "gcomm/transport.hpp"
17 #include "gcomm/map.hpp"
18 #include "gu_histogram.hpp"
19 #include "gu_stats.hpp"
20 
21 #include "evs_seqno.hpp"
22 #include "evs_node.hpp"
23 #include "evs_consensus.hpp"
24 #include "protocol_version.hpp"
25 
26 #include "gu_datetime.hpp"
27 
28 #include <list>
29 #include <deque>
30 #include <vector>
31 #include <limits>
32 
namespace gcomm
{
    namespace evs
    {

        class Message;
        class MessageNodeList;
        class UserMessage;
        class DelegateMessage;
        class GapMessage;
        class JoinMessage;
        class InstallMessage;
        class LeaveMessage;
        class InputMap;
        class InputMapMsg;
        class Proto;
        std::ostream& operator<<(std::ostream&, const Proto&);

        /*!
         * @brief Captures a call site (source file + line) so that a callee
         *        can report where it was invoked from.
         *
         * Meant to be used through the macros defined below:
         *   - declare the callee:      void fun(EVS_CALLER_ARG, int a)
         *   - call it:                 fun(EVS_CALLER, a)
         *   - log inside the callee:   log_debug << EVS_LOG_METHOD << "msg"
         *
         * Only the file name pointer is stored (no copy is made), so the
         * string must outlive the Caller; __FILE__ literals satisfy this.
         */
        class Caller
        {
        public:
            /*!
             * @param file Source file name, normally __FILE__.
             * @param line Source line number, normally __LINE__.
             */
            Caller(const char* const file, const int line)
                : file_name_(file), line_no_(line)
            { }

            friend std::ostream& operator<<(std::ostream&, const Caller&);

        private:
            const char* const file_name_; // not owned
            const int         line_no_;
        };

        /*!
         * @brief Writes the call site to @p os in the form "<file>: <line>: ".
         * @return @p os, for chaining.
         */
        inline std::ostream& operator<<(std::ostream& os, const Caller& caller)
        {
            os << caller.file_name_;
            os << ": " << caller.line_no_ << ": ";
            return os;
        }
#define EVS_CALLER_ARG const Caller& caller
#define EVS_CALLER Caller(__FILE__, __LINE__)
#define EVS_LOG_METHOD __FUNCTION__ << " called from " << caller
    }
}
86 
87 
88 /*!
89  * @brief Class implementing EVS protocol
90  */
91 class gcomm::evs::Proto : public Protolay
92 {
93 
94 public:
95     enum State {
96         S_CLOSED,
97         S_JOINING,
98         S_LEAVING,
99         S_GATHER,
100         S_INSTALL,
101         S_OPERATIONAL,
102         S_MAX
103     };
104 
to_string(const State s)105     static std::string to_string(const State s)
106     {
107         switch (s) {
108         case S_CLOSED:      return "CLOSED";
109         case S_JOINING:     return "JOINING";
110         case S_LEAVING:     return "LEAVING";
111         case S_GATHER:      return "GATHER";
112         case S_INSTALL:     return "INSTALL";
113         case S_OPERATIONAL: return "OPERATIONAL";
114         default:
115             gu_throw_fatal << "Invalid state";
116         }
117     }
118 
119     friend std::ostream& operator<<(std::ostream&, const Proto&);
120     friend class Consensus;
121 
122     /*!
123      * Default constructor.
124      */
125     Proto(gu::Config&    conf,
126           const UUID&    my_uuid,
127           SegmentId      segment,
128           const gu::URI& uri = gu::URI("evs://"),
129           const size_t   mtu = std::numeric_limits<size_t>::max(),
130           const View*    rst_view = NULL);
131     ~Proto();
132 
uuid() const133     const UUID& uuid() const { return my_uuid_; }
134 
self_string() const135     std::string self_string() const
136     {
137         std::ostringstream os;
138         os << "evs::proto(" << uuid() << ", " << to_string(state())
139            << ", " << current_view_.id() << ")";
140         return os.str();
141     }
142 
state() const143     State state() const { return state_; }
144 
known_size() const145     size_t known_size() const { return known_.size(); }
146 
is_output_empty() const147     bool is_output_empty() const { return output_.empty(); }
148 
149     std::string stats() const;
150     void reset_stats();
151 
152     // Return true if the message with seqno and given send window will
153     // cause flow control.
154     bool is_flow_control(const seqno_t seqno, const seqno_t win) const;
155     // Return true if sending the user message contained in dg
156     // should make all nodes to respond to the message. This happens
157     // if sending the datagram would cause some predefined (@todo name
158     // variable here) number of bytes to be exceeded without sending
159     // and user message without F_MSG_MORE flag.
160     bool request_user_msg_feedback(const gcomm::Datagram& dg) const;
161     int send_user(Datagram&,
162                   uint8_t,
163                   Order,
164                   seqno_t,
165                   seqno_t,
166                   size_t n_aggregated = 1);
mtu() const167     size_t mtu() const { return mtu_; }
168     size_t aggregate_len() const;
169     int send_user(const seqno_t);
170     void complete_user(const seqno_t);
171     int send_delegate(Datagram&, const UUID& target);
172     bool gap_rate_limit(const UUID&, const Range&) const;
173     // Send GAP message.
174     // @param range_uuid If non-nil, the gap message will contain request for
175     //                   retransmission of messages in given range.
176     // @param view_id View ID the gap message belongs to.
177     // @param range If non-empty denotes the range of messages to be resent
178     //              by the node with range_uuid
179     // @param commit If set, the gap informs that the node will commit to the
180     //               proposed view in previously received install message.
181     void send_gap(EVS_CALLER_ARG,
182                   const UUID& range_uuid, const ViewId& view_id, const Range range,
183                   bool commit = false);
184     const JoinMessage& create_join();
185     bool join_rate_limit() const;
186     void send_join(bool tval = true);
187     void set_join(const JoinMessage&, const UUID&);
188     void set_leave(const LeaveMessage&, const UUID&);
189     void send_leave(bool handle = true);
190     void send_install(EVS_CALLER_ARG);
191     void send_delayed_list();
192 
193     void resend(const UUID&, const Range);
194     void recover(const UUID&, const UUID&, const Range);
195 
196     void retrans_leaves(const MessageNodeList&);
197 
198     void set_inactive(const UUID&);
199     bool is_inactive(const UUID&) const;
200     void check_inactive();
201     // Clean up foreign nodes according to install message.
202     void cleanup_foreign(const InstallMessage&);
203     void cleanup_views();
204     void cleanup_evicted();
205     void cleanup_joins();
206 
207     size_t n_operational() const;
208 
209     void validate_reg_msg(const UserMessage&);
210     void deliver_finish(const InputMapMsg&);
211     void deliver();
212     void deliver_local(bool trans = false);
213     void deliver_causal(uint8_t user_type, seqno_t seqno, const Datagram&);
214     void validate_trans_msg(const UserMessage&);
215     void deliver_trans();
216     void deliver_reg_view(const InstallMessage&, const View&);
217     void deliver_trans_view(const InstallMessage&, const View&);
218     void deliver_empty_view();
219 
220     void setall_committed(bool val);
221     bool is_all_committed() const;
222     void setall_installed(bool val);
223     bool is_all_installed() const;
is_install_message() const224     bool is_install_message() const { return install_message_ != 0; }
225 
226     bool is_representative(const UUID& pid) const;
227 
228     void shift_to(const State, const bool send_j = true);
229     bool is_all_suspected(const UUID& uuid) const;
current_view() const230     const View& current_view() const { return current_view_; }
231 
232     // Message handlers
233 private:
234 
235 
236     /*!
237      * Update input map safe seq
238      * @param uuid Node uuid
239      * @param seq  Sequence number
240      * @return Input map seqno before updating
241      */
242     seqno_t update_im_safe_seq(const size_t uuid, const seqno_t seq);
243 
244     /*!
245      * Update input map safe seqs according to message node list. Only
246      * inactive nodes are allowed to be in
247      */
248     bool update_im_safe_seqs(const MessageNodeList&);
249     bool is_msg_from_previous_view(const Message&);
250     void check_suspects(const UUID&, const MessageNodeList&);
251     void cross_check_inactives(const UUID&, const MessageNodeList&);
252     void check_unseen();
253     void check_nil_view_id();
254     void asymmetry_elimination();
255     void handle_foreign(const Message&);
256     void send_request_retrans_gap(const UUID& target, const UUID& origin,
257                                   const Range& range);
258     // Request retransmission of messages.
259     // @param target Target node to request messages from.
260     // @param origin Origin of the range of messages to request.
261     // @param range Seqno range to request
262     void request_retrans(const UUID& target, const UUID& origin,
263                          const Range& range);
264     // Request missing messages from nodes in the same view.
265     // This method should be used only during configuration changes,
266     // not in operational state.
267     void request_missing();
268     // Retrans messages which may be missing from some nodes. This method
269     // should used only during configuration changes, not in
270     // operational state.
271     void retrans_missing();
272     // Handle user message which has view id different from
273     // current view ID.
274     // @return True if the message must be processed
275     void handle_user_from_different_view(const Node& node,
276                                          const UserMessage& msg);
277     void handle_user(const UserMessage&,
278                      NodeMap::iterator,
279                      const Datagram&);
280     void handle_delegate(const DelegateMessage&,
281                          NodeMap::iterator,
282                          const Datagram&);
283     void handle_gap(const GapMessage&, NodeMap::iterator);
284     void handle_join(const JoinMessage&, NodeMap::iterator);
285     void handle_leave(const LeaveMessage&, NodeMap::iterator);
286     void handle_install(const InstallMessage&, NodeMap::iterator);
287     void handle_delayed_list(const DelayedListMessage&, NodeMap::iterator);
288     void populate_node_list(MessageNodeList*) const;
289     void isolate(gu::datetime::Period period);
290 public:
291     static size_t unserialize_message(const UUID&,
292                                       const Datagram&,
293                                       Message*);
294     void handle_msg(const Message& msg,
295                     const Datagram& dg = Datagram(),
296                     bool direct = true);
297     // Protolay
298     void handle_up(const void*, const Datagram&, const ProtoUpMeta&);
299     int handle_down(Datagram& wb, const ProtoDownMeta& dm);
300 
301     int send_down(Datagram& dg, const ProtoDownMeta& dm);
302 
handle_stable_view(const View & view)303     void handle_stable_view(const View& view)
304     {
305         set_stable_view(view);
306     }
307 
handle_fencing(const UUID & uuid)308     void handle_fencing(const UUID& uuid) { }
309 
connect(bool first)310     void connect(bool first)
311     {
312         gu_trace(shift_to(S_JOINING));
313         gu_trace(send_join(first));
314     }
315 
close(bool force=false)316     void close(bool force = false)
317     {
318         // shifting to S_LEAVING from S_INSTALL is troublesome,
319         // instead of that raise a boolean flag to indicate that
320         // shifting to S_LEAVING should be done once S_OPERATIONAL
321         // is reached
322         //
323         // #760 - pending leave should be done also from S_GATHER,
324         // changing state to S_LEAVING resets timers and may prevent
325         // remaining nodes to reach new group until install timer
326         // times out
327         log_debug << self_string() << " closing in state " << state();
328         if (state() != S_GATHER && state() != S_INSTALL)
329         {
330             gu_trace(shift_to(S_LEAVING));
331             gu_trace(send_leave());
332             pending_leave_ = false;
333         }
334         else
335         {
336             pending_leave_ = true;
337         }
338     }
339 
close(const UUID & uuid)340     void close(const UUID& uuid)
341     {
342         set_inactive(uuid);
343     }
344 
345     bool set_param(const std::string& key, const std::string& val,
346                    Protolay::sync_param_cb_t& sync_param_cb);
347 
348     void handle_get_status(gu::Status& status) const;
349 
350     // gu::datetime::Date functions do appropriate actions for timer handling
351     // and return next expiration time
352 private:
353 public:
354     enum Timer
355     {
356         T_INACTIVITY,
357         T_RETRANS,
358         T_INSTALL,
359         T_STATS
360     };
361     /*!
362      * Internal timer list
363      */
364     typedef MultiMap<gu::datetime::Date, Timer> TimerList;
365 private:
366     TimerList timers_;
367 public:
368     // These need currently to be public for unit tests
369     void handle_inactivity_timer();
370     void handle_retrans_timer();
371     void handle_install_timer();
372     void handle_stats_timer();
373     gu::datetime::Date next_expiration(const Timer) const;
374     void reset_timer(Timer);
375     void cancel_timer(Timer);
376     gu::datetime::Date handle_timers();
377 
378     /*!
379      * @brief Flags controlling what debug information is logged if
380      *        debug logging is turned on.
381      */
382     enum DebugFlags
383     {
384         D_STATE         = 1 << 0,  /*!< State changes */
385         D_TIMERS        = 1 << 1,  /*!< Timer handling */
386         D_CONSENSUS     = 1 << 2,  /*!< Consensus protocol */
387         D_USER_MSGS     = 1 << 3,  /*!< User messages */
388         D_DELEGATE_MSGS = 1 << 4,  /*!< Delegate messages */
389         D_GAP_MSGS      = 1 << 5,  /*!< Gap messages */
390         D_JOIN_MSGS     = 1 << 6,  /*!< Join messages */
391         D_INSTALL_MSGS  = 1 << 7,  /*!< Install messages */
392         D_LEAVE_MSGS    = 1 << 8,  /*!< Leave messages */
393         D_FOREIGN_MSGS  = 1 << 9,  /*!< Foreing messages */
394         D_RETRANS       = 1 << 10, /*!< Retransmitted/recovered messages */
395         D_DELIVERY      = 1 << 11  /*!< Message delivery */
396     };
397 
398     /*!
399      * @brief Flags controlling what info log is printed in logs.
400      */
401     enum InfoFlags
402     {
403         I_VIEWS      = 1 << 0, /*!< View changes */
404         I_STATE      = 1 << 1, /*!< State change information */
405         I_STATISTICS = 1 << 2, /*!< Statistics */
406         I_PROFILING  = 1 << 3  /*!< Profiling information */
407     };
408 private:
409 
410     int version_;
411     int debug_mask_;
412     int info_mask_;
413     gu::datetime::Date last_stats_report_;
414     bool collect_stats_;
415     gu::Histogram hs_agreed_;
416     gu::Histogram hs_safe_;
417     gu::Histogram hs_local_causal_;
418     gu::Stats     safe_deliv_latency_;
419     long long int send_queue_s_;
420     long long int n_send_queue_s_;
421     std::vector<long long int> sent_msgs_;
422     long long int retrans_msgs_;
423     long long int recovered_msgs_;
424     std::vector<long long int> recvd_msgs_;
425     std::vector<long long int> delivered_msgs_;
426     bool delivering_;
427     UUID my_uuid_;
428     SegmentId segment_;
429     //
430     // Known instances
431     friend class Node;
432     friend class InspectNode;
433     NodeMap known_;
434     NodeMap::iterator self_i_;
435     //
436     gu::datetime::Period view_forget_timeout_;
437     gu::datetime::Period inactive_timeout_;
438     gu::datetime::Period suspect_timeout_;
439     gu::datetime::Period inactive_check_period_;
440     gu::datetime::Period retrans_period_;
441     gu::datetime::Period install_timeout_;
442     gu::datetime::Period join_retrans_period_;
443     gu::datetime::Period stats_report_period_;
444     gu::datetime::Period causal_keepalive_period_;
445 
446     gu::datetime::Period delay_margin_;
447     gu::datetime::Period delayed_keep_period_;
448 
449     gu::datetime::Date last_inactive_check_;
450     gu::datetime::Date last_causal_keepalive_;
451 
452     // Current view id
453     // ViewId current_view;
454     View current_view_;
455     View previous_view_;
456     typedef std::map<ViewId, gu::datetime::Date> ViewList;
457     // List of previously seen views from which messages should not be
458     // accepted anymore
459     ViewList previous_views_;
460     // Seen views in gather state, will be copied to previous views
461     // when shifting to operational
462     ViewList gather_views_;
463 
464     // Map containing received messages and aru/safe seqnos
465     InputMap* input_map_;
466     // Helper container for local causal messages
467     class CausalMessage
468     {
469     public:
CausalMessage(uint8_t user_type,seqno_t seqno,const Datagram & datagram)470         CausalMessage(uint8_t             user_type,
471                       seqno_t             seqno,
472                       const Datagram& datagram)
473             :
474             user_type_(user_type),
475             seqno_    (seqno    ),
476             datagram_ (datagram ),
477             tstamp_   (gu::datetime::Date::monotonic())
478         { }
user_type() const479         uint8_t             user_type() const { return user_type_; }
seqno() const480         seqno_t             seqno()     const { return seqno_    ; }
datagram() const481         const Datagram& datagram()  const { return datagram_ ; }
tstamp() const482         const gu::datetime::Date& tstamp()    const { return tstamp_   ; }
483     private:
484         uint8_t            user_type_;
485         seqno_t            seqno_;
486         Datagram       datagram_;
487         gu::datetime::Date tstamp_;
488     };
489     // Queue containing local causal messages
490     std::deque<CausalMessage> causal_queue_;
491     // Consensus module
492     Consensus consensus_;
493     // Last sent join tstamp
494     gu::datetime::Date last_sent_join_tstamp_;
495     // Last received install message
496     InstallMessage* install_message_;
497     // Highest seen view id seqno
498     uint32_t max_view_id_seq_;
499     // Install attempt counter
500     uint32_t attempt_seq_;
501     // Boolean to suppress logging when new view has been
502     // detected
503     bool new_view_logged_;
504     // Install timeout counting
505     int max_install_timeouts_;
506     int install_timeout_count_;
507     // Sequence number to maintain membership message FIFO order
508     int64_t fifo_seq_;
509     // Last sent seq
510     seqno_t last_sent_;
511     // Protocol send window size
512     seqno_t send_window_;
513     // User send window size
514     seqno_t user_send_window_;
515     // Bytes since the last user msg which will require feedback from
516     // other nodes (i.e. sent without F_MSG_MORE)
517     size_t bytes_since_request_user_msg_feedback_;
518     // Output message queue. Class implemented as a thin wrapper
519     // around std::deque<> with book keeping of outbound bytes.
520     class out_queue
521     {
522     public:
523         typedef std::deque<std::pair<Datagram, ProtoDownMeta> > queue_type;
524         typedef queue_type::const_iterator const_iterator;
525 
out_queue()526         out_queue()
527             : outbound_bytes_(),
528               queue_()
529         { }
530 
empty() const531         bool empty() const
532         {
533             assert(outbound_bytes_ || queue_.empty());
534             return (outbound_bytes_ == 0);
535         }
536 
push_back(const queue_type::value_type & msg)537         void push_back(const queue_type::value_type& msg)
538         {
539             outbound_bytes_ += msg.first.len();
540             queue_.push_back(msg);
541         }
542 
pop_front()543         void pop_front()
544         {
545             assert(not queue_.empty());
546             assert(outbound_bytes_ >= queue_.front().first.len());
547             outbound_bytes_ -= queue_.front().first.len();
548             queue_.pop_front();
549         }
550 
front() const551         const queue_type::value_type& front() const
552         {
553             assert(not queue_.empty());
554             return queue_.front();
555         }
556 
begin() const557         const_iterator begin() const { return queue_.begin(); }
558 
end() const559         const_iterator end() const { return queue_.end(); }
560 
size() const561         size_t size() const { return queue_.size(); }
562 
clear()563         void clear()
564         {
565             outbound_bytes_ = 0;
566             queue_.clear();
567         }
568 
outbound_bytes() const569         size_t outbound_bytes() const { return outbound_bytes_; }
570 
571         static const size_t max_outbound_bytes = (size_t(1) << 20);
572     private:
573         size_t outbound_bytes_;
574         queue_type queue_;
575     } output_;
576 
577     std::vector<gu::byte_t> send_buf_;
578     uint32_t max_output_size_;
579     size_t mtu_;
580     bool use_aggregate_;
581     bool self_loopback_;
582     State state_;
583     int shift_to_rfcnt_;
584     bool pending_leave_;
585     gu::datetime::Date isolation_end_;
586 
587     class DelayedEntry
588     {
589     public:
590         typedef enum
591         {
592             S_OK,
593             S_DELAYED
594         } State;
DelayedEntry(const std::string & addr)595         DelayedEntry(const std::string& addr)
596             :
597             addr_      (addr),
598             tstamp_(gu::datetime::Date::monotonic()),
599             state_(S_DELAYED),
600             state_change_cnt_(1)
601         { }
addr() const602         const std::string& addr() const { return addr_; }
603 
set_tstamp(gu::datetime::Date tstamp)604         void set_tstamp(gu::datetime::Date tstamp) { tstamp_ = tstamp; }
tstamp() const605         gu::datetime::Date tstamp() const { return tstamp_; }
606 
set_state(State state,const gu::datetime::Period decay_period,const gu::datetime::Date now)607         void set_state(State state,
608                        const gu::datetime::Period decay_period,
609                        const gu::datetime::Date now)
610         {
611             if (state == S_DELAYED && state_ != state)
612             {
613                 // Limit to 0xff, see DelayedList format in DelayedListMessage
614                 // restricts this value to uint8_t max.
615                 if (state_change_cnt_ < 0xff)
616                     ++state_change_cnt_;
617             }
618             else if (state == S_OK &&
619                      tstamp_ + decay_period < now)
620             {
621                 if (state_change_cnt_ > 0)
622                     --state_change_cnt_;
623             }
624             state_ = state;
625         }
state() const626         State state() const {return state_; }
state_change_cnt() const627         size_t state_change_cnt() const { return state_change_cnt_; }
628     private:
629         const std::string addr_;
630         gu::datetime::Date tstamp_;
631         State  state_;
632         size_t state_change_cnt_;
633     };
634 
635     typedef std::map<UUID, DelayedEntry> DelayedList;
636     DelayedList delayed_list_;
637     size_t      auto_evict_;
638 
639     // non-copyable
640     Proto(const Proto&);
641     void operator=(const Proto&);
642 };
643 
644 
#endif // GCOMM_EVS_PROTO_HPP
646