1 /* 2 * Copyright (C) 2009-2019 Codership Oy <info@codership.com> 3 */ 4 5 /*! 6 * @file evs_proto.hpp 7 * 8 * @brief EVS protocol implementation header. 9 */ 10 11 #ifndef GCOMM_EVS_PROTO_HPP 12 #define GCOMM_EVS_PROTO_HPP 13 14 #include "gcomm/protolay.hpp" 15 #include "gcomm/view.hpp" 16 #include "gcomm/transport.hpp" 17 #include "gcomm/map.hpp" 18 #include "gu_histogram.hpp" 19 #include "gu_stats.hpp" 20 21 #include "evs_seqno.hpp" 22 #include "evs_node.hpp" 23 #include "evs_consensus.hpp" 24 #include "protocol_version.hpp" 25 26 #include "gu_datetime.hpp" 27 28 #include <list> 29 #include <deque> 30 #include <vector> 31 #include <limits> 32 33 namespace gcomm 34 { 35 namespace evs 36 { 37 38 class Message; 39 class MessageNodeList; 40 class UserMessage; 41 class DelegateMessage; 42 class GapMessage; 43 class JoinMessage; 44 class InstallMessage; 45 class LeaveMessage; 46 class InputMap; 47 class InputMapMsg; 48 class Proto; 49 std::ostream& operator<<(std::ostream&, const Proto&); 50 51 // 52 // Helper class for getting the location where 53 // certain methods are called from. 54 // 55 // Example usage: 56 // Method prototype: 57 // void fun(EVS_CALLER_ARG, int a) 58 // 59 // Calling: 60 // fun(EVS_CALLER, a) 61 // 62 // Logging inside function: 63 // log_debug << EVS_LOG_METHOD << "log message" 64 // 65 class Caller 66 { 67 public: Caller(const char * const file,const int line)68 Caller(const char* const file, const int line) : 69 file_(file), 70 line_(line) 71 { } 72 friend std::ostream& operator<<(std::ostream&, const Caller&); 73 private: 74 const char* const file_; 75 const int line_; 76 }; operator <<(std::ostream & os,const Caller & caller)77 inline std::ostream& operator<<(std::ostream& os, const Caller& caller) 78 { 79 return (os << caller.file_ << ": " << caller.line_ << ": "); 80 } 81 #define EVS_CALLER_ARG const Caller& caller 82 #define EVS_CALLER Caller(__FILE__, __LINE__) 83 #define EVS_LOG_METHOD __FUNCTION__ << " called from " << caller 84 } 85 } 86 87 88 /*! 89 * @brief Class implementing EVS protocol 90 */ 91 class gcomm::evs::Proto : public Protolay 92 { 93 94 public: 95 enum State { 96 S_CLOSED, 97 S_JOINING, 98 S_LEAVING, 99 S_GATHER, 100 S_INSTALL, 101 S_OPERATIONAL, 102 S_MAX 103 }; 104 to_string(const State s)105 static std::string to_string(const State s) 106 { 107 switch (s) { 108 case S_CLOSED: return "CLOSED"; 109 case S_JOINING: return "JOINING"; 110 case S_LEAVING: return "LEAVING"; 111 case S_GATHER: return "GATHER"; 112 case S_INSTALL: return "INSTALL"; 113 case S_OPERATIONAL: return "OPERATIONAL"; 114 default: 115 gu_throw_fatal << "Invalid state"; 116 } 117 } 118 119 friend std::ostream& operator<<(std::ostream&, const Proto&); 120 friend class Consensus; 121 122 /*! 123 * Default constructor. 124 */ 125 Proto(gu::Config& conf, 126 const UUID& my_uuid, 127 SegmentId segment, 128 const gu::URI& uri = gu::URI("evs://"), 129 const size_t mtu = std::numeric_limits<size_t>::max(), 130 const View* rst_view = NULL); 131 ~Proto(); 132 uuid() const133 const UUID& uuid() const { return my_uuid_; } 134 self_string() const135 std::string self_string() const 136 { 137 std::ostringstream os; 138 os << "evs::proto(" << uuid() << ", " << to_string(state()) 139 << ", " << current_view_.id() << ")"; 140 return os.str(); 141 } 142 state() const143 State state() const { return state_; } 144 known_size() const145 size_t known_size() const { return known_.size(); } 146 is_output_empty() const147 bool is_output_empty() const { return output_.empty(); } 148 149 std::string stats() const; 150 void reset_stats(); 151 152 // Return true if the message with seqno and given send window will 153 // cause flow control. 154 bool is_flow_control(const seqno_t seqno, const seqno_t win) const; 155 // Return true if sending the user message contained in dg 156 // should make all nodes to respond to the message. This happens 157 // if sending the datagram would cause some predefined (@todo name 158 // variable here) number of bytes to be exceeded without sending 159 // and user message without F_MSG_MORE flag. 160 bool request_user_msg_feedback(const gcomm::Datagram& dg) const; 161 int send_user(Datagram&, 162 uint8_t, 163 Order, 164 seqno_t, 165 seqno_t, 166 size_t n_aggregated = 1); mtu() const167 size_t mtu() const { return mtu_; } 168 size_t aggregate_len() const; 169 int send_user(const seqno_t); 170 void complete_user(const seqno_t); 171 int send_delegate(Datagram&, const UUID& target); 172 bool gap_rate_limit(const UUID&, const Range&) const; 173 // Send GAP message. 174 // @param range_uuid If non-nil, the gap message will contain request for 175 // retransmission of messages in given range. 176 // @param view_id View ID the gap message belongs to. 177 // @param range If non-empty denotes the range of messages to be resent 178 // by the node with range_uuid 179 // @param commit If set, the gap informs that the node will commit to the 180 // proposed view in previously received install message. 181 void send_gap(EVS_CALLER_ARG, 182 const UUID& range_uuid, const ViewId& view_id, const Range range, 183 bool commit = false); 184 const JoinMessage& create_join(); 185 bool join_rate_limit() const; 186 void send_join(bool tval = true); 187 void set_join(const JoinMessage&, const UUID&); 188 void set_leave(const LeaveMessage&, const UUID&); 189 void send_leave(bool handle = true); 190 void send_install(EVS_CALLER_ARG); 191 void send_delayed_list(); 192 193 void resend(const UUID&, const Range); 194 void recover(const UUID&, const UUID&, const Range); 195 196 void retrans_leaves(const MessageNodeList&); 197 198 void set_inactive(const UUID&); 199 bool is_inactive(const UUID&) const; 200 void check_inactive(); 201 // Clean up foreign nodes according to install message. 202 void cleanup_foreign(const InstallMessage&); 203 void cleanup_views(); 204 void cleanup_evicted(); 205 void cleanup_joins(); 206 207 size_t n_operational() const; 208 209 void validate_reg_msg(const UserMessage&); 210 void deliver_finish(const InputMapMsg&); 211 void deliver(); 212 void deliver_local(bool trans = false); 213 void deliver_causal(uint8_t user_type, seqno_t seqno, const Datagram&); 214 void validate_trans_msg(const UserMessage&); 215 void deliver_trans(); 216 void deliver_reg_view(const InstallMessage&, const View&); 217 void deliver_trans_view(const InstallMessage&, const View&); 218 void deliver_empty_view(); 219 220 void setall_committed(bool val); 221 bool is_all_committed() const; 222 void setall_installed(bool val); 223 bool is_all_installed() const; is_install_message() const224 bool is_install_message() const { return install_message_ != 0; } 225 226 bool is_representative(const UUID& pid) const; 227 228 void shift_to(const State, const bool send_j = true); 229 bool is_all_suspected(const UUID& uuid) const; current_view() const230 const View& current_view() const { return current_view_; } 231 232 // Message handlers 233 private: 234 235 236 /*! 237 * Update input map safe seq 238 * @param uuid Node uuid 239 * @param seq Sequence number 240 * @return Input map seqno before updating 241 */ 242 seqno_t update_im_safe_seq(const size_t uuid, const seqno_t seq); 243 244 /*! 245 * Update input map safe seqs according to message node list. Only 246 * inactive nodes are allowed to be in 247 */ 248 bool update_im_safe_seqs(const MessageNodeList&); 249 bool is_msg_from_previous_view(const Message&); 250 void check_suspects(const UUID&, const MessageNodeList&); 251 void cross_check_inactives(const UUID&, const MessageNodeList&); 252 void check_unseen(); 253 void check_nil_view_id(); 254 void asymmetry_elimination(); 255 void handle_foreign(const Message&); 256 void send_request_retrans_gap(const UUID& target, const UUID& origin, 257 const Range& range); 258 // Request retransmission of messages. 259 // @param target Target node to request messages from. 260 // @param origin Origin of the range of messages to request. 261 // @param range Seqno range to request 262 void request_retrans(const UUID& target, const UUID& origin, 263 const Range& range); 264 // Request missing messages from nodes in the same view. 265 // This method should be used only during configuration changes, 266 // not in operational state. 267 void request_missing(); 268 // Retrans messages which may be missing from some nodes. This method 269 // should used only during configuration changes, not in 270 // operational state. 271 void retrans_missing(); 272 // Handle user message which has view id different from 273 // current view ID. 274 // @return True if the message must be processed 275 void handle_user_from_different_view(const Node& node, 276 const UserMessage& msg); 277 void handle_user(const UserMessage&, 278 NodeMap::iterator, 279 const Datagram&); 280 void handle_delegate(const DelegateMessage&, 281 NodeMap::iterator, 282 const Datagram&); 283 void handle_gap(const GapMessage&, NodeMap::iterator); 284 void handle_join(const JoinMessage&, NodeMap::iterator); 285 void handle_leave(const LeaveMessage&, NodeMap::iterator); 286 void handle_install(const InstallMessage&, NodeMap::iterator); 287 void handle_delayed_list(const DelayedListMessage&, NodeMap::iterator); 288 void populate_node_list(MessageNodeList*) const; 289 void isolate(gu::datetime::Period period); 290 public: 291 static size_t unserialize_message(const UUID&, 292 const Datagram&, 293 Message*); 294 void handle_msg(const Message& msg, 295 const Datagram& dg = Datagram(), 296 bool direct = true); 297 // Protolay 298 void handle_up(const void*, const Datagram&, const ProtoUpMeta&); 299 int handle_down(Datagram& wb, const ProtoDownMeta& dm); 300 301 int send_down(Datagram& dg, const ProtoDownMeta& dm); 302 handle_stable_view(const View & view)303 void handle_stable_view(const View& view) 304 { 305 set_stable_view(view); 306 } 307 handle_fencing(const UUID & uuid)308 void handle_fencing(const UUID& uuid) { } 309 connect(bool first)310 void connect(bool first) 311 { 312 gu_trace(shift_to(S_JOINING)); 313 gu_trace(send_join(first)); 314 } 315 close(bool force=false)316 void close(bool force = false) 317 { 318 // shifting to S_LEAVING from S_INSTALL is troublesome, 319 // instead of that raise a boolean flag to indicate that 320 // shifting to S_LEAVING should be done once S_OPERATIONAL 321 // is reached 322 // 323 // #760 - pending leave should be done also from S_GATHER, 324 // changing state to S_LEAVING resets timers and may prevent 325 // remaining nodes to reach new group until install timer 326 // times out 327 log_debug << self_string() << " closing in state " << state(); 328 if (state() != S_GATHER && state() != S_INSTALL) 329 { 330 gu_trace(shift_to(S_LEAVING)); 331 gu_trace(send_leave()); 332 pending_leave_ = false; 333 } 334 else 335 { 336 pending_leave_ = true; 337 } 338 } 339 close(const UUID & uuid)340 void close(const UUID& uuid) 341 { 342 set_inactive(uuid); 343 } 344 345 bool set_param(const std::string& key, const std::string& val, 346 Protolay::sync_param_cb_t& sync_param_cb); 347 348 void handle_get_status(gu::Status& status) const; 349 350 // gu::datetime::Date functions do appropriate actions for timer handling 351 // and return next expiration time 352 private: 353 public: 354 enum Timer 355 { 356 T_INACTIVITY, 357 T_RETRANS, 358 T_INSTALL, 359 T_STATS 360 }; 361 /*! 362 * Internal timer list 363 */ 364 typedef MultiMap<gu::datetime::Date, Timer> TimerList; 365 private: 366 TimerList timers_; 367 public: 368 // These need currently to be public for unit tests 369 void handle_inactivity_timer(); 370 void handle_retrans_timer(); 371 void handle_install_timer(); 372 void handle_stats_timer(); 373 gu::datetime::Date next_expiration(const Timer) const; 374 void reset_timer(Timer); 375 void cancel_timer(Timer); 376 gu::datetime::Date handle_timers(); 377 378 /*! 379 * @brief Flags controlling what debug information is logged if 380 * debug logging is turned on. 381 */ 382 enum DebugFlags 383 { 384 D_STATE = 1 << 0, /*!< State changes */ 385 D_TIMERS = 1 << 1, /*!< Timer handling */ 386 D_CONSENSUS = 1 << 2, /*!< Consensus protocol */ 387 D_USER_MSGS = 1 << 3, /*!< User messages */ 388 D_DELEGATE_MSGS = 1 << 4, /*!< Delegate messages */ 389 D_GAP_MSGS = 1 << 5, /*!< Gap messages */ 390 D_JOIN_MSGS = 1 << 6, /*!< Join messages */ 391 D_INSTALL_MSGS = 1 << 7, /*!< Install messages */ 392 D_LEAVE_MSGS = 1 << 8, /*!< Leave messages */ 393 D_FOREIGN_MSGS = 1 << 9, /*!< Foreing messages */ 394 D_RETRANS = 1 << 10, /*!< Retransmitted/recovered messages */ 395 D_DELIVERY = 1 << 11 /*!< Message delivery */ 396 }; 397 398 /*! 399 * @brief Flags controlling what info log is printed in logs. 400 */ 401 enum InfoFlags 402 { 403 I_VIEWS = 1 << 0, /*!< View changes */ 404 I_STATE = 1 << 1, /*!< State change information */ 405 I_STATISTICS = 1 << 2, /*!< Statistics */ 406 I_PROFILING = 1 << 3 /*!< Profiling information */ 407 }; 408 private: 409 410 int version_; 411 int debug_mask_; 412 int info_mask_; 413 gu::datetime::Date last_stats_report_; 414 bool collect_stats_; 415 gu::Histogram hs_agreed_; 416 gu::Histogram hs_safe_; 417 gu::Histogram hs_local_causal_; 418 gu::Stats safe_deliv_latency_; 419 long long int send_queue_s_; 420 long long int n_send_queue_s_; 421 std::vector<long long int> sent_msgs_; 422 long long int retrans_msgs_; 423 long long int recovered_msgs_; 424 std::vector<long long int> recvd_msgs_; 425 std::vector<long long int> delivered_msgs_; 426 bool delivering_; 427 UUID my_uuid_; 428 SegmentId segment_; 429 // 430 // Known instances 431 friend class Node; 432 friend class InspectNode; 433 NodeMap known_; 434 NodeMap::iterator self_i_; 435 // 436 gu::datetime::Period view_forget_timeout_; 437 gu::datetime::Period inactive_timeout_; 438 gu::datetime::Period suspect_timeout_; 439 gu::datetime::Period inactive_check_period_; 440 gu::datetime::Period retrans_period_; 441 gu::datetime::Period install_timeout_; 442 gu::datetime::Period join_retrans_period_; 443 gu::datetime::Period stats_report_period_; 444 gu::datetime::Period causal_keepalive_period_; 445 446 gu::datetime::Period delay_margin_; 447 gu::datetime::Period delayed_keep_period_; 448 449 gu::datetime::Date last_inactive_check_; 450 gu::datetime::Date last_causal_keepalive_; 451 452 // Current view id 453 // ViewId current_view; 454 View current_view_; 455 View previous_view_; 456 typedef std::map<ViewId, gu::datetime::Date> ViewList; 457 // List of previously seen views from which messages should not be 458 // accepted anymore 459 ViewList previous_views_; 460 // Seen views in gather state, will be copied to previous views 461 // when shifting to operational 462 ViewList gather_views_; 463 464 // Map containing received messages and aru/safe seqnos 465 InputMap* input_map_; 466 // Helper container for local causal messages 467 class CausalMessage 468 { 469 public: CausalMessage(uint8_t user_type,seqno_t seqno,const Datagram & datagram)470 CausalMessage(uint8_t user_type, 471 seqno_t seqno, 472 const Datagram& datagram) 473 : 474 user_type_(user_type), 475 seqno_ (seqno ), 476 datagram_ (datagram ), 477 tstamp_ (gu::datetime::Date::monotonic()) 478 { } user_type() const479 uint8_t user_type() const { return user_type_; } seqno() const480 seqno_t seqno() const { return seqno_ ; } datagram() const481 const Datagram& datagram() const { return datagram_ ; } tstamp() const482 const gu::datetime::Date& tstamp() const { return tstamp_ ; } 483 private: 484 uint8_t user_type_; 485 seqno_t seqno_; 486 Datagram datagram_; 487 gu::datetime::Date tstamp_; 488 }; 489 // Queue containing local causal messages 490 std::deque<CausalMessage> causal_queue_; 491 // Consensus module 492 Consensus consensus_; 493 // Last sent join tstamp 494 gu::datetime::Date last_sent_join_tstamp_; 495 // Last received install message 496 InstallMessage* install_message_; 497 // Highest seen view id seqno 498 uint32_t max_view_id_seq_; 499 // Install attempt counter 500 uint32_t attempt_seq_; 501 // Boolean to suppress logging when new view has been 502 // detected 503 bool new_view_logged_; 504 // Install timeout counting 505 int max_install_timeouts_; 506 int install_timeout_count_; 507 // Sequence number to maintain membership message FIFO order 508 int64_t fifo_seq_; 509 // Last sent seq 510 seqno_t last_sent_; 511 // Protocol send window size 512 seqno_t send_window_; 513 // User send window size 514 seqno_t user_send_window_; 515 // Bytes since the last user msg which will require feedback from 516 // other nodes (i.e. sent without F_MSG_MORE) 517 size_t bytes_since_request_user_msg_feedback_; 518 // Output message queue. Class implemented as a thin wrapper 519 // around std::deque<> with book keeping of outbound bytes. 520 class out_queue 521 { 522 public: 523 typedef std::deque<std::pair<Datagram, ProtoDownMeta> > queue_type; 524 typedef queue_type::const_iterator const_iterator; 525 out_queue()526 out_queue() 527 : outbound_bytes_(), 528 queue_() 529 { } 530 empty() const531 bool empty() const 532 { 533 assert(outbound_bytes_ || queue_.empty()); 534 return (outbound_bytes_ == 0); 535 } 536 push_back(const queue_type::value_type & msg)537 void push_back(const queue_type::value_type& msg) 538 { 539 outbound_bytes_ += msg.first.len(); 540 queue_.push_back(msg); 541 } 542 pop_front()543 void pop_front() 544 { 545 assert(not queue_.empty()); 546 assert(outbound_bytes_ >= queue_.front().first.len()); 547 outbound_bytes_ -= queue_.front().first.len(); 548 queue_.pop_front(); 549 } 550 front() const551 const queue_type::value_type& front() const 552 { 553 assert(not queue_.empty()); 554 return queue_.front(); 555 } 556 begin() const557 const_iterator begin() const { return queue_.begin(); } 558 end() const559 const_iterator end() const { return queue_.end(); } 560 size() const561 size_t size() const { return queue_.size(); } 562 clear()563 void clear() 564 { 565 outbound_bytes_ = 0; 566 queue_.clear(); 567 } 568 outbound_bytes() const569 size_t outbound_bytes() const { return outbound_bytes_; } 570 571 static const size_t max_outbound_bytes = (size_t(1) << 20); 572 private: 573 size_t outbound_bytes_; 574 queue_type queue_; 575 } output_; 576 577 std::vector<gu::byte_t> send_buf_; 578 uint32_t max_output_size_; 579 size_t mtu_; 580 bool use_aggregate_; 581 bool self_loopback_; 582 State state_; 583 int shift_to_rfcnt_; 584 bool pending_leave_; 585 gu::datetime::Date isolation_end_; 586 587 class DelayedEntry 588 { 589 public: 590 typedef enum 591 { 592 S_OK, 593 S_DELAYED 594 } State; DelayedEntry(const std::string & addr)595 DelayedEntry(const std::string& addr) 596 : 597 addr_ (addr), 598 tstamp_(gu::datetime::Date::monotonic()), 599 state_(S_DELAYED), 600 state_change_cnt_(1) 601 { } addr() const602 const std::string& addr() const { return addr_; } 603 set_tstamp(gu::datetime::Date tstamp)604 void set_tstamp(gu::datetime::Date tstamp) { tstamp_ = tstamp; } tstamp() const605 gu::datetime::Date tstamp() const { return tstamp_; } 606 set_state(State state,const gu::datetime::Period decay_period,const gu::datetime::Date now)607 void set_state(State state, 608 const gu::datetime::Period decay_period, 609 const gu::datetime::Date now) 610 { 611 if (state == S_DELAYED && state_ != state) 612 { 613 // Limit to 0xff, see DelayedList format in DelayedListMessage 614 // restricts this value to uint8_t max. 615 if (state_change_cnt_ < 0xff) 616 ++state_change_cnt_; 617 } 618 else if (state == S_OK && 619 tstamp_ + decay_period < now) 620 { 621 if (state_change_cnt_ > 0) 622 --state_change_cnt_; 623 } 624 state_ = state; 625 } state() const626 State state() const {return state_; } state_change_cnt() const627 size_t state_change_cnt() const { return state_change_cnt_; } 628 private: 629 const std::string addr_; 630 gu::datetime::Date tstamp_; 631 State state_; 632 size_t state_change_cnt_; 633 }; 634 635 typedef std::map<UUID, DelayedEntry> DelayedList; 636 DelayedList delayed_list_; 637 size_t auto_evict_; 638 639 // non-copyable 640 Proto(const Proto&); 641 void operator=(const Proto&); 642 }; 643 644 645 #endif // EVS_PROTO_HPP 646