1 /*
2  * Copyright (C) 2009-2019 Codership Oy <info@codership.com>
3  */
4 
5 #include "gmcast.hpp"
6 #include "gmcast_proto.hpp"
7 
8 #include "gcomm/common.hpp"
9 #include "gcomm/conf.hpp"
10 #include "gcomm/util.hpp"
11 #include "gcomm/map.hpp"
12 #include "defaults.hpp"
13 #include "gu_convert.hpp"
14 #include "gu_resolver.hpp"
15 #include "gu_asio.hpp" // gu::conf::use_ssl
16 
17 using namespace std::rel_ops;
18 
19 using gcomm::gmcast::Proto;
20 using gcomm::gmcast::ProtoMap;
21 using gcomm::gmcast::Link;
22 using gcomm::gmcast::LinkMap;
23 using gcomm::gmcast::Message;
24 
// Retry limit used for initial peer addresses and as the default for
// gmcast.max_initial_reconnect_attempts: the largest int value, i.e.
// practically unlimited reconnect attempts.
const long gcomm::GMCast::max_retry_cnt_(std::numeric_limits<int>::max());
26 
set_tcp_defaults(gu::URI * uri)27 static void set_tcp_defaults (gu::URI* uri)
28 {
29     // what happens if there is already this parameter?
30     uri->set_option(gcomm::Conf::TcpNonBlocking, gu::to_string(1));
31 }
32 
33 
check_tcp_uri(const gu::URI & uri)34 static bool check_tcp_uri(const gu::URI& uri)
35 {
36     return (uri.get_scheme() == gu::scheme::tcp ||
37             uri.get_scheme() == gu::scheme::ssl);
38 }
39 
get_scheme(bool use_ssl,bool dynamic_socket)40 static std::string get_scheme(bool use_ssl, bool dynamic_socket)
41 {
42     if (use_ssl == true && not dynamic_socket)
43     {
44         return gu::scheme::ssl;
45     }
46     return gu::scheme::tcp;
47 }
48 
49 //
50 // Check if the node should stay isolated.
51 // Possible outcomes:
52 // * Return false, node should continue reconnecting and accepting connections
53 //   (isolate = 0)
54 // * Return true, node should remain isolated (isolate = 1)
55 // * Throw fatal exception to terminate the backend (isolate = 2)
56 //
is_isolated(int isolate)57 static inline bool is_isolated(int isolate)
58 {
59     switch (isolate)
60     {
61     case 1:
62         return true;
63     case 2:
64         gu_throw_fatal<< "Gcomm backend termination was "
65                       << "requested by setting gmcast.isolate=2.";
66         break;
67     default:
68         break;
69     }
70     return false;
71 }
72 
// Construct the GMCast transport.
//
// Reads the gmcast.* parameters with param<>() (from conf_ and the URI),
// range-checks the numeric ones, collects initial peer addresses from the
// URI authority list, resolves the listen and multicast addresses and
// finally writes the effective values back into conf_.
//
// @param net     protonet, passed to the Transport base
// @param uri     gcomm URI; its authority list provides the initial peers
// @param my_uuid optional UUID for this node; UUID(0, 0) is used if null
// @throws gu::Exception (EINVAL) if no group name is configured or the
//         listen address does not use a supported (tcp/ssl) scheme
gcomm::GMCast::GMCast(Protonet& net, const gu::URI& uri,
                      const UUID* my_uuid)
    :
    Transport     (net, uri),
    version_(check_range(Conf::GMCastVersion,
                         param<int>(conf_, uri, Conf::GMCastVersion, "0"),
                         0, max_version_ + 1)),
    segment_ (check_range(Conf::GMCastSegment,
                          param<int>(conf_, uri, Conf::GMCastSegment, "0"),
                          0, 255)),
    my_uuid_      (my_uuid ? *my_uuid : UUID(0, 0)),
    // Dynamic socket option is read only if the key exists in conf_,
    // otherwise it is off.
    dynamic_socket_ (conf_.has(gu::conf::socket_dynamic) ?
                     param<bool>(conf_, uri, gu::conf::socket_dynamic,
                                 "false") :
                     false),
#ifdef GALERA_HAVE_SSL
    use_ssl_      (param<bool>(conf_, uri, gu::conf::use_ssl, "false")),
#else
    // Built without SSL support: value-initialized (false).
    use_ssl_(),
#endif // GALERA_HAVE_SSL
    // @todo: technically group name should be in path component
    group_name_   (param<std::string>(conf_, uri, Conf::GMCastGroup, "")),
    // Default listen address is all interfaces with the scheme from
    // get_scheme(). NOTE(review): uses use_ssl_/dynamic_socket_, so it
    // relies on those members being declared before listen_addr_ in
    // gmcast.hpp - confirm.
    listen_addr_  (
        param<std::string>(
            conf_, uri, Conf::GMCastListenAddr,
            get_scheme(use_ssl_, dynamic_socket_) + "://0.0.0.0")), // how to make it IPv6 safe?
    initial_addrs_(),
    mcast_addr_   (param<std::string>(conf_, uri, Conf::GMCastMCastAddr, "")),
    bind_ip_      (""),
    mcast_ttl_    (check_range(
                       Conf::GMCastMCastTTL,
                       param<int>(conf_, uri, Conf::GMCastMCastTTL, "1"),
                       1, 256)),
    listener_     (),
    mcast_        (),
    pending_addrs_(),
    remote_addrs_ (),
    addr_blacklist_(),
    relaying_     (false),
    isolate_      (0),
    prim_view_reached_(false),
    proto_map_    (new ProtoMap()),
    relay_set_    (),
    segment_map_  (),
    self_index_   (std::numeric_limits<size_t>::max()),
    time_wait_    (param<gu::datetime::Period>(
                       conf_, uri,
                       Conf::GMCastTimeWait, Defaults::GMCastTimeWait)),
    check_period_ ("PT0.5S"),
    peer_timeout_ (param<gu::datetime::Period>(
                       conf_, uri,
                       Conf::GMCastPeerTimeout, Defaults::GMCastPeerTimeout)),
    max_initial_reconnect_attempts_(
        param<int>(conf_, uri,
                   Conf::GMCastMaxInitialReconnectAttempts,
                   gu::to_string(max_retry_cnt_))),
    next_check_   (gu::datetime::Date::monotonic())
{
    log_info << "GMCast version " << version_;

    if (group_name_ == "")
    {
        gu_throw_error (EINVAL) << "Group not defined in URL: "
                                << uri_.to_string();
    }

    // Resolve peers from the URI authority list into initial_addrs_.
    set_initial_addr(uri_);

    // A listen address given as a URI option overrides the value above.
    try
    {
        listen_addr_ = uri_.get_option (Conf::GMCastListenAddr);
    }
    catch (gu::NotFound&) {}

    try
    {
        gu::URI uri(listen_addr_); /* check validity of the address */
    }
    catch (gu::Exception&)
    {
        /* most probably no scheme, try to append one and see if it succeeds */
        listen_addr_ = uri_string(get_scheme(use_ssl_, dynamic_socket_), listen_addr_);
        gu_trace(gu::URI uri(listen_addr_));
    }

    gu::URI listen_uri(listen_addr_);

    if (check_tcp_uri(listen_uri) == false)
    {
        gu_throw_error (EINVAL) << "listen addr '" << listen_addr_
                                << "' does not specify supported protocol";
    }

    if (gu::net::resolve(listen_uri).get_addr().is_anyaddr() == false)
    {
        // bind outgoing connections to the same address as listening.
        gu_trace(bind_ip_ = listen_uri.get_host());
    }

    // Port precedence: port in the listen address, then the configured
    // base port, then the port of the connection URI, then the default.
    std::string port(Defaults::GMCastTcpPort);

    try
    {
        port = listen_uri.get_port();
    }
    catch (gu::NotSet&)
    {
        // if no listen port is set for listen address in the options,
        // see if base port was configured
        try
        {
            port = conf_.get(BASE_PORT_KEY);
        }
        catch (gu::NotSet&)
        {
            // if no base port configured, try port from the connection address
            try { port = uri_.get_port(); } catch (gu::NotSet&) {}
        }

        listen_addr_ += ":" + port;
    }

    // Publish the effective port as the base port.
    conf_.set(BASE_PORT_KEY, port);

    listen_addr_ = gu::net::resolve(listen_addr_).to_string();
    // resolving sets scheme to tcp, have to rewrite for ssl
    if (use_ssl_ == true && not dynamic_socket_)
    {
        listen_addr_.replace(0, 3, gu::scheme::ssl);
    }
    // Do not treat our own listen address as a peer to connect to.
    std::set<std::string>::iterator iaself(initial_addrs_.find(listen_addr_));
    if (iaself != initial_addrs_.end())
    {
        log_debug << "removing own listen address '" << *iaself
                  << "' from initial address list";
        initial_addrs_.erase(iaself);
    }

    if (mcast_addr_ != "")
    {
        // Multicast port defaults to the port chosen above.
        try
        {
            port = param<std::string>(conf_, uri_, Conf::GMCastMCastPort, port);
        }
        catch (gu::NotFound&) {}

        mcast_addr_ = gu::net::resolve(
            uri_string(gu::scheme::udp, mcast_addr_, port)).to_string();
    }

    log_info << self_string() << " listening at " << listen_addr_;
    log_info << self_string() << " multicast: " << mcast_addr_
             << ", ttl: " << mcast_ttl_;

    // Store the effective values back into the configuration.
    conf_.set(Conf::GMCastListenAddr, listen_addr_);
    conf_.set(Conf::GMCastMCastAddr, mcast_addr_);
    conf_.set(Conf::GMCastVersion, gu::to_string(version_));
    conf_.set(Conf::GMCastTimeWait, gu::to_string(time_wait_));
    conf_.set(Conf::GMCastMCastTTL, gu::to_string(mcast_ttl_));
    conf_.set(Conf::GMCastPeerTimeout, gu::to_string(peer_timeout_));
    conf_.set(Conf::GMCastSegment, gu::to_string<int>(segment_));
}
235 
~GMCast()236 gcomm::GMCast::~GMCast()
237 {
238     if (listener_ != 0) close();
239 
240     delete proto_map_;
241 }
242 
set_initial_addr(const gu::URI & uri)243 void gcomm::GMCast::set_initial_addr(const gu::URI& uri)
244 {
245 
246     const gu::URI::AuthorityList& al(uri.get_authority_list());
247 
248     for (gu::URI::AuthorityList::const_iterator i(al.begin());
249          i != al.end(); ++i)
250     {
251         std::string host;
252         try
253         {
254             host = i->host();
255         }
256         catch (gu::NotSet& ns)
257         {
258             gu_throw_error(EINVAL) << "Unset host in URL " << uri;
259         }
260 
261         if (host_is_any(host)) continue;
262 
263         std::string port;
264         try
265         {
266             port = i->port();
267         }
268         catch (gu::NotSet&)
269         {
270             try
271             {
272                 port = conf_.get(BASE_PORT_KEY);
273             }
274             catch (gu::NotFound&)
275             {
276                 port = Defaults::GMCastTcpPort;
277             }
278             catch (gu::NotSet&)
279             {
280                 port = Defaults::GMCastTcpPort;
281             }
282         }
283 
284         std::string initial_uri = uri_string(get_scheme(use_ssl_, dynamic_socket_), host, port);
285         std::string initial_addr;
286         try
287         {
288             initial_addr = gu::net::resolve(initial_uri).to_string();
289         }
290         catch (gu::Exception& )
291         {
292             log_warn << "Failed to resolve " << initial_uri;
293             continue;
294         }
295         // resolving sets scheme to tcp, have to rewrite for ssl
296         if (use_ssl_ == true && not dynamic_socket_)
297         {
298             initial_addr.replace(0, 3, gu::scheme::ssl);
299         }
300 
301         if (check_tcp_uri(initial_addr) == false)
302         {
303             gu_throw_error (EINVAL) << "initial addr '" << initial_addr
304                                     << "' is not valid";
305         }
306 
307         log_debug << self_string() << " initial addr: " << initial_addr;
308         initial_addrs_.insert(initial_addr);
309 
310     }
311 
312 }
313 
connect_precheck(bool start_prim)314 void gcomm::GMCast::connect_precheck(bool start_prim)
315 {
316     if (!start_prim && initial_addrs_.empty()) {
317         gu_throw_fatal << "No address to connect";
318     }
319 }
320 
connect()321 void gcomm::GMCast::connect()
322 {
323     pstack_.push_proto(this);
324     log_debug << "gmcast " << uuid() << " connect";
325 
326     gu::URI listen_uri(listen_addr_);
327 
328     set_tcp_defaults (&listen_uri);
329 
330     listener_ = pnet().acceptor(listen_uri);
331     gu_trace (listener_->listen(listen_uri));
332 
333     if (!mcast_addr_.empty())
334     {
335         gu::URI mcast_uri(
336             mcast_addr_ + '?'
337             + gcomm::Socket::OptIfAddr + '='
338             + gu::URI(listen_addr_).get_host()+'&'
339             + gcomm::Socket::OptNonBlocking + "=1&"
340             + gcomm::Socket::OptMcastTTL    + '=' + gu::to_string(mcast_ttl_)
341             );
342 
343         mcast_ = pnet().socket(mcast_uri);
344         gu_trace(mcast_->connect(mcast_uri));
345     }
346 
347     if (!initial_addrs_.empty())
348     {
349         for (std::set<std::string>::const_iterator i(initial_addrs_.begin());
350              i != initial_addrs_.end(); ++i)
351         {
352             insert_address(*i, UUID(), pending_addrs_);
353             AddrList::iterator ai(pending_addrs_.find(*i));
354             AddrList::value(ai).set_max_retries(max_retry_cnt_);
355             gu_trace (gmcast_connect(*i));
356         }
357     }
358 }
359 
360 
connect(const gu::URI & uri)361 void gcomm::GMCast::connect(const gu::URI& uri)
362 {
363     set_initial_addr(uri);
364     connect();
365 }
366 
367 
368 
close(bool force)369 void gcomm::GMCast::close(bool force)
370 {
371     log_debug << "gmcast " << uuid() << " close";
372     pstack_.pop_proto(this);
373     if (mcast_)
374     {
375         mcast_->close();
376         // delete mcast;
377         // mcast = 0;
378     }
379 
380     gcomm_assert(listener_ != 0);
381     listener_->close();
382     listener_.reset();
383 
384     segment_map_.clear();
385     for (ProtoMap::iterator
386              i = proto_map_->begin(); i != proto_map_->end(); ++i)
387     {
388         delete ProtoMap::value(i);
389     }
390 
391     proto_map_->clear();
392     pending_addrs_.clear();
393     remote_addrs_.clear();
394     prim_view_reached_ = false;
395 }
396 
397 //
398 // Private
399 //
400 
401 
402 // Find other local endpoint matching to proto
403 static const Proto*
find_other_local_endpoint(const gcomm::gmcast::ProtoMap & proto_map,const gcomm::gmcast::Proto * proto)404 find_other_local_endpoint(const gcomm::gmcast::ProtoMap& proto_map,
405                           const gcomm::gmcast::Proto* proto)
406 {
407     for (gcomm::gmcast::ProtoMap::const_iterator i(proto_map.begin());
408          i != proto_map.end(); ++i)
409     {
410         if (i->second != proto &&
411             i->second->handshake_uuid() == proto->handshake_uuid())
412         {
413             return i->second;
414         }
415     }
416     return 0;
417 }
418 
419 // Find other endpoint with same remote UUID
420 static const Proto*
find_other_endpoint_same_remote_uuid(const gcomm::gmcast::ProtoMap & proto_map,const gcomm::gmcast::Proto * proto)421 find_other_endpoint_same_remote_uuid(const gcomm::gmcast::ProtoMap& proto_map,
422                                      const gcomm::gmcast::Proto* proto)
423 {
424     for (gcomm::gmcast::ProtoMap::const_iterator i(proto_map.begin());
425          i != proto_map.end(); ++i)
426     {
427         if (i->second != proto &&
428             i->second->remote_uuid() == proto->remote_uuid())
429         {
430             return i->second;
431         }
432     }
433     return 0;
434 }
435 
is_own(const gmcast::Proto * proto) const436 bool gcomm::GMCast::is_own(const gmcast::Proto* proto) const
437 {
438     assert(proto->remote_uuid() != gcomm::UUID::nil());
439     if (proto->remote_uuid() != uuid())
440     {
441         return false;
442     }
443     return find_other_local_endpoint(*proto_map_, proto);
444 }
445 
blacklist(const gmcast::Proto * proto)446 void gcomm::GMCast::blacklist(const gmcast::Proto* proto)
447 {
448     initial_addrs_.erase(proto->remote_addr());
449     pending_addrs_.erase(proto->remote_addr());
450     addr_blacklist_.insert(std::make_pair(
451                                proto->remote_addr(),
452                                AddrEntry(gu::datetime::Date::monotonic(),
453                                          gu::datetime::Date::monotonic(),
454                                          proto->remote_uuid())));
455 }
456 
is_not_own_and_duplicate_exists(const Proto * proto) const457 bool gcomm::GMCast::is_not_own_and_duplicate_exists(
458     const Proto* proto) const
459 {
460     assert(proto->remote_uuid() != gcomm::UUID::nil());
461     const Proto* other(find_other_local_endpoint(*proto_map_, proto));
462     if (!other)
463     {
464         // Not own
465         // Check if remote UUID matches to self
466         if (proto->remote_uuid() == uuid())
467         {
468             return true;
469         }
470         // Check if other proto entry with same remote
471         // UUID but different remote address exists.
472         other = find_other_endpoint_same_remote_uuid(*proto_map_, proto);
473         if (other && other->remote_addr() != proto->remote_addr())
474         {
475             return true;
476         }
477     }
478     return false;
479 }
480 
481 // Erase proto entry in safe manner
482 // 1) Erase from relay_set_
483 // 2) Erase from proto_map_
484 // 3) Delete proto entry
erase_proto(gmcast::ProtoMap::iterator i)485 void gcomm::GMCast::erase_proto(gmcast::ProtoMap::iterator i)
486 {
487     Proto* p(ProtoMap::value(i));
488     RelayEntry e(p, p->socket().get());
489     RelaySet::iterator si(relay_set_.find(e));
490     if (si != relay_set_.end())
491     {
492         relay_set_.erase(si);
493     }
494     proto_map_->erase(i);
495     delete p;
496 }
497 
gmcast_accept()498 void gcomm::GMCast::gmcast_accept()
499 {
500     SocketPtr tp;
501 
502     try
503     {
504         tp = listener_->accept();
505     }
506     catch (gu::Exception& e)
507     {
508         log_warn << e.what();
509         return;
510     }
511 
512     if (is_isolated(isolate_))
513     {
514         log_debug << "dropping accepted socket due to isolation";
515         tp->close();
516         return;
517     }
518 
519     Proto* peer = new Proto (
520         *this,
521         version_,
522         tp,
523         listener_->listen_addr(),
524         "",
525         mcast_addr_,
526         segment_,
527         group_name_);
528     std::pair<ProtoMap::iterator, bool> ret =
529         proto_map_->insert(std::make_pair(tp->id(), peer));
530 
531     if (ret.second == false)
532     {
533         delete peer;
534         gu_throw_fatal << "Failed to add peer to map";
535     }
536     if (tp->state() == Socket::S_CONNECTED)
537     {
538         peer->send_handshake();
539     }
540     else
541     {
542         log_debug << "accepted socket is connecting";
543     }
544     log_debug << "handshake sent";
545 }
546 
547 
gmcast_connect(const std::string & remote_addr)548 void gcomm::GMCast::gmcast_connect(const std::string& remote_addr)
549 {
550     if (remote_addr == listen_addr_) return;
551 
552     gu::URI connect_uri(remote_addr);
553 
554     set_tcp_defaults (&connect_uri);
555 
556     if (!bind_ip_.empty())
557     {
558         connect_uri.set_option(gcomm::Socket::OptIfAddr, bind_ip_);
559     }
560 
561     SocketPtr tp = pnet().socket(connect_uri);
562 
563     try
564     {
565         tp->connect(connect_uri);
566     }
567     catch (gu::Exception& e)
568     {
569         log_debug << "Connect failed: " << e.what();
570         // delete tp;
571         return;
572     }
573 
574     Proto* peer = new Proto (
575         *this,
576         version_,
577         tp,
578         listener_->listen_addr(),
579         remote_addr,
580         mcast_addr_,
581         segment_,
582         group_name_);
583 
584     std::pair<ProtoMap::iterator, bool> ret =
585         proto_map_->insert(std::make_pair(tp->id(), peer));
586 
587     if (ret.second == false)
588     {
589         delete peer;
590         gu_throw_fatal << "Failed to add peer to map";
591     }
592 
593     ret.first->second->wait_handshake();
594 }
595 
596 
gmcast_forget(const UUID & uuid,const gu::datetime::Period & wait_period)597 void gcomm::GMCast::gmcast_forget(const UUID& uuid,
598                                   const gu::datetime::Period& wait_period)
599 {
600     /* Close all proto entries corresponding to uuid */
601 
602     ProtoMap::iterator pi, pi_next;
603     for (pi = proto_map_->begin(); pi != proto_map_->end(); pi = pi_next)
604     {
605         pi_next = pi, ++pi_next;
606         Proto* rp = ProtoMap::value(pi);
607         if (rp->remote_uuid() == uuid)
608         {
609             erase_proto(pi);
610         }
611     }
612 
613     /* Set all corresponding entries in address list to have retry cnt
614      * greater than max retries and next reconnect time after some period */
615     AddrList::iterator ai;
616     for (ai = remote_addrs_.begin(); ai != remote_addrs_.end(); ++ai)
617     {
618         AddrEntry& ae(AddrList::value(ai));
619         if (ae.uuid() == uuid)
620         {
621             log_info << "forgetting " << uuid
622                      << " (" << AddrList::key(ai) << ")";
623 
624             ProtoMap::iterator pi, pi_next;
625             for (pi = proto_map_->begin(); pi != proto_map_->end(); pi = pi_next)
626             {
627                 pi_next = pi, ++pi_next;
628                 if (ProtoMap::value(pi)->remote_addr() == AddrList::key(ai))
629                 {
630                     log_info << "deleting entry " << AddrList::key(ai);
631                     erase_proto(pi);
632                 }
633             }
634             ae.set_max_retries(0);
635             ae.set_retry_cnt(1);
636             gu::datetime::Date now(gu::datetime::Date::monotonic());
637             // Don't reduce next reconnect time if it is set greater than
638             // requested
639             if ((now + wait_period > ae.next_reconnect()) ||
640                 (ae.next_reconnect() == gu::datetime::Date::max()))
641             {
642                 ae.set_next_reconnect(gu::datetime::Date::monotonic() + wait_period);
643             }
644             else
645             {
646                 log_debug << "not decreasing next reconnect for " << uuid;
647             }
648         }
649     }
650 
651     /* Update state */
652     update_addresses();
653 }
654 
handle_connected(Proto * rp)655 void gcomm::GMCast::handle_connected(Proto* rp)
656 {
657     const SocketPtr tp(rp->socket());
658     assert(tp->state() == Socket::S_CONNECTED);
659     log_debug << "transport " << tp << " connected";
660     if (rp->state() == Proto::S_INIT)
661     {
662         log_debug << "sending handshake";
663         // accepted socket was waiting for underlying transport
664         // handshake to finish
665         rp->send_handshake();
666     }
667 }
668 
handle_established(Proto * est)669 void gcomm::GMCast::handle_established(Proto* est)
670 {
671     log_info << self_string() << " connection established to "
672              << est->remote_uuid() << " "
673              << est->remote_addr();
674     // UUID checks are handled during protocol handshake
675     assert(est->remote_uuid() != uuid());
676 
677     if (is_evicted(est->remote_uuid()))
678     {
679         log_warn << "Closing connection to evicted node " << est->remote_uuid();
680         erase_proto(proto_map_->find_checked(est->socket()->id()));
681         update_addresses();
682         return;
683     }
684 
685     // If address is found from pending_addrs_, move it to remote_addrs list
686     // and set retry cnt to -1
687     const std::string& remote_addr(est->remote_addr());
688     AddrList::iterator i(pending_addrs_.find(remote_addr));
689 
690     if (i != pending_addrs_.end())
691     {
692         log_debug << "Erasing " << remote_addr << " from panding list";
693         pending_addrs_.erase(i);
694     }
695 
696     if ((i = remote_addrs_.find(remote_addr)) == remote_addrs_.end())
697     {
698         log_debug << "Inserting " << remote_addr << " to remote list";
699 
700         insert_address (remote_addr, est->remote_uuid(), remote_addrs_);
701         i = remote_addrs_.find(remote_addr);
702     }
703     else if (AddrList::value(i).uuid() != est->remote_uuid())
704     {
705         log_info << "remote endpoint " << est->remote_addr()
706                  << " changed identity " << AddrList::value(i).uuid().full_str()
707                  << " -> " << est->remote_uuid().full_str();
708         remote_addrs_.erase(i);
709         i = remote_addrs_.insert_unique(
710             make_pair(est->remote_addr(),
711                       AddrEntry(gu::datetime::Date::monotonic(),
712                                 gu::datetime::Date::monotonic(),
713                                 est->remote_uuid())));
714     }
715 
716     if (AddrList::value(i).retry_cnt() >
717         AddrList::value(i).max_retries())
718     {
719         log_warn << "discarding established (time wait) "
720                  << est->remote_uuid()
721                  << " (" << est->remote_addr() << ") ";
722         erase_proto(proto_map_->find(est->socket()->id()));
723         update_addresses();
724         return;
725     }
726 
727     // send_up(Datagram(), p->remote_uuid());
728 
729     // init retry cnt to -1 to avoid unnecessary logging at first attempt
730     // max retries will be readjusted in handle stable view
731     AddrList::value(i).set_retry_cnt(-1);
732     AddrList::value(i).set_max_retries(max_initial_reconnect_attempts_);
733 
734     // Cleanup all previously established entries with same
735     // remote uuid. It is assumed that the most recent connection
736     // is usually the healthiest one.
737     ProtoMap::iterator j, j_next;
738     for (j = proto_map_->begin(); j != proto_map_->end(); j = j_next)
739     {
740         j_next = j, ++j_next;
741 
742         Proto* p(ProtoMap::value(j));
743 
744         if (p->remote_uuid() == est->remote_uuid())
745         {
746             if (p->handshake_uuid() < est->handshake_uuid())
747             {
748                 log_debug << self_string()
749                           << " cleaning up duplicate "
750                           << p->socket()
751                           << " after established "
752                           << est->socket();
753                 erase_proto(j);
754             }
755             else if (p->handshake_uuid() > est->handshake_uuid())
756             {
757                 log_debug << self_string()
758                           << " cleaning up established "
759                           << est->socket()
760                           << " which is duplicate of "
761                           << p->socket();
762                 erase_proto(proto_map_->find_checked(est->socket()->id()));
763                 update_addresses();
764                 return;
765             }
766             else
767             {
768                 assert(p == est);
769             }
770         }
771     }
772 
773     AddrList::iterator ali(find_if(remote_addrs_.begin(),
774                                    remote_addrs_.end(),
775                                    AddrListUUIDCmp(est->remote_uuid())));
776     if (ali != remote_addrs_.end())
777     {
778         AddrList::value(ali).set_last_connect();
779     }
780     else
781     {
782         log_warn << "peer " << est->remote_addr()
783                  << " not found from remote addresses";
784     }
785 
786     update_addresses();
787 }
788 
handle_failed(Proto * failed)789 void gcomm::GMCast::handle_failed(Proto* failed)
790 {
791     log_debug << "handle failed: " << *failed;
792     const std::string& remote_addr = failed->remote_addr();
793 
794     bool found_ok(false);
795     for (ProtoMap::const_iterator i = proto_map_->begin();
796          i != proto_map_->end(); ++i)
797     {
798         Proto* p(ProtoMap::value(i));
799         if (p                    != failed      &&
800             p->state()       <= Proto::S_OK &&
801             p->remote_addr() == failed->remote_addr())
802         {
803             log_debug << "found live " << *p;
804             found_ok = true;
805             break;
806         }
807     }
808 
809     if (found_ok == false && remote_addr != "")
810     {
811         AddrList::iterator i;
812 
813         if ((i = pending_addrs_.find(remote_addr)) != pending_addrs_.end() ||
814             (i = remote_addrs_.find(remote_addr))  != remote_addrs_.end())
815         {
816             AddrEntry& ae(AddrList::value(i));
817             ae.set_retry_cnt(ae.retry_cnt() + 1);
818 
819             gu::datetime::Date rtime = gu::datetime::Date::monotonic() + gu::datetime::Period("PT1S");
820             log_debug << self_string()
821                       << " setting next reconnect time to "
822                       << rtime << " for " << remote_addr;
823             ae.set_next_reconnect(rtime);
824         }
825     }
826 
827     erase_proto(proto_map_->find_checked(failed->socket()->id()));
828     update_addresses();
829 }
830 
is_connected(const std::string & addr,const UUID & uuid) const831 bool gcomm::GMCast::is_connected(const std::string& addr, const UUID& uuid) const
832 {
833     for (ProtoMap::const_iterator i = proto_map_->begin();
834          i != proto_map_->end(); ++i)
835     {
836         Proto* conn = ProtoMap::value(i);
837 
838         if (addr == conn->remote_addr() ||
839             uuid == conn->remote_uuid())
840         {
841             return true;
842         }
843     }
844 
845     return false;
846 }
847 
848 
insert_address(const std::string & addr,const UUID & uuid,AddrList & alist)849 void gcomm::GMCast::insert_address (const std::string& addr,
850                              const UUID&   uuid,
851                              AddrList&     alist)
852 {
853     if (addr == listen_addr_)
854     {
855         gu_throw_fatal << "Trying to add self addr " << addr << " to addr list";
856     }
857 
858     if (alist.insert(make_pair(addr,
859                                AddrEntry(gu::datetime::Date::monotonic(),
860                                          gu::datetime::Date::monotonic(), uuid))).second == false)
861     {
862         log_warn << "Duplicate entry: " << addr;
863     }
864     else
865     {
866         log_debug << self_string() << ": new address entry " << uuid << ' '
867                   << addr;
868     }
869 }
870 
871 
update_addresses()872 void gcomm::GMCast::update_addresses()
873 {
874     LinkMap link_map;
875     std::set<UUID> uuids;
876     /* Add all established connections into uuid_map and update
877      * list of remote addresses */
878 
879     ProtoMap::iterator i, i_next;
880     for (i = proto_map_->begin(); i != proto_map_->end(); i = i_next)
881     {
882         i_next = i, ++i_next;
883 
884         Proto* rp = ProtoMap::value(i);
885 
886         if (rp->state() == Proto::S_OK)
887         {
888             if (rp->remote_addr() == "" ||
889                 rp->remote_uuid() == UUID::nil())
890             {
891                 gu_throw_fatal << "Protocol error: local: (" << my_uuid_
892                                << ", '" << listen_addr_
893                                << "'), remote: (" << rp->remote_uuid()
894                                << ", '" << rp->remote_addr() << "')";
895             }
896 
897             if (remote_addrs_.find(rp->remote_addr()) == remote_addrs_.end())
898             {
899                 log_warn << "Connection exists but no addr on addr list for "
900                          << rp->remote_addr();
901                 insert_address(rp->remote_addr(), rp->remote_uuid(),
902                                remote_addrs_);
903             }
904 
905             if (uuids.insert(rp->remote_uuid()).second == false)
906             {
907                 // Duplicate entry, drop this one
908                 // @todo Deeper inspection about the connection states
909                 log_debug << self_string() << " dropping duplicate entry";
910                 erase_proto(i);
911             }
912             else
913             {
914                 link_map.insert(Link(rp->remote_uuid(),
915                                      rp->remote_addr(),
916                                      rp->mcast_addr()));
917             }
918         }
919     }
920 
921     /* Send topology change message containing only established
922      * connections */
923     for (ProtoMap::iterator i = proto_map_->begin(); i != proto_map_->end(); ++i)
924     {
925         Proto* gp = ProtoMap::value(i);
926 
927         // @todo: a lot of stuff here is done for each connection, including
928         //        message creation and serialization. Need a mcast_msg() call
929         //        and move this loop in there.
930         if (gp->state() == Proto::S_OK)
931             gp->send_topology_change(link_map);
932     }
933 
934     /* Add entries reported by all other nodes to address list to
935      * get complete view of existing uuids/addresses */
936     for (ProtoMap::iterator i = proto_map_->begin(); i != proto_map_->end(); ++i)
937     {
938         Proto* rp = ProtoMap::value(i);
939 
940         if (rp->state() == Proto::S_OK)
941         {
942             for (LinkMap::const_iterator j = rp->link_map().begin();
943                  j != rp->link_map().end(); ++j)
944             {
945                 const UUID& link_uuid(LinkMap::key(j));
946                 const std::string& link_addr(LinkMap::value(j).addr());
947                 gcomm_assert(link_uuid != UUID::nil() && link_addr != "");
948 
949                 if (addr_blacklist_.find(link_addr) != addr_blacklist_.end())
950                 {
951                     log_debug << self_string()
952                               << " address '" << link_addr
953                               << "' pointing to uuid " << link_uuid
954                               << " is blacklisted, skipping";
955                     continue;
956                 }
957 
958                 if (link_uuid                     != uuid()         &&
959                     remote_addrs_.find(link_addr)  == remote_addrs_.end() &&
960                     pending_addrs_.find(link_addr) == pending_addrs_.end())
961                 {
962                     log_debug << self_string()
963                               << " conn refers to but no addr in addr list for "
964                               << link_addr;
965                     insert_address(link_addr, link_uuid, remote_addrs_);
966 
967                     AddrList::iterator pi(remote_addrs_.find(link_addr));
968 
969                     assert(pi != remote_addrs_.end());
970 
971                     AddrEntry& ae(AddrList::value(pi));
972 
973                     // init retry cnt to -1 to avoid unnecessary logging
974                     // at first attempt
975                     // max retries will be readjusted in handle stable view
976                     ae.set_retry_cnt(-1);
977                     ae.set_max_retries(max_initial_reconnect_attempts_);
978 
979                     // Add some randomness for first reconnect to avoid
980                     // simultaneous connects
981                     gu::datetime::Date rtime(gu::datetime::Date::monotonic());
982 
983                     rtime = rtime + ::rand() % (100*gu::datetime::MSec);
984                     ae.set_next_reconnect(rtime);
985                     next_check_ = std::min(next_check_, rtime);
986                 }
987             }
988         }
989     }
990 
991     // Build multicast tree
992     log_debug << self_string() << " --- mcast tree begin ---";
993     segment_map_.clear();
994 
995     Segment& local_segment(segment_map_[segment_]);
996 
997     if (mcast_)
998     {
999         log_debug << mcast_addr_;
1000         local_segment.push_back(RelayEntry(0, mcast_.get()));
1001     }
1002 
1003     self_index_ = 0;
1004     for (ProtoMap::const_iterator i(proto_map_->begin()); i != proto_map_->end();
1005          ++i)
1006     {
1007         Proto* p(ProtoMap::value(i));
1008 
1009         log_debug << "Proto: " << *p;
1010 
1011         if (p->remote_segment() == segment_)
1012         {
1013             if (p->state() == Proto::S_OK &&
1014                 (p->mcast_addr() == "" ||
1015                  p->mcast_addr() != mcast_addr_))
1016             {
1017                 local_segment.push_back(RelayEntry(p, p->socket().get()));
1018                 if (p->remote_uuid() < uuid())
1019                 {
1020                     ++self_index_;
1021                 }
1022             }
1023         }
1024         else
1025         {
1026             if (p->state() == Proto::S_OK)
1027             {
1028                 Segment& remote_segment(segment_map_[p->remote_segment()]);
1029                 remote_segment.push_back(RelayEntry(p, p->socket().get()));
1030             }
1031         }
1032     }
1033     log_debug << self_string() << " self index: " << self_index_;
1034     log_debug << self_string() << " --- mcast tree end ---";
1035 }
1036 
1037 
reconnect()1038 void gcomm::GMCast::reconnect()
1039 {
1040     if (is_isolated(isolate_))
1041     {
1042         log_debug << "skipping reconnect due to isolation";
1043         return;
1044     }
1045 
1046     /* Loop over known remote addresses and connect if proto entry
1047      * does not exist */
1048     gu::datetime::Date now = gu::datetime::Date::monotonic();
1049     AddrList::iterator i, i_next;
1050 
1051     for (i = pending_addrs_.begin(); i != pending_addrs_.end(); i = i_next)
1052     {
1053         i_next = i, ++i_next;
1054 
1055         const std::string& pending_addr(AddrList::key(i));
1056         const AddrEntry& ae(AddrList::value(i));
1057 
1058         if (is_connected (pending_addr, UUID::nil()) == false &&
1059             ae.next_reconnect()                  <= now)
1060         {
1061             if (ae.retry_cnt() > ae.max_retries())
1062             {
1063                 log_info << "cleaning up pending addr " << pending_addr;
1064                 pending_addrs_.erase(i);
1065                 continue; // no reference to pending_addr after this
1066             }
1067             else if (ae.next_reconnect() <= now)
1068             {
1069                 log_debug << "connecting to pending " << pending_addr;
1070                 gmcast_connect (pending_addr);
1071             }
1072         }
1073     }
1074 
1075     for (i = remote_addrs_.begin(); i != remote_addrs_.end(); i = i_next)
1076     {
1077         i_next = i, ++i_next;
1078 
1079         const std::string& remote_addr(AddrList::key(i));
1080         const AddrEntry& ae(AddrList::value(i));
1081         const UUID& remote_uuid(ae.uuid());
1082 
1083         gcomm_assert(remote_uuid != uuid());
1084 
1085         if (is_connected(remote_addr, remote_uuid) == false &&
1086             ae.next_reconnect()                <= now)
1087         {
1088             if (ae.retry_cnt() > ae.max_retries())
1089             {
1090                 log_info << " cleaning up " << remote_uuid << " ("
1091                          << remote_addr << ")";
1092                 remote_addrs_.erase(i);
1093                 continue;//no reference to remote_addr or remote_uuid after this
1094             }
1095             else if (ae.next_reconnect() <= now)
1096             {
1097                 if (ae.retry_cnt() % 30 == 0)
1098                 {
1099                     log_info << self_string() << " reconnecting to "
1100                              << remote_uuid << " (" << remote_addr
1101                              << "), attempt " << ae.retry_cnt();
1102                 }
1103 
1104                 gmcast_connect(remote_addr);
1105             }
1106             else
1107             {
1108                 //
1109             }
1110         }
1111     }
1112 }
1113 
1114 namespace
1115 {
1116     class CmpUuidCounts
1117     {
1118     public:
CmpUuidCounts(const std::set<gcomm::UUID> & uuids,gcomm::SegmentId preferred_segment)1119         CmpUuidCounts(const std::set<gcomm::UUID>& uuids,
1120                       gcomm::SegmentId preferred_segment)
1121             :
1122             uuids_(uuids),
1123             preferred_segment_(preferred_segment)
1124         { }
1125 
count(const gcomm::gmcast::Proto * p) const1126         size_t count(const gcomm::gmcast::Proto* p) const
1127         {
1128             size_t cnt(0);
1129             for (std::set<gcomm::UUID>::const_iterator i(uuids_.begin());
1130                  i != uuids_.end(); ++i)
1131             {
1132                 for (gcomm::gmcast::LinkMap::const_iterator
1133                          lm_i(p->link_map().begin());
1134                      lm_i != p->link_map().end(); ++lm_i)
1135                 {
1136                     if (lm_i->uuid() == *i)
1137                     {
1138                         ++cnt;
1139                         break;
1140                     }
1141                 }
1142             }
1143             return cnt;
1144         }
operator ()(const gcomm::gmcast::Proto * a,const gcomm::gmcast::Proto * b) const1145         bool operator()(const gcomm::gmcast::Proto* a,
1146                         const gcomm::gmcast::Proto* b) const
1147         {
1148             size_t ac(count(a));
1149             size_t bc(count(b));
1150             // if counts are equal, prefer peer from the same segment
1151             return (ac < bc ||
1152                     (ac == bc && a->remote_segment() != preferred_segment_));
1153         }
1154 
1155     private:
1156         const std::set<gcomm::UUID>& uuids_;
1157         gcomm::SegmentId preferred_segment_;
1158     };
1159 }
1160 
1161 
// Periodic health check of peer connections. It also decides whether
// message relaying is needed and rebuilds relay_set_.
//
// Pass 1 (proto_map_):
//   - entries in a state strictly between S_INIT and S_FAILED that have not
//     received anything for longer than peer_timeout_ are marked S_FAILED
//     and handed to handle_failed();
//   - S_OK entries get a keepalive if nothing was received for 2/3 of
//     peer_timeout_ or nothing was sent for 1/3 of it. If the entry is not
//     S_FAILED afterwards, its remote UUID is recorded as live.
// Pass 2 (remote_addrs_): relaying is needed if an address entry within its
//   retry budget has no live proto, or if its last_connect() is less than
//   peer_timeout_ ago.
// Pass 3: when relaying, relay_set_ is rebuilt greedily from the S_OK
//   protos. Otherwise relaying is turned off and relay_set_ is cleared.
void gcomm::GMCast::check_liveness()
{
    std::set<UUID> live_uuids;

    // iterate over proto map and mark all timed out entries as failed
    gu::datetime::Date now(gu::datetime::Date::monotonic());
    for (ProtoMap::iterator i(proto_map_->begin()); i != proto_map_->end(); )
    {
        // Store next iterator into temporary, handle_failed() may remove
        // the entry proto_map_.
        ProtoMap::iterator i_next(i);
        ++i_next;
        Proto* p(ProtoMap::value(i));
        if (p->state() > Proto::S_INIT &&
            p->state() < Proto::S_FAILED &&
            p->recv_tstamp() + peer_timeout_ < now)
        {
            gcomm::SocketStats stats(p->socket()->stats());
            log_info << self_string()
                     << " connection to peer "
                     << p->remote_uuid() << " with addr "
                     << p->remote_addr()
                     << " timed out, no messages seen in " << peer_timeout_
                     << ", socket stats: "
                     << stats;
            p->set_state(Proto::S_FAILED);
            handle_failed(p);
        }
        else if (p->state() == Proto::S_OK)
        {
            gcomm::SocketStats stats(p->socket()->stats());
            // A large send backlog is only reported (debug level).
            if (stats.send_queue_length >= 1024)
            {
                log_debug << self_string()
                          << " socket send queue to "
                          << " peer "
                          << p->remote_uuid() << " with addr "
                          << p->remote_addr()
                          << ", socket stats: "
                          << stats;
            }
            // Send a keepalive when the receive side has been quiet for 2/3
            // of the timeout or the send side for 1/3 of it.
            if ((p->recv_tstamp() + peer_timeout_*2/3 < now) ||
                (p->send_tstamp() + peer_timeout_*1/3 < now))
            {
                p->send_keepalive();
            }

            // The state is re-checked because send_keepalive() is the only
            // call between here and the S_OK test above; a failed send is
            // presumably what flips it to S_FAILED.
            if (p->state() == Proto::S_FAILED)
            {
                handle_failed(p);
            }
            else
            {
                live_uuids.insert(p->remote_uuid());
            }
        }
        i = i_next;
    }

    bool should_relay(false);

    // iterate over addr list and check if there is at least one live
    // proto entry associated to each addr entry

    std::set<UUID> nonlive_uuids;
    std::string nonlive_peers;
    for (AddrList::const_iterator i(remote_addrs_.begin());
         i != remote_addrs_.end(); ++i)
    {
        const AddrEntry& ae(AddrList::value(i));
        if (ae.retry_cnt()             <= ae.max_retries() &&
            live_uuids.find(ae.uuid()) == live_uuids.end())
        {
            // log_info << self_string()
            // << " missing live proto entry for " << ae.uuid();
            nonlive_uuids.insert(ae.uuid());
            nonlive_peers += AddrList::key(i) + " ";
            should_relay = true;
        }
        else if (ae.last_connect() + peer_timeout_ > now)
        {
            // Keep relaying for one peer_timeout_ after the last connect.
            log_debug << "continuing relaying for "
                      << (ae.last_connect() + peer_timeout_ - now);
            should_relay = true;
        }
    }

    if (should_relay == true)
    {
        if (relaying_ == false)
        {
            log_info << self_string()
                     << " turning message relay requesting on, nonlive peers: "
                     << nonlive_peers;
            relaying_ = true;
        }
        relay_set_.clear();
        // build set of protos having OK status
        std::set<Proto*> proto_set;
        for (ProtoMap::iterator i(proto_map_->begin()); i != proto_map_->end();
             ++i)
        {
            Proto* p(ProtoMap::value(i));
            if (p->state() == Proto::S_OK)
            {
                proto_set.insert(p);
            }
        }
        // find minimal set of proto entries required to reach maximum set
        // of nonlive peers
        // Greedy: each round takes the proto whose link map covers the most
        // remaining nonlive UUIDs (ties prefer segment_), then removes the
        // UUIDs it covers. If some nonlive UUID appears in no link map, the
        // loop runs until proto_set is exhausted, so every S_OK proto ends
        // up in relay_set_.
        while (nonlive_uuids.empty() == false &&
               proto_set.empty() == false)
        {
            std::set<Proto*>::iterator maxel(
                std::max_element(proto_set.begin(),
                                 proto_set.end(), CmpUuidCounts(nonlive_uuids, segment_)));
            Proto* p(*maxel);
            log_debug << "relay set maxel :" << *p << " count: "
                      << CmpUuidCounts(nonlive_uuids, segment_).count(p);

            relay_set_.insert(RelayEntry(p, p->socket().get()));
            const LinkMap& lm(p->link_map());
            for (LinkMap::const_iterator lm_i(lm.begin()); lm_i != lm.end();
                 ++lm_i)
            {
                nonlive_uuids.erase((*lm_i).uuid());
            }
            proto_set.erase(maxel);
        }
    }
    else if (relaying_ == true && should_relay == false)
    {
        log_info << self_string() << " turning message relay requesting off";
        relay_set_.clear();
        relaying_ = false;
    }
}
1299 
1300 
handle_timers()1301 gu::datetime::Date gcomm::GMCast::handle_timers()
1302 {
1303     const gu::datetime::Date now(gu::datetime::Date::monotonic());
1304 
1305     if (now >= next_check_)
1306     {
1307         check_liveness();
1308         reconnect();
1309         next_check_ = now + check_period_;
1310     }
1311 
1312     return next_check_;
1313 }
1314 
1315 
send(const RelayEntry & re,int segment,gcomm::Datagram & dg)1316 void gcomm::GMCast::send(const RelayEntry& re, int segment, gcomm::Datagram& dg)
1317 {
1318     int err;
1319     if ((err = re.socket->send(segment, dg)) != 0)
1320     {
1321         log_debug << "failed to send to " << re.socket->remote_addr()
1322                   << ": (" << err << ") " << strerror(err);
1323     }
1324     else if (re.proto)
1325     {
1326         re.proto->set_send_tstamp(gu::datetime::Date::monotonic());
1327     }
1328 }
1329 
// Forwards a received user message to other peers according to its relay
// flags. exclude_id is the id of the socket the message arrived on.
//
// The forwarded header is a copy of msg with F_RELAY and F_SEGMENT_RELAY
// cleared. handle_up() relays only messages carrying one of those flags, so
// a plain copy is not forwarded again by its receivers.
//
// - F_RELAY: the copy goes to every entry of every segment in segment_map_,
//   except the socket identified by exclude_id.
// - F_SEGMENT_RELAY: if relay_set_ is non-empty, the copy is first sent
//   with F_RELAY set to every relay_set_ entry except exclude_id. It is then
//   sent without relay flags to every entry of the local segment. Note: the
//   local-segment pass does not check exclude_id.
void gcomm::GMCast::relay(const Message& msg,
                          const Datagram& dg,
                          const void* exclude_id)
{
    Datagram relay_dg(dg);
    relay_dg.normalize();
    Message relay_msg(msg);

    // reset all relay flags from message to be relayed
    relay_msg.set_flags(relay_msg.flags() &
                        ~(Message::F_RELAY | Message::F_SEGMENT_RELAY));

    // if F_RELAY is set in received message, relay to all peers except
    // the originator
    if (msg.flags() & Message::F_RELAY)
    {
        gu_trace(push_header(relay_msg, relay_dg));
        for (SegmentMap::iterator segment_i(segment_map_.begin());
             segment_i != segment_map_.end(); ++segment_i)
        {
            Segment& segment(segment_i->second);
            for (Segment::iterator target_i(segment.begin());
                 target_i != segment.end(); ++target_i)
            {
                if ((*target_i).socket->id() != exclude_id)
                {
                    send(*target_i, msg.segment_id(), relay_dg);
                }
            }
        }
    }
    else if (msg.flags() & Message::F_SEGMENT_RELAY)
    {
        if (relay_set_.empty() == false)
        {
            // send message to all nodes in relay set to reach
            // nodes in local segment that are not directly reachable
            relay_msg.set_flags(relay_msg.flags() | Message::F_RELAY);
            gu_trace(push_header(relay_msg, relay_dg));
            for (RelaySet::iterator relay_i(relay_set_.begin());
                 relay_i != relay_set_.end(); ++relay_i)
            {
                if ((*relay_i).socket->id() != exclude_id)
                {
                    send(*relay_i, msg.segment_id(), relay_dg);
                }
            }
            // Undo the header and flag so the local-segment pass below
            // sends a copy without relay flags.
            gu_trace(pop_header(relay_msg, relay_dg));
            relay_msg.set_flags(relay_msg.flags() & ~Message::F_RELAY);
        }

        if (msg.segment_id() == segment_)
        {
            log_warn << "message with F_SEGMENT_RELAY from own segment, "
                     << "source " << msg.source_uuid();
        }

        // Relay to local segment
        gu_trace(push_header(relay_msg, relay_dg));
        Segment& segment(segment_map_[segment_]);
        for (Segment::iterator i(segment.begin()); i != segment.end(); ++i)
        {
            send(*i, msg.segment_id(), relay_dg);
        }
    }
    else
    {
        log_warn << "GMCast::relay() called without relay flags set";
    }
}
1400 
handle_up(const void * id,const Datagram & dg,const ProtoUpMeta & um)1401 void gcomm::GMCast::handle_up(const void*        id,
1402                        const Datagram&    dg,
1403                        const ProtoUpMeta& um)
1404 {
1405     ProtoMap::iterator i;
1406 
1407     if (listener_ == 0) { return; }
1408 
1409     if (id == listener_->id())
1410     {
1411         gmcast_accept();
1412     }
1413     else if (mcast_ && id == mcast_->id())
1414     {
1415         Message msg;
1416 
1417         try
1418         {
1419             if (dg.offset() < dg.header_len())
1420             {
1421                 gu_trace(msg.unserialize(dg.header(), dg.header_size(),
1422                                          dg.header_offset() +
1423                                          dg.offset()));
1424             }
1425             else
1426             {
1427                 gu_trace(msg.unserialize(dg.payload().data(),
1428                                          dg.len(),
1429                                          dg.offset()));
1430             }
1431         }
1432         catch (gu::Exception& e)
1433         {
1434             GU_TRACE(e);
1435             log_warn << e.what();
1436             return;
1437         }
1438 
1439         if (msg.type() >= Message::GMCAST_T_USER_BASE)
1440         {
1441             gu_trace(send_up(Datagram(dg, dg.offset() + msg.serial_size()),
1442                              ProtoUpMeta(msg.source_uuid())));
1443         }
1444         else
1445         {
1446             log_warn << "non-user message " << msg.type()
1447                      << " from multicast socket";
1448         }
1449     }
1450     else if ((i = proto_map_->find(id)) != proto_map_->end())
1451     {
1452         Proto* p(ProtoMap::value(i));
1453 
1454         if (dg.len() > 0)
1455         {
1456             const Proto::State prev_state(p->state());
1457 
1458             if (prev_state == Proto::S_FAILED)
1459             {
1460                 log_warn << "unhandled failed proto";
1461                 handle_failed(p);
1462                 return;
1463             }
1464 
1465             Message msg;
1466 
1467             try
1468             {
1469                 msg.unserialize(dg.payload().data(), dg.len(),
1470                                 dg.offset());
1471             }
1472             catch (gu::Exception& e)
1473             {
1474                 GU_TRACE(e);
1475                 log_warn << e.what();
1476                 p->set_state(Proto::S_FAILED);
1477                 handle_failed(p);
1478                 return;
1479             }
1480 
1481             if (msg.type() >= Message::GMCAST_T_USER_BASE)
1482             {
1483                 if (evict_list().empty() == false &&
1484                     evict_list().find(msg.source_uuid()) != evict_list().end())
1485                 {
1486                     return;
1487                 }
1488                 if (msg.flags() &
1489                     (Message::F_RELAY | Message::F_SEGMENT_RELAY))
1490                 {
1491                     relay(msg,
1492                           Datagram(dg, dg.offset() + msg.serial_size()),
1493                           id);
1494                 }
1495                 p->set_recv_tstamp(gu::datetime::Date::monotonic());
1496                 send_up(Datagram(dg, dg.offset() + msg.serial_size()),
1497                         ProtoUpMeta(msg.source_uuid()));
1498                 return;
1499             }
1500             else
1501             {
1502                 try
1503                 {
1504                     p->set_recv_tstamp(gu::datetime::Date::monotonic());
1505                     gu_trace(p->handle_message(msg));
1506                 }
1507                 catch (const gu::Exception& e)
1508                 {
1509                     handle_failed(p);
1510                     if (e.get_errno() == ENOTRECOVERABLE)
1511                     {
1512                         throw;
1513                     }
1514                     log_warn
1515                         << "handling gmcast protocol message failed: "
1516                         << e.what();
1517                     return;
1518                 }
1519 
1520                 if (p->state() == Proto::S_FAILED)
1521                 {
1522                     handle_failed(p);
1523                     return;
1524                 }
1525                 else if (p->check_changed_and_reset() == true)
1526                 {
1527                     update_addresses();
1528                     check_liveness();
1529                     reconnect();
1530                 }
1531             }
1532 
1533             if (prev_state != Proto::S_OK && p->state() == Proto::S_OK)
1534             {
1535                 handle_established(p);
1536             }
1537         }
1538         else if (p->socket()->state() == Socket::S_CONNECTED &&
1539                  (p->state() == Proto::S_HANDSHAKE_WAIT ||
1540                   p->state() == Proto::S_INIT))
1541         {
1542             handle_connected(p);
1543         }
1544         else if (p->socket()->state() == Socket::S_CONNECTED)
1545         {
1546             log_warn << "connection " << p->socket()->id()
1547                      << " closed by peer";
1548             p->set_state(Proto::S_FAILED);
1549             handle_failed(p);
1550         }
1551         else
1552         {
1553             log_debug << "socket in state " << p->socket()->state();
1554             p->set_state(Proto::S_FAILED);
1555             handle_failed(p);
1556         }
1557     }
1558     else
1559     {
1560         // log_info << "proto entry " << id << " not found";
1561     }
1562 }
1563 
// Linear search of proto_map for the first proto entry whose remote UUID
// equals uuid. Returns a null pointer when there is no such entry.
static gcomm::gmcast::Proto* find_by_remote_uuid(
    const gcomm::gmcast::ProtoMap& proto_map,
    const gcomm::UUID& uuid)
{
    gcomm::gmcast::Proto* found(0);
    gcomm::gmcast::ProtoMap::const_iterator it(proto_map.begin());

    while (found == 0 && it != proto_map.end())
    {
        if (it->second->remote_uuid() == uuid)
        {
            found = it->second;
        }
        ++it;
    }

    return found;
}
1578 
// Sends a user message originating from this node.
//
// If dm.target() is set and names a peer with an S_OK proto, the message is
// sent directly to that peer and the function returns on success. On a send
// error, or when no such proto exists, it falls back to the broadcast path:
//   - every relay_set_ entry gets a copy with F_RELAY set;
//   - for each remote segment, one peer gets a copy with F_SEGMENT_RELAY
//     set, unless that peer is in relay_set_ and was already reached above;
//   - every local segment entry not in relay_set_ gets a copy without relay
//     flags.
// Send errors are only logged (see send()). The return value is always 0.
int gcomm::GMCast::handle_down(Datagram& dg, const ProtoDownMeta& dm)
{
    Message msg(version_, Message::GMCAST_T_USER_BASE, uuid(), 1, segment_);

    // If target is set and proto entry for target is found,
    // send a direct message. Otherwise fall back for broadcast
    // to ensure message delivery via relay
    if (dm.target() != UUID::nil())
    {
        Proto* target_proto(find_by_remote_uuid(*proto_map_, dm.target()));
        if (target_proto && target_proto->state() == Proto::S_OK)
        {
            gu_trace(push_header(msg, dg));
            int err;
            if ((err = target_proto->socket()->send(msg.segment_id(), dg)) != 0)
            {
                log_debug << "failed to send to "
                          << target_proto->socket()->remote_addr()
                          << ": (" << err << ") " << strerror(err);
            }
            else
            {
                target_proto->set_send_tstamp(gu::datetime::Date::monotonic());
            }
            // Restore dg so the broadcast fallback starts from a clean
            // datagram.
            gu_trace(pop_header(msg, dg));
            if (err == 0)
            {
                return 0;
            }
            // In case of error fall back to broadcast
        }
        else
        {
            log_debug << "Target " << dm.target() << " proto not found";
        }
    }

    // handle relay set first, skip these peers below
    if (relay_set_.empty() == false)
    {
        msg.set_flags(msg.flags() | Message::F_RELAY);
        gu_trace(push_header(msg, dg));
        for (RelaySet::iterator ri(relay_set_.begin());
             ri != relay_set_.end(); ++ri)
        {
            send(*ri, msg.segment_id(), dg);
        }
        gu_trace(pop_header(msg, dg));
        msg.set_flags(msg.flags() & ~Message::F_RELAY);
    }

    for (SegmentMap::iterator si(segment_map_.begin());
         si != segment_map_.end(); ++si)
    {
        uint8_t segment_id(si->first);
        Segment& segment(si->second);

        if (segment_id != segment_)
        {
            // One peer per remote segment. self_index_ counts the local-
            // segment peers with a smaller UUID than this node (set in
            // update_addresses()), so different local nodes presumably pick
            // different relay peers. Remote segments are non-empty because
            // update_addresses() only creates one when adding an entry to
            // it.
            size_t target_idx((self_index_ + segment_id) % segment.size());
            msg.set_flags(msg.flags() | Message::F_SEGMENT_RELAY);
            // skip peers that are in relay set
            if (relay_set_.empty() == true ||
                relay_set_.find(segment[target_idx]) == relay_set_.end())
            {
                gu_trace(push_header(msg, dg));
                send(segment[target_idx], msg.segment_id(), dg);
                gu_trace(pop_header(msg, dg));
            }
        }
        else
        {
            msg.set_flags(msg.flags() & ~Message::F_SEGMENT_RELAY);
            gu_trace(push_header(msg, dg));
            for (Segment::iterator i(segment.begin());
                 i != segment.end(); ++i)
            {
                // skip peers that are in relay set
                if (relay_set_.empty() == true ||
                    relay_set_.find(*i) == relay_set_.end())
                {
                    send(*i, msg.segment_id(), dg);
                }
            }
            gu_trace(pop_header(msg, dg));
        }
    }

    return 0;
}
1669 
handle_stable_view(const View & view)1670 void gcomm::GMCast::handle_stable_view(const View& view)
1671 {
1672     log_debug << "GMCast::handle_stable_view: " << view;
1673     if (view.type() == V_PRIM)
1674     {
1675         // discard addr list entries not in view
1676         std::set<UUID> gmcast_lst;
1677         for (AddrList::const_iterator i(remote_addrs_.begin());
1678              i != remote_addrs_.end(); ++i)
1679         {
1680             gmcast_lst.insert(i->second.uuid());
1681         }
1682         std::set<UUID> view_lst;
1683         for (NodeList::const_iterator i(view.members().begin());
1684              i != view.members().end(); ++i)
1685         {
1686             view_lst.insert(i->first);
1687         }
1688         std::list<UUID> diff;
1689         std::set_difference(gmcast_lst.begin(),
1690                             gmcast_lst.end(),
1691                             view_lst.begin(),
1692                             view_lst.end(),
1693                             std::back_inserter(diff));
1694 
1695         // Forget partitioned entries, allow them to reconnect
1696         // in time_wait_/2. Left nodes are given time_wait_ ban for
1697         // reconnecting when handling V_REG below.
1698         for (std::list<UUID>::const_iterator i(diff.begin());
1699              i != diff.end(); ++i)
1700         {
1701             gmcast_forget(*i, time_wait_/2);
1702         }
1703 
1704         // mark nodes in view as stable
1705         for (std::set<UUID>::const_iterator i(view_lst.begin());
1706              i != view_lst.end(); ++i)
1707         {
1708             AddrList::iterator ai;
1709             if ((ai = find_if(remote_addrs_.begin(), remote_addrs_.end(),
1710                               AddrListUUIDCmp(*i))) != remote_addrs_.end())
1711             {
1712                 ai->second.set_retry_cnt(-1);
1713                 ai->second.set_max_retries(max_retry_cnt_);
1714             }
1715         }
1716 
1717         // iterate over pending address list and discard entries without UUID
1718         for (AddrList::iterator i(pending_addrs_.begin());
1719              i != pending_addrs_.end(); )
1720         {
1721             AddrList::iterator i_next(i);
1722             ++i_next;
1723             const AddrEntry& ae(AddrList::value(i));
1724             if (ae.uuid() == UUID())
1725             {
1726                 const std::string addr(AddrList::key(i));
1727                 log_info << "discarding pending addr without UUID: "
1728                          << addr;
1729                 for (ProtoMap::iterator pi(proto_map_->begin());
1730                      pi != proto_map_->end();)
1731                 {
1732                     ProtoMap::iterator pi_next(pi);
1733                     ++pi_next;
1734                     Proto* p(ProtoMap::value(pi));
1735                     if (p->remote_addr() == addr)
1736                     {
1737                         log_info << "discarding pending addr proto entry " << p;
1738                         erase_proto(pi);
1739                     }
1740                     pi = pi_next;
1741                 }
1742                 pending_addrs_.erase(i);
1743             }
1744             i = i_next;
1745         }
1746         prim_view_reached_ = true;
1747     }
1748     else if (view.type() == V_REG)
1749     {
1750         for (NodeList::const_iterator i(view.members().begin());
1751              i != view.members().end(); ++i)
1752         {
1753             AddrList::iterator ai;
1754             if ((ai = find_if(remote_addrs_.begin(), remote_addrs_.end(),
1755                               AddrListUUIDCmp(NodeList::key(i))))
1756                 != remote_addrs_.end())
1757             {
1758                 log_info << "declaring " << NodeList::key(i)
1759                          << " at " << handle_get_address(NodeList::key(i))
1760                          << " stable";
1761                 ai->second.set_retry_cnt(-1);
1762                 ai->second.set_max_retries(max_retry_cnt_);
1763             }
1764         }
1765 
1766         // Forget left nodes
1767         for (NodeList::const_iterator i(view.left().begin());
1768              i != view.left().end(); ++i)
1769         {
1770             gmcast_forget(NodeList::key(i), time_wait_);
1771         }
1772     }
1773     check_liveness();
1774 
1775     for (ProtoMap::const_iterator i(proto_map_->begin()); i != proto_map_->end();
1776          ++i)
1777     {
1778         log_debug << "proto: " << *ProtoMap::value(i);
1779     }
1780 }
1781 
1782 
handle_evict(const UUID & uuid)1783 void gcomm::GMCast::handle_evict(const UUID& uuid)
1784 {
1785     if (is_evicted(uuid) == true)
1786     {
1787         return;
1788     }
1789     gmcast_forget(uuid, time_wait_);
1790 }
1791 
1792 
handle_get_address(const UUID & uuid) const1793 std::string gcomm::GMCast::handle_get_address(const UUID& uuid) const
1794 {
1795     AddrList::const_iterator ali(
1796         find_if(remote_addrs_.begin(),
1797                 remote_addrs_.end(),
1798                 AddrListUUIDCmp(uuid)));
1799     return (ali == remote_addrs_.end() ? "" : AddrList::key(ali));
1800 }
1801 
add_or_del_addr(const std::string & val)1802 void gcomm::GMCast::add_or_del_addr(const std::string& val)
1803 {
1804     if (val.compare(0, 4, "add:") == 0)
1805     {
1806         gu::URI uri(val.substr(4));
1807         std::string addr(gu::net::resolve(uri_string(get_scheme(use_ssl_, dynamic_socket_),
1808                                                      uri.get_host(),
1809                                                      uri.get_port())).to_string());
1810         log_info << "inserting address '" << addr << "'";
1811         insert_address(addr, UUID(), remote_addrs_);
1812         AddrList::iterator ai(remote_addrs_.find(addr));
1813         AddrList::value(ai).set_max_retries(
1814             max_initial_reconnect_attempts_);
1815         AddrList::value(ai).set_retry_cnt(-1);
1816     }
1817     else if (val.compare(0, 4, "del:") == 0)
1818     {
1819         std::string addr(val.substr(4));
1820         AddrList::iterator ai(remote_addrs_.find(addr));
1821         if (ai != remote_addrs_.end())
1822         {
1823             ProtoMap::iterator pi, pi_next;
1824             for (pi = proto_map_->begin(); pi != proto_map_->end(); pi = pi_next)
1825             {
1826                 pi_next = pi, ++pi_next;
1827                 Proto* rp = ProtoMap::value(pi);
1828                 if (rp->remote_addr() == AddrList::key(ai))
1829                 {
1830                     log_info << "deleting entry " << AddrList::key(ai);
1831                     erase_proto(pi);
1832                 }
1833             }
1834             AddrEntry& ae(AddrList::value(ai));
1835             ae.set_max_retries(0);
1836             ae.set_retry_cnt(1);
1837             ae.set_next_reconnect(gu::datetime::Date::monotonic() + time_wait_);
1838             update_addresses();
1839         }
1840         else
1841         {
1842             log_info << "address '" << addr
1843                      << "' not found from remote addrs list";
1844         }
1845     }
1846     else
1847     {
1848         gu_throw_error(EINVAL) << "invalid addr spec '" << val << "'";
1849     }
1850 }
1851 
1852 
set_param(const std::string & key,const std::string & val,Protolay::sync_param_cb_t & sync_param_cb)1853 bool gcomm::GMCast::set_param(const std::string& key, const std::string& val,
1854                               Protolay::sync_param_cb_t& sync_param_cb)
1855 {
1856     try
1857     {
1858         if (key == Conf::GMCastMaxInitialReconnectAttempts)
1859         {
1860             max_initial_reconnect_attempts_ = gu::from_string<int>(val);
1861             return true;
1862         }
1863         else if (key == Conf::GMCastPeerAddr)
1864         {
1865             try
1866             {
1867                 add_or_del_addr(val);
1868             }
1869             catch (gu::NotFound& nf)
1870             {
1871                 gu_throw_error(EINVAL) << "invalid addr spec '" << val << "'";
1872             }
1873             catch (gu::NotSet& ns)
1874             {
1875                 gu_throw_error(EINVAL) << "invalid addr spec '" << val << "'";
1876             }
1877             return true;
1878         }
1879         else if (key == Conf::GMCastIsolate)
1880         {
1881             int tmpval = gu::from_string<int>(val);
1882             if (tmpval < 0 || tmpval > 2)
1883             {
1884                 gu_throw_error(EINVAL)
1885                     << "invalid value for gmacst.isolate: '"
1886                     << tmpval << "'";
1887             }
1888             isolate_ = tmpval;
1889             log_info << "turning isolation "
1890                      << (isolate_ == 1 ? "on" :
1891                          (isolate_ == 2 ? "force quit" : "off"));
1892             if (isolate_)
1893             {
1894                 // delete all entries in proto map
1895                 ProtoMap::iterator pi, pi_next;
1896                 for (pi = proto_map_->begin(); pi != proto_map_->end();
1897                      pi = pi_next)
1898                 {
1899                     pi_next = pi, ++pi_next;
1900                     erase_proto(pi);
1901                 }
1902                 segment_map_.clear();
1903             }
1904             return true;
1905         }
1906         else if (key == Conf::SocketRecvBufSize)
1907         {
1908             gu_trace(Conf::check_recv_buf_size(val));
1909             conf_.set(key, val);
1910 
1911             for (ProtoMap::iterator pi(proto_map_->begin());
1912                  pi != proto_map_->end(); ++pi)
1913             {
1914                 gu_trace(pi->second->socket()->set_option(key, val));
1915                 // erase_proto(pi++);
1916             }
1917             // segment_map_.clear();
1918             // reconnect();
1919             return true;
1920         }
1921         else if (key == Conf::GMCastGroup       ||
1922                  key == Conf::GMCastListenAddr  ||
1923                  key == Conf::GMCastMCastAddr   ||
1924                  key == Conf::GMCastMCastPort   ||
1925                  key == Conf::GMCastMCastTTL    ||
1926                  key == Conf::GMCastTimeWait    ||
1927                  key == Conf::GMCastPeerTimeout ||
1928                  key == Conf::GMCastSegment)
1929         {
1930             gu_throw_error(EPERM) << "can't change value during runtime";
1931         }
1932     }
1933     catch (gu::Exception& e)
1934     {
1935         GU_TRACE(e); throw;
1936     }
1937     catch (std::exception& e)
1938     {
1939         gu_throw_error(EINVAL) << e.what();
1940     }
1941     catch (...)
1942     {
1943         gu_throw_error(EINVAL) << "exception";
1944     }
1945 
1946     return false;
1947 }
1948