1 /*
2  * Copyright (C) 2009-2019 Codership Oy <info@codership.com>
3  */
4 
5 #include "gmcast.hpp"
6 #include "gmcast_proto.hpp"
7 
8 #include "gcomm/common.hpp"
9 #include "gcomm/conf.hpp"
10 #include "gcomm/util.hpp"
11 #include "gcomm/map.hpp"
12 #include "defaults.hpp"
13 #include "gu_convert.hpp"
14 #include "gu_resolver.hpp"
15 #include "gu_asio.hpp" // gu::conf::use_ssl
16 
17 using namespace std::rel_ops;
18 
19 using gcomm::gmcast::Proto;
20 using gcomm::gmcast::ProtoMap;
21 using gcomm::gmcast::Link;
22 using gcomm::gmcast::LinkMap;
23 using gcomm::gmcast::Message;
24 
25 const long gcomm::GMCast::max_retry_cnt_(std::numeric_limits<int>::max());
26 
set_tcp_defaults(gu::URI * uri)27 static void set_tcp_defaults (gu::URI* uri)
28 {
29     // what happens if there is already this parameter?
30     uri->set_option(gcomm::Conf::TcpNonBlocking, gu::to_string(1));
31 }
32 
33 
check_tcp_uri(const gu::URI & uri)34 static bool check_tcp_uri(const gu::URI& uri)
35 {
36     return (uri.get_scheme() == gu::scheme::tcp ||
37             uri.get_scheme() == gu::scheme::ssl);
38 }
39 
get_scheme(bool use_ssl)40 static std::string get_scheme(bool use_ssl)
41 {
42     if (use_ssl == true)
43     {
44         return gu::scheme::ssl;
45     }
46     return gu::scheme::tcp;
47 }
48 
49 //
50 // Check if the node should stay isolated.
51 // Possible outcomes:
52 // * Return false, node should continue reconnecting and accepting connections
53 //   (isolate = 0)
54 // * Return true, node should remain isolated (isolate = 1)
55 // * Throw fatal exception to terminate the backend (isolate = 2)
56 //
is_isolated(int isolate)57 static inline bool is_isolated(int isolate)
58 {
59     switch (isolate)
60     {
61     case 1:
62         return true;
63     case 2:
64         gu_throw_fatal<< "Gcomm backend termination was "
65                       << "requested by setting gmcast.isolate=2.";
66         break;
67     default:
68         break;
69     }
70     return false;
71 }
72 
GMCast(Protonet & net,const gu::URI & uri,const UUID * my_uuid)73 gcomm::GMCast::GMCast(Protonet& net, const gu::URI& uri,
74                       const UUID* my_uuid)
75     :
76     Transport     (net, uri),
77     version_(check_range(Conf::GMCastVersion,
78                          param<int>(conf_, uri, Conf::GMCastVersion, "0"),
79                          0, max_version_ + 1)),
80     segment_ (check_range(Conf::GMCastSegment,
81                           param<int>(conf_, uri, Conf::GMCastSegment, "0"),
82                           0, 255)),
83     my_uuid_      (my_uuid ? *my_uuid : UUID(0, 0)),
84     use_ssl_      (param<bool>(conf_, uri, gu::conf::use_ssl, "false")),
85     // @todo: technically group name should be in path component
86     group_name_   (param<std::string>(conf_, uri, Conf::GMCastGroup, "")),
87     listen_addr_  (
88         param<std::string>(
89             conf_, uri, Conf::GMCastListenAddr,
90             get_scheme(use_ssl_) + "://0.0.0.0")), // how to make it IPv6 safe?
91     initial_addrs_(),
92     mcast_addr_   (param<std::string>(conf_, uri, Conf::GMCastMCastAddr, "")),
93     bind_ip_      (""),
94     mcast_ttl_    (check_range(
95                        Conf::GMCastMCastTTL,
96                        param<int>(conf_, uri, Conf::GMCastMCastTTL, "1"),
97                        1, 256)),
98     listener_     (0),
99     mcast_        (),
100     pending_addrs_(),
101     remote_addrs_ (),
102     addr_blacklist_(),
103     relaying_     (false),
104     isolate_      (0),
105     prim_view_reached_(false),
106     proto_map_    (new ProtoMap()),
107     relay_set_    (),
108     segment_map_  (),
109     self_index_   (std::numeric_limits<size_t>::max()),
110     time_wait_    (param<gu::datetime::Period>(
111                        conf_, uri,
112                        Conf::GMCastTimeWait, Defaults::GMCastTimeWait)),
113     check_period_ ("PT0.5S"),
114     peer_timeout_ (param<gu::datetime::Period>(
115                        conf_, uri,
116                        Conf::GMCastPeerTimeout, Defaults::GMCastPeerTimeout)),
117     max_initial_reconnect_attempts_(
118         param<int>(conf_, uri,
119                    Conf::GMCastMaxInitialReconnectAttempts,
120                    gu::to_string(max_retry_cnt_))),
121     next_check_   (gu::datetime::Date::monotonic())
122 {
123     log_info << "GMCast version " << version_;
124 
125     if (group_name_ == "")
126     {
127         gu_throw_error (EINVAL) << "Group not defined in URL: "
128                                 << uri_.to_string();
129     }
130 
131     set_initial_addr(uri_);
132 
133     try
134     {
135         listen_addr_ = uri_.get_option (Conf::GMCastListenAddr);
136     }
137     catch (gu::NotFound&) {}
138 
139     try
140     {
141         gu::URI uri(listen_addr_); /* check validity of the address */
142     }
143     catch (gu::Exception&)
144     {
145         /* most probably no scheme, try to append one and see if it succeeds */
146         listen_addr_ = uri_string(get_scheme(use_ssl_), listen_addr_);
147         gu_trace(gu::URI uri(listen_addr_));
148     }
149 
150     gu::URI listen_uri(listen_addr_);
151 
152     if (check_tcp_uri(listen_uri) == false)
153     {
154         gu_throw_error (EINVAL) << "listen addr '" << listen_addr_
155                                 << "' does not specify supported protocol";
156     }
157 
158     if (gu::net::resolve(listen_uri).get_addr().is_anyaddr() == false)
159     {
160         // bind outgoing connections to the same address as listening.
161         gu_trace(bind_ip_ = listen_uri.get_host());
162     }
163 
164     std::string port(Defaults::GMCastTcpPort);
165 
166     try
167     {
168         port = listen_uri.get_port();
169     }
170     catch (gu::NotSet&)
171     {
172         // if no listen port is set for listen address in the options,
173         // see if base port was configured
174         try
175         {
176             port = conf_.get(BASE_PORT_KEY);
177         }
178         catch (gu::NotSet&)
179         {
180             // if no base port configured, try port from the connection address
181             try { port = uri_.get_port(); } catch (gu::NotSet&) {}
182         }
183 
184         listen_addr_ += ":" + port;
185     }
186 
187     conf_.set(BASE_PORT_KEY, port);
188 
189     listen_addr_ = gu::net::resolve(listen_addr_).to_string();
190     // resolving sets scheme to tcp, have to rewrite for ssl
191     if (use_ssl_ == true)
192     {
193         listen_addr_.replace(0, 3, gu::scheme::ssl);
194     }
195 
196     std::set<std::string>::iterator iaself(initial_addrs_.find(listen_addr_));
197     if (iaself != initial_addrs_.end())
198     {
199         log_debug << "removing own listen address '" << *iaself
200                   << "' from initial address list";
201         initial_addrs_.erase(iaself);
202     }
203 
204     if (mcast_addr_ != "")
205     {
206         try
207         {
208             port = param<std::string>(conf_, uri_, Conf::GMCastMCastPort, port);
209         }
210         catch (gu::NotFound&) {}
211 
212         mcast_addr_ = gu::net::resolve(
213             uri_string(gu::scheme::udp, mcast_addr_, port)).to_string();
214     }
215 
216     log_info << self_string() << " listening at " << listen_addr_;
217     log_info << self_string() << " multicast: " << mcast_addr_
218              << ", ttl: " << mcast_ttl_;
219 
220     conf_.set(Conf::GMCastListenAddr, listen_addr_);
221     conf_.set(Conf::GMCastMCastAddr, mcast_addr_);
222     conf_.set(Conf::GMCastVersion, gu::to_string(version_));
223     conf_.set(Conf::GMCastTimeWait, gu::to_string(time_wait_));
224     conf_.set(Conf::GMCastMCastTTL, gu::to_string(mcast_ttl_));
225     conf_.set(Conf::GMCastPeerTimeout, gu::to_string(peer_timeout_));
226     conf_.set(Conf::GMCastSegment, gu::to_string<int>(segment_));
227 }
228 
~GMCast()229 gcomm::GMCast::~GMCast()
230 {
231     if (listener_ != 0) close();
232 
233     delete proto_map_;
234 }
235 
set_initial_addr(const gu::URI & uri)236 void gcomm::GMCast::set_initial_addr(const gu::URI& uri)
237 {
238 
239     const gu::URI::AuthorityList& al(uri.get_authority_list());
240 
241     for (gu::URI::AuthorityList::const_iterator i(al.begin());
242          i != al.end(); ++i)
243     {
244         std::string host;
245         try
246         {
247             host = i->host();
248         }
249         catch (gu::NotSet& ns)
250         {
251             gu_throw_error(EINVAL) << "Unset host in URL " << uri;
252         }
253 
254         if (host_is_any(host)) continue;
255 
256         std::string port;
257         try
258         {
259             port = i->port();
260         }
261         catch (gu::NotSet&)
262         {
263             try
264             {
265                 port = conf_.get(BASE_PORT_KEY);
266             }
267             catch (gu::NotFound&)
268             {
269                 port = Defaults::GMCastTcpPort;
270             }
271             catch (gu::NotSet&)
272             {
273                 port = Defaults::GMCastTcpPort;
274             }
275         }
276 
277         std::string initial_uri = uri_string(get_scheme(use_ssl_), host, port);
278         std::string initial_addr;
279         try
280         {
281             initial_addr = gu::net::resolve(initial_uri).to_string();
282         }
283         catch (gu::Exception& )
284         {
285             log_warn << "Failed to resolve " << initial_uri;
286             continue;
287         }
288 
289         // resolving sets scheme to tcp, have to rewrite for ssl
290         if (use_ssl_ == true)
291         {
292             initial_addr.replace(0, 3, gu::scheme::ssl);
293         }
294 
295         if (check_tcp_uri(initial_addr) == false)
296         {
297             gu_throw_error (EINVAL) << "initial addr '" << initial_addr
298                                     << "' is not valid";
299         }
300 
301         log_debug << self_string() << " initial addr: " << initial_addr;
302         initial_addrs_.insert(initial_addr);
303 
304     }
305 
306 }
307 
connect_precheck(bool start_prim)308 void gcomm::GMCast::connect_precheck(bool start_prim)
309 {
310     if (!start_prim && initial_addrs_.empty()) {
311         gu_throw_fatal << "No address to connect";
312     }
313 }
314 
connect()315 void gcomm::GMCast::connect()
316 {
317     pstack_.push_proto(this);
318     log_debug << "gmcast " << uuid() << " connect";
319 
320     gu::URI listen_uri(listen_addr_);
321 
322     set_tcp_defaults (&listen_uri);
323 
324     listener_ = pnet().acceptor(listen_uri);
325     gu_trace (listener_->listen(listen_uri));
326 
327     if (!mcast_addr_.empty())
328     {
329         gu::URI mcast_uri(
330             mcast_addr_ + '?'
331             + gcomm::Socket::OptIfAddr + '='
332             + gu::URI(listen_addr_).get_host()+'&'
333             + gcomm::Socket::OptNonBlocking + "=1&"
334             + gcomm::Socket::OptMcastTTL    + '=' + gu::to_string(mcast_ttl_)
335             );
336 
337         mcast_ = pnet().socket(mcast_uri);
338         gu_trace(mcast_->connect(mcast_uri));
339     }
340 
341     if (!initial_addrs_.empty())
342     {
343         for (std::set<std::string>::const_iterator i(initial_addrs_.begin());
344              i != initial_addrs_.end(); ++i)
345         {
346             insert_address(*i, UUID(), pending_addrs_);
347             AddrList::iterator ai(pending_addrs_.find(*i));
348             AddrList::value(ai).set_max_retries(max_retry_cnt_);
349             gu_trace (gmcast_connect(*i));
350         }
351     }
352 }
353 
354 
connect(const gu::URI & uri)355 void gcomm::GMCast::connect(const gu::URI& uri)
356 {
357     set_initial_addr(uri);
358     connect();
359 }
360 
361 
362 
close(bool force)363 void gcomm::GMCast::close(bool force)
364 {
365     log_debug << "gmcast " << uuid() << " close";
366     pstack_.pop_proto(this);
367     if (mcast_)
368     {
369         mcast_->close();
370         // delete mcast;
371         // mcast = 0;
372     }
373 
374     gcomm_assert(listener_ != 0);
375     listener_->close();
376     delete listener_;
377     listener_ = 0;
378 
379     segment_map_.clear();
380     for (ProtoMap::iterator
381              i = proto_map_->begin(); i != proto_map_->end(); ++i)
382     {
383         delete ProtoMap::value(i);
384     }
385 
386     proto_map_->clear();
387     pending_addrs_.clear();
388     remote_addrs_.clear();
389     prim_view_reached_ = false;
390 }
391 
392 //
393 // Private
394 //
395 
396 
397 // Find other local endpoint matching to proto
398 static const Proto*
find_other_local_endpoint(const gcomm::gmcast::ProtoMap & proto_map,const gcomm::gmcast::Proto * proto)399 find_other_local_endpoint(const gcomm::gmcast::ProtoMap& proto_map,
400                           const gcomm::gmcast::Proto* proto)
401 {
402     for (gcomm::gmcast::ProtoMap::const_iterator i(proto_map.begin());
403          i != proto_map.end(); ++i)
404     {
405         if (i->second != proto &&
406             i->second->handshake_uuid() == proto->handshake_uuid())
407         {
408             return i->second;
409         }
410     }
411     return 0;
412 }
413 
414 // Find other endpoint with same remote UUID
415 static const Proto*
find_other_endpoint_same_remote_uuid(const gcomm::gmcast::ProtoMap & proto_map,const gcomm::gmcast::Proto * proto)416 find_other_endpoint_same_remote_uuid(const gcomm::gmcast::ProtoMap& proto_map,
417                                      const gcomm::gmcast::Proto* proto)
418 {
419     for (gcomm::gmcast::ProtoMap::const_iterator i(proto_map.begin());
420          i != proto_map.end(); ++i)
421     {
422         if (i->second != proto &&
423             i->second->remote_uuid() == proto->remote_uuid())
424         {
425             return i->second;
426         }
427     }
428     return 0;
429 }
430 
is_own(const gmcast::Proto * proto) const431 bool gcomm::GMCast::is_own(const gmcast::Proto* proto) const
432 {
433     assert(proto->remote_uuid() != gcomm::UUID::nil());
434     if (proto->remote_uuid() != uuid())
435     {
436         return false;
437     }
438     return find_other_local_endpoint(*proto_map_, proto);
439 }
440 
blacklist(const gmcast::Proto * proto)441 void gcomm::GMCast::blacklist(const gmcast::Proto* proto)
442 {
443     initial_addrs_.erase(proto->remote_addr());
444     pending_addrs_.erase(proto->remote_addr());
445     addr_blacklist_.insert(std::make_pair(
446                                proto->remote_addr(),
447                                AddrEntry(gu::datetime::Date::monotonic(),
448                                          gu::datetime::Date::monotonic(),
449                                          proto->remote_uuid())));
450 }
451 
is_not_own_and_duplicate_exists(const Proto * proto) const452 bool gcomm::GMCast::is_not_own_and_duplicate_exists(
453     const Proto* proto) const
454 {
455     assert(proto->remote_uuid() != gcomm::UUID::nil());
456     const Proto* other(find_other_local_endpoint(*proto_map_, proto));
457     if (!other)
458     {
459         // Not own
460         // Check if remote UUID matches to self
461         if (proto->remote_uuid() == uuid())
462         {
463             return true;
464         }
465         // Check if other proto entry with same remote
466         // UUID but different remote address exists.
467         other = find_other_endpoint_same_remote_uuid(*proto_map_, proto);
468         if (other && other->remote_addr() != proto->remote_addr())
469         {
470             return true;
471         }
472     }
473     return false;
474 }
475 
476 // Erase proto entry in safe manner
477 // 1) Erase from relay_set_
478 // 2) Erase from proto_map_
479 // 3) Delete proto entry
erase_proto(gmcast::ProtoMap::iterator i)480 void gcomm::GMCast::erase_proto(gmcast::ProtoMap::iterator i)
481 {
482     Proto* p(ProtoMap::value(i));
483     RelayEntry e(p, p->socket().get());
484     RelaySet::iterator si(relay_set_.find(e));
485     if (si != relay_set_.end())
486     {
487         relay_set_.erase(si);
488     }
489     proto_map_->erase(i);
490     delete p;
491 }
492 
gmcast_accept()493 void gcomm::GMCast::gmcast_accept()
494 {
495     SocketPtr tp;
496 
497     try
498     {
499         tp = listener_->accept();
500     }
501     catch (gu::Exception& e)
502     {
503         log_warn << e.what();
504         return;
505     }
506 
507     if (is_isolated(isolate_))
508     {
509         log_debug << "dropping accepted socket due to isolation";
510         tp->close();
511         return;
512     }
513 
514     Proto* peer = new Proto (
515         *this,
516         version_,
517         tp,
518         listener_->listen_addr(),
519         "",
520         mcast_addr_,
521         segment_,
522         group_name_);
523     std::pair<ProtoMap::iterator, bool> ret =
524         proto_map_->insert(std::make_pair(tp->id(), peer));
525 
526     if (ret.second == false)
527     {
528         delete peer;
529         gu_throw_fatal << "Failed to add peer to map";
530     }
531     if (tp->state() == Socket::S_CONNECTED)
532     {
533         peer->send_handshake();
534     }
535     else
536     {
537         log_debug << "accepted socket is connecting";
538     }
539     log_debug << "handshake sent";
540 }
541 
542 
gmcast_connect(const std::string & remote_addr)543 void gcomm::GMCast::gmcast_connect(const std::string& remote_addr)
544 {
545     if (remote_addr == listen_addr_) return;
546 
547     gu::URI connect_uri(remote_addr);
548 
549     set_tcp_defaults (&connect_uri);
550 
551     if (!bind_ip_.empty())
552     {
553         connect_uri.set_option(gcomm::Socket::OptIfAddr, bind_ip_);
554     }
555 
556     SocketPtr tp = pnet().socket(connect_uri);
557 
558     try
559     {
560         tp->connect(connect_uri);
561     }
562     catch (gu::Exception& e)
563     {
564         log_debug << "Connect failed: " << e.what();
565         // delete tp;
566         return;
567     }
568 
569     Proto* peer = new Proto (
570         *this,
571         version_,
572         tp,
573         listener_->listen_addr(),
574         remote_addr,
575         mcast_addr_,
576         segment_,
577         group_name_);
578 
579     std::pair<ProtoMap::iterator, bool> ret =
580         proto_map_->insert(std::make_pair(tp->id(), peer));
581 
582     if (ret.second == false)
583     {
584         delete peer;
585         gu_throw_fatal << "Failed to add peer to map";
586     }
587 
588     ret.first->second->wait_handshake();
589 }
590 
591 
gmcast_forget(const UUID & uuid,const gu::datetime::Period & wait_period)592 void gcomm::GMCast::gmcast_forget(const UUID& uuid,
593                                   const gu::datetime::Period& wait_period)
594 {
595     /* Close all proto entries corresponding to uuid */
596 
597     ProtoMap::iterator pi, pi_next;
598     for (pi = proto_map_->begin(); pi != proto_map_->end(); pi = pi_next)
599     {
600         pi_next = pi, ++pi_next;
601         Proto* rp = ProtoMap::value(pi);
602         if (rp->remote_uuid() == uuid)
603         {
604             erase_proto(pi);
605         }
606     }
607 
608     /* Set all corresponding entries in address list to have retry cnt
609      * greater than max retries and next reconnect time after some period */
610     AddrList::iterator ai;
611     for (ai = remote_addrs_.begin(); ai != remote_addrs_.end(); ++ai)
612     {
613         AddrEntry& ae(AddrList::value(ai));
614         if (ae.uuid() == uuid)
615         {
616             log_info << "forgetting " << uuid
617                      << " (" << AddrList::key(ai) << ")";
618 
619             ProtoMap::iterator pi, pi_next;
620             for (pi = proto_map_->begin(); pi != proto_map_->end(); pi = pi_next)
621             {
622                 pi_next = pi, ++pi_next;
623                 if (ProtoMap::value(pi)->remote_addr() == AddrList::key(ai))
624                 {
625                     log_info << "deleting entry " << AddrList::key(ai);
626                     erase_proto(pi);
627                 }
628             }
629             ae.set_max_retries(0);
630             ae.set_retry_cnt(1);
631             gu::datetime::Date now(gu::datetime::Date::monotonic());
632             // Don't reduce next reconnect time if it is set greater than
633             // requested
634             if ((now + wait_period > ae.next_reconnect()) ||
635                 (ae.next_reconnect() == gu::datetime::Date::max()))
636             {
637                 ae.set_next_reconnect(gu::datetime::Date::monotonic() + wait_period);
638             }
639             else
640             {
641                 log_debug << "not decreasing next reconnect for " << uuid;
642             }
643         }
644     }
645 
646     /* Update state */
647     update_addresses();
648 }
649 
handle_connected(Proto * rp)650 void gcomm::GMCast::handle_connected(Proto* rp)
651 {
652     const SocketPtr tp(rp->socket());
653     assert(tp->state() == Socket::S_CONNECTED);
654     log_debug << "transport " << tp << " connected";
655     if (rp->state() == Proto::S_INIT)
656     {
657         log_debug << "sending handshake";
658         // accepted socket was waiting for underlying transport
659         // handshake to finish
660         rp->send_handshake();
661     }
662 }
663 
handle_established(Proto * est)664 void gcomm::GMCast::handle_established(Proto* est)
665 {
666     log_info << self_string() << " connection established to "
667              << est->remote_uuid() << " "
668              << est->remote_addr();
669     // UUID checks are handled during protocol handshake
670     assert(est->remote_uuid() != uuid());
671 
672     if (is_evicted(est->remote_uuid()))
673     {
674         log_warn << "Closing connection to evicted node " << est->remote_uuid();
675         erase_proto(proto_map_->find_checked(est->socket()->id()));
676         update_addresses();
677         return;
678     }
679 
680     // If address is found from pending_addrs_, move it to remote_addrs list
681     // and set retry cnt to -1
682     const std::string& remote_addr(est->remote_addr());
683     AddrList::iterator i(pending_addrs_.find(remote_addr));
684 
685     if (i != pending_addrs_.end())
686     {
687         log_debug << "Erasing " << remote_addr << " from panding list";
688         pending_addrs_.erase(i);
689     }
690 
691     if ((i = remote_addrs_.find(remote_addr)) == remote_addrs_.end())
692     {
693         log_debug << "Inserting " << remote_addr << " to remote list";
694 
695         insert_address (remote_addr, est->remote_uuid(), remote_addrs_);
696         i = remote_addrs_.find(remote_addr);
697     }
698     else if (AddrList::value(i).uuid() != est->remote_uuid())
699     {
700         log_info << "remote endpoint " << est->remote_addr()
701                  << " changed identity " << AddrList::value(i).uuid()
702                  << " -> " << est->remote_uuid();
703         remote_addrs_.erase(i);
704         i = remote_addrs_.insert_unique(
705             make_pair(est->remote_addr(),
706                       AddrEntry(gu::datetime::Date::monotonic(),
707                                 gu::datetime::Date::monotonic(),
708                                 est->remote_uuid())));
709     }
710 
711     if (AddrList::value(i).retry_cnt() >
712         AddrList::value(i).max_retries())
713     {
714         log_warn << "discarding established (time wait) "
715                  << est->remote_uuid()
716                  << " (" << est->remote_addr() << ") ";
717         erase_proto(proto_map_->find(est->socket()->id()));
718         update_addresses();
719         return;
720     }
721 
722     // send_up(Datagram(), p->remote_uuid());
723 
724     // init retry cnt to -1 to avoid unnecessary logging at first attempt
725     // max retries will be readjusted in handle stable view
726     AddrList::value(i).set_retry_cnt(-1);
727     AddrList::value(i).set_max_retries(max_initial_reconnect_attempts_);
728 
729     // Cleanup all previously established entries with same
730     // remote uuid. It is assumed that the most recent connection
731     // is usually the healthiest one.
732     ProtoMap::iterator j, j_next;
733     for (j = proto_map_->begin(); j != proto_map_->end(); j = j_next)
734     {
735         j_next = j, ++j_next;
736 
737         Proto* p(ProtoMap::value(j));
738 
739         if (p->remote_uuid() == est->remote_uuid())
740         {
741             if (p->handshake_uuid() < est->handshake_uuid())
742             {
743                 log_debug << self_string()
744                           << " cleaning up duplicate "
745                           << p->socket()
746                           << " after established "
747                           << est->socket();
748                 erase_proto(j);
749             }
750             else if (p->handshake_uuid() > est->handshake_uuid())
751             {
752                 log_debug << self_string()
753                           << " cleaning up established "
754                           << est->socket()
755                           << " which is duplicate of "
756                           << p->socket();
757                 erase_proto(proto_map_->find_checked(est->socket()->id()));
758                 update_addresses();
759                 return;
760             }
761             else
762             {
763                 assert(p == est);
764             }
765         }
766     }
767 
768     AddrList::iterator ali(find_if(remote_addrs_.begin(),
769                                    remote_addrs_.end(),
770                                    AddrListUUIDCmp(est->remote_uuid())));
771     if (ali != remote_addrs_.end())
772     {
773         AddrList::value(ali).set_last_connect();
774     }
775     else
776     {
777         log_warn << "peer " << est->remote_addr()
778                  << " not found from remote addresses";
779     }
780 
781     update_addresses();
782 }
783 
handle_failed(Proto * failed)784 void gcomm::GMCast::handle_failed(Proto* failed)
785 {
786     log_debug << "handle failed: " << *failed;
787     const std::string& remote_addr = failed->remote_addr();
788 
789     bool found_ok(false);
790     for (ProtoMap::const_iterator i = proto_map_->begin();
791          i != proto_map_->end(); ++i)
792     {
793         Proto* p(ProtoMap::value(i));
794         if (p                    != failed      &&
795             p->state()       <= Proto::S_OK &&
796             p->remote_addr() == failed->remote_addr())
797         {
798             log_debug << "found live " << *p;
799             found_ok = true;
800             break;
801         }
802     }
803 
804     if (found_ok == false && remote_addr != "")
805     {
806         AddrList::iterator i;
807 
808         if ((i = pending_addrs_.find(remote_addr)) != pending_addrs_.end() ||
809             (i = remote_addrs_.find(remote_addr))  != remote_addrs_.end())
810         {
811             AddrEntry& ae(AddrList::value(i));
812             ae.set_retry_cnt(ae.retry_cnt() + 1);
813 
814             gu::datetime::Date rtime = gu::datetime::Date::monotonic() + gu::datetime::Period("PT1S");
815             log_debug << self_string()
816                       << " setting next reconnect time to "
817                       << rtime << " for " << remote_addr;
818             ae.set_next_reconnect(rtime);
819         }
820     }
821 
822     erase_proto(proto_map_->find_checked(failed->socket()->id()));
823     update_addresses();
824 }
825 
is_connected(const std::string & addr,const UUID & uuid) const826 bool gcomm::GMCast::is_connected(const std::string& addr, const UUID& uuid) const
827 {
828     for (ProtoMap::const_iterator i = proto_map_->begin();
829          i != proto_map_->end(); ++i)
830     {
831         Proto* conn = ProtoMap::value(i);
832 
833         if (addr == conn->remote_addr() ||
834             uuid == conn->remote_uuid())
835         {
836             return true;
837         }
838     }
839 
840     return false;
841 }
842 
843 
insert_address(const std::string & addr,const UUID & uuid,AddrList & alist)844 void gcomm::GMCast::insert_address (const std::string& addr,
845                              const UUID&   uuid,
846                              AddrList&     alist)
847 {
848     if (addr == listen_addr_)
849     {
850         gu_throw_fatal << "Trying to add self addr " << addr << " to addr list";
851     }
852 
853     if (alist.insert(make_pair(addr,
854                                AddrEntry(gu::datetime::Date::monotonic(),
855                                          gu::datetime::Date::monotonic(), uuid))).second == false)
856     {
857         log_warn << "Duplicate entry: " << addr;
858     }
859     else
860     {
861         log_debug << self_string() << ": new address entry " << uuid << ' '
862                   << addr;
863     }
864 }
865 
866 
update_addresses()867 void gcomm::GMCast::update_addresses()
868 {
869     LinkMap link_map;
870     std::set<UUID> uuids;
871     /* Add all established connections into uuid_map and update
872      * list of remote addresses */
873 
874     ProtoMap::iterator i, i_next;
875     for (i = proto_map_->begin(); i != proto_map_->end(); i = i_next)
876     {
877         i_next = i, ++i_next;
878 
879         Proto* rp = ProtoMap::value(i);
880 
881         if (rp->state() == Proto::S_OK)
882         {
883             if (rp->remote_addr() == "" ||
884                 rp->remote_uuid() == UUID::nil())
885             {
886                 gu_throw_fatal << "Protocol error: local: (" << my_uuid_
887                                << ", '" << listen_addr_
888                                << "'), remote: (" << rp->remote_uuid()
889                                << ", '" << rp->remote_addr() << "')";
890             }
891 
892             if (remote_addrs_.find(rp->remote_addr()) == remote_addrs_.end())
893             {
894                 log_warn << "Connection exists but no addr on addr list for "
895                          << rp->remote_addr();
896                 insert_address(rp->remote_addr(), rp->remote_uuid(),
897                                remote_addrs_);
898             }
899 
900             if (uuids.insert(rp->remote_uuid()).second == false)
901             {
902                 // Duplicate entry, drop this one
903                 // @todo Deeper inspection about the connection states
904                 log_debug << self_string() << " dropping duplicate entry";
905                 erase_proto(i);
906             }
907             else
908             {
909                 link_map.insert(Link(rp->remote_uuid(),
910                                      rp->remote_addr(),
911                                      rp->mcast_addr()));
912             }
913         }
914     }
915 
916     /* Send topology change message containing only established
917      * connections */
918     for (ProtoMap::iterator i = proto_map_->begin(); i != proto_map_->end(); ++i)
919     {
920         Proto* gp = ProtoMap::value(i);
921 
922         // @todo: a lot of stuff here is done for each connection, including
923         //        message creation and serialization. Need a mcast_msg() call
924         //        and move this loop in there.
925         if (gp->state() == Proto::S_OK)
926             gp->send_topology_change(link_map);
927     }
928 
929     /* Add entries reported by all other nodes to address list to
930      * get complete view of existing uuids/addresses */
931     for (ProtoMap::iterator i = proto_map_->begin(); i != proto_map_->end(); ++i)
932     {
933         Proto* rp = ProtoMap::value(i);
934 
935         if (rp->state() == Proto::S_OK)
936         {
937             for (LinkMap::const_iterator j = rp->link_map().begin();
938                  j != rp->link_map().end(); ++j)
939             {
940                 const UUID& link_uuid(LinkMap::key(j));
941                 const std::string& link_addr(LinkMap::value(j).addr());
942                 gcomm_assert(link_uuid != UUID::nil() && link_addr != "");
943 
944                 if (addr_blacklist_.find(link_addr) != addr_blacklist_.end())
945                 {
946                     log_debug << self_string()
947                               << " address '" << link_addr
948                               << "' pointing to uuid " << link_uuid
949                               << " is blacklisted, skipping";
950                     continue;
951                 }
952 
953                 if (link_uuid                     != uuid()         &&
954                     remote_addrs_.find(link_addr)  == remote_addrs_.end() &&
955                     pending_addrs_.find(link_addr) == pending_addrs_.end())
956                 {
957                     log_debug << self_string()
958                               << " conn refers to but no addr in addr list for "
959                               << link_addr;
960                     insert_address(link_addr, link_uuid, remote_addrs_);
961 
962                     AddrList::iterator pi(remote_addrs_.find(link_addr));
963 
964                     assert(pi != remote_addrs_.end());
965 
966                     AddrEntry& ae(AddrList::value(pi));
967 
968                     // init retry cnt to -1 to avoid unnecessary logging
969                     // at first attempt
970                     // max retries will be readjusted in handle stable view
971                     ae.set_retry_cnt(-1);
972                     ae.set_max_retries(max_initial_reconnect_attempts_);
973 
974                     // Add some randomness for first reconnect to avoid
975                     // simultaneous connects
976                     gu::datetime::Date rtime(gu::datetime::Date::monotonic());
977 
978                     rtime = rtime + ::rand() % (100*gu::datetime::MSec);
979                     ae.set_next_reconnect(rtime);
980                     next_check_ = std::min(next_check_, rtime);
981                 }
982             }
983         }
984     }
985 
986     // Build multicast tree
987     log_debug << self_string() << " --- mcast tree begin ---";
988     segment_map_.clear();
989 
990     Segment& local_segment(segment_map_[segment_]);
991 
992     if (mcast_)
993     {
994         log_debug << mcast_addr_;
995         local_segment.push_back(RelayEntry(0, mcast_.get()));
996     }
997 
998     self_index_ = 0;
999     for (ProtoMap::const_iterator i(proto_map_->begin()); i != proto_map_->end();
1000          ++i)
1001     {
1002         Proto* p(ProtoMap::value(i));
1003 
1004         log_debug << "Proto: " << *p;
1005 
1006         if (p->remote_segment() == segment_)
1007         {
1008             if (p->state() == Proto::S_OK &&
1009                 (p->mcast_addr() == "" ||
1010                  p->mcast_addr() != mcast_addr_))
1011             {
1012                 local_segment.push_back(RelayEntry(p, p->socket().get()));
1013                 if (p->remote_uuid() < uuid())
1014                 {
1015                     ++self_index_;
1016                 }
1017             }
1018         }
1019         else
1020         {
1021             if (p->state() == Proto::S_OK)
1022             {
1023                 Segment& remote_segment(segment_map_[p->remote_segment()]);
1024                 remote_segment.push_back(RelayEntry(p, p->socket().get()));
1025             }
1026         }
1027     }
1028     log_debug << self_string() << " self index: " << self_index_;
1029     log_debug << self_string() << " --- mcast tree end ---";
1030 }
1031 
1032 
reconnect()1033 void gcomm::GMCast::reconnect()
1034 {
1035     if (is_isolated(isolate_))
1036     {
1037         log_debug << "skipping reconnect due to isolation";
1038         return;
1039     }
1040 
1041     /* Loop over known remote addresses and connect if proto entry
1042      * does not exist */
1043     gu::datetime::Date now = gu::datetime::Date::monotonic();
1044     AddrList::iterator i, i_next;
1045 
1046     for (i = pending_addrs_.begin(); i != pending_addrs_.end(); i = i_next)
1047     {
1048         i_next = i, ++i_next;
1049 
1050         const std::string& pending_addr(AddrList::key(i));
1051         const AddrEntry& ae(AddrList::value(i));
1052 
1053         if (is_connected (pending_addr, UUID::nil()) == false &&
1054             ae.next_reconnect()                  <= now)
1055         {
1056             if (ae.retry_cnt() > ae.max_retries())
1057             {
1058                 log_info << "cleaning up pending addr " << pending_addr;
1059                 pending_addrs_.erase(i);
1060                 continue; // no reference to pending_addr after this
1061             }
1062             else if (ae.next_reconnect() <= now)
1063             {
1064                 log_debug << "connecting to pending " << pending_addr;
1065                 gmcast_connect (pending_addr);
1066             }
1067         }
1068     }
1069 
1070     for (i = remote_addrs_.begin(); i != remote_addrs_.end(); i = i_next)
1071     {
1072         i_next = i, ++i_next;
1073 
1074         const std::string& remote_addr(AddrList::key(i));
1075         const AddrEntry& ae(AddrList::value(i));
1076         const UUID& remote_uuid(ae.uuid());
1077 
1078         gcomm_assert(remote_uuid != uuid());
1079 
1080         if (is_connected(remote_addr, remote_uuid) == false &&
1081             ae.next_reconnect()                <= now)
1082         {
1083             if (ae.retry_cnt() > ae.max_retries())
1084             {
1085                 log_info << " cleaning up " << remote_uuid << " ("
1086                          << remote_addr << ")";
1087                 remote_addrs_.erase(i);
1088                 continue;//no reference to remote_addr or remote_uuid after this
1089             }
1090             else if (ae.next_reconnect() <= now)
1091             {
1092                 if (ae.retry_cnt() % 30 == 0)
1093                 {
1094                     log_info << self_string() << " reconnecting to "
1095                              << remote_uuid << " (" << remote_addr
1096                              << "), attempt " << ae.retry_cnt();
1097                 }
1098 
1099                 gmcast_connect(remote_addr);
1100             }
1101             else
1102             {
1103                 //
1104             }
1105         }
1106     }
1107 }
1108 
1109 namespace
1110 {
1111     class CmpUuidCounts
1112     {
1113     public:
CmpUuidCounts(const std::set<gcomm::UUID> & uuids,gcomm::SegmentId preferred_segment)1114         CmpUuidCounts(const std::set<gcomm::UUID>& uuids,
1115                       gcomm::SegmentId preferred_segment)
1116             :
1117             uuids_(uuids),
1118             preferred_segment_(preferred_segment)
1119         { }
1120 
count(const gcomm::gmcast::Proto * p) const1121         size_t count(const gcomm::gmcast::Proto* p) const
1122         {
1123             size_t cnt(0);
1124             for (std::set<gcomm::UUID>::const_iterator i(uuids_.begin());
1125                  i != uuids_.end(); ++i)
1126             {
1127                 for (gcomm::gmcast::LinkMap::const_iterator
1128                          lm_i(p->link_map().begin());
1129                      lm_i != p->link_map().end(); ++lm_i)
1130                 {
1131                     if (lm_i->uuid() == *i)
1132                     {
1133                         ++cnt;
1134                         break;
1135                     }
1136                 }
1137             }
1138             return cnt;
1139         }
operator ()(const gcomm::gmcast::Proto * a,const gcomm::gmcast::Proto * b) const1140         bool operator()(const gcomm::gmcast::Proto* a,
1141                         const gcomm::gmcast::Proto* b) const
1142         {
1143             size_t ac(count(a));
1144             size_t bc(count(b));
1145             // if counts are equal, prefer peer from the same segment
1146             return (ac < bc ||
1147                     (ac == bc && a->remote_segment() != preferred_segment_));
1148         }
1149 
1150     private:
1151         const std::set<gcomm::UUID>& uuids_;
1152         gcomm::SegmentId preferred_segment_;
1153     };
1154 }
1155 
1156 
check_liveness()1157 void gcomm::GMCast::check_liveness()
1158 {
1159     std::set<UUID> live_uuids;
1160 
1161     // iterate over proto map and mark all timed out entries as failed
1162     gu::datetime::Date now(gu::datetime::Date::monotonic());
1163     for (ProtoMap::iterator i(proto_map_->begin()); i != proto_map_->end(); )
1164     {
1165         // Store next iterator into temporary, handle_failed() may remove
1166         // the entry proto_map_.
1167         ProtoMap::iterator i_next(i);
1168         ++i_next;
1169         Proto* p(ProtoMap::value(i));
1170         if (p->state() > Proto::S_INIT &&
1171             p->state() < Proto::S_FAILED &&
1172             p->recv_tstamp() + peer_timeout_ < now)
1173         {
1174             gcomm::SocketStats stats(p->socket()->stats());
1175             log_info << self_string()
1176                      << " connection to peer "
1177                      << p->remote_uuid() << " with addr "
1178                      << p->remote_addr()
1179                      << " timed out, no messages seen in " << peer_timeout_
1180                      << ", socket stats: "
1181                      << stats;
1182             p->set_state(Proto::S_FAILED);
1183             handle_failed(p);
1184         }
1185         else if (p->state() == Proto::S_OK)
1186         {
1187             gcomm::SocketStats stats(p->socket()->stats());
1188             if (stats.send_queue_length >= 1024)
1189             {
1190                 log_debug << self_string()
1191                           << " socket send queue to "
1192                           << " peer "
1193                           << p->remote_uuid() << " with addr "
1194                           << p->remote_addr()
1195                           << ", socket stats: "
1196                           << stats;
1197             }
1198             if ((p->recv_tstamp() + peer_timeout_*2/3 < now) ||
1199                 (p->send_tstamp() + peer_timeout_*1/3 < now))
1200             {
1201                 p->send_keepalive();
1202             }
1203 
1204             if (p->state() == Proto::S_FAILED)
1205             {
1206                 handle_failed(p);
1207             }
1208             else
1209             {
1210                 live_uuids.insert(p->remote_uuid());
1211             }
1212         }
1213         i = i_next;
1214     }
1215 
1216     bool should_relay(false);
1217 
1218     // iterate over addr list and check if there is at least one live
1219     // proto entry associated to each addr entry
1220 
1221     std::set<UUID> nonlive_uuids;
1222     std::string nonlive_peers;
1223     for (AddrList::const_iterator i(remote_addrs_.begin());
1224          i != remote_addrs_.end(); ++i)
1225     {
1226         const AddrEntry& ae(AddrList::value(i));
1227         if (ae.retry_cnt()             <= ae.max_retries() &&
1228             live_uuids.find(ae.uuid()) == live_uuids.end())
1229         {
1230             // log_info << self_string()
1231             // << " missing live proto entry for " << ae.uuid();
1232             nonlive_uuids.insert(ae.uuid());
1233             nonlive_peers += AddrList::key(i) + " ";
1234             should_relay = true;
1235         }
1236         else if (ae.last_connect() + peer_timeout_ > now)
1237         {
1238             log_debug << "continuing relaying for "
1239                       << (ae.last_connect() + peer_timeout_ - now);
1240             should_relay = true;
1241         }
1242     }
1243 
1244     if (should_relay == true)
1245     {
1246         if (relaying_ == false)
1247         {
1248             log_info << self_string()
1249                      << " turning message relay requesting on, nonlive peers: "
1250                      << nonlive_peers;
1251             relaying_ = true;
1252         }
1253         relay_set_.clear();
1254         // build set of protos having OK status
1255         std::set<Proto*> proto_set;
1256         for (ProtoMap::iterator i(proto_map_->begin()); i != proto_map_->end();
1257              ++i)
1258         {
1259             Proto* p(ProtoMap::value(i));
1260             if (p->state() == Proto::S_OK)
1261             {
1262                 proto_set.insert(p);
1263             }
1264         }
1265         // find minimal set of proto entries required to reach maximum set
1266         // of nonlive peers
1267         while (nonlive_uuids.empty() == false &&
1268                proto_set.empty() == false)
1269         {
1270             std::set<Proto*>::iterator maxel(
1271                 std::max_element(proto_set.begin(),
1272                                  proto_set.end(), CmpUuidCounts(nonlive_uuids, segment_)));
1273             Proto* p(*maxel);
1274             log_debug << "relay set maxel :" << *p << " count: "
1275                       << CmpUuidCounts(nonlive_uuids, segment_).count(p);
1276 
1277             relay_set_.insert(RelayEntry(p, p->socket().get()));
1278             const LinkMap& lm(p->link_map());
1279             for (LinkMap::const_iterator lm_i(lm.begin()); lm_i != lm.end();
1280                  ++lm_i)
1281             {
1282                 nonlive_uuids.erase((*lm_i).uuid());
1283             }
1284             proto_set.erase(maxel);
1285         }
1286     }
1287     else if (relaying_ == true && should_relay == false)
1288     {
1289         log_info << self_string() << " turning message relay requesting off";
1290         relay_set_.clear();
1291         relaying_ = false;
1292     }
1293 }
1294 
1295 
handle_timers()1296 gu::datetime::Date gcomm::GMCast::handle_timers()
1297 {
1298     const gu::datetime::Date now(gu::datetime::Date::monotonic());
1299 
1300     if (now >= next_check_)
1301     {
1302         check_liveness();
1303         reconnect();
1304         next_check_ = now + check_period_;
1305     }
1306 
1307     return next_check_;
1308 }
1309 
1310 
send(const RelayEntry & re,int segment,gcomm::Datagram & dg)1311 void gcomm::GMCast::send(const RelayEntry& re, int segment, gcomm::Datagram& dg)
1312 {
1313     int err;
1314     if ((err = re.socket->send(segment, dg)) != 0)
1315     {
1316         log_debug << "failed to send to " << re.socket->remote_addr()
1317                   << ": (" << err << ") " << strerror(err);
1318     }
1319     else if (re.proto)
1320     {
1321         re.proto->set_send_tstamp(gu::datetime::Date::monotonic());
1322     }
1323 }
1324 
relay(const Message & msg,const Datagram & dg,const void * exclude_id)1325 void gcomm::GMCast::relay(const Message& msg,
1326                           const Datagram& dg,
1327                           const void* exclude_id)
1328 {
1329     Datagram relay_dg(dg);
1330     relay_dg.normalize();
1331     Message relay_msg(msg);
1332 
1333     // reset all relay flags from message to be relayed
1334     relay_msg.set_flags(relay_msg.flags() &
1335                         ~(Message::F_RELAY | Message::F_SEGMENT_RELAY));
1336 
1337     // if F_RELAY is set in received message, relay to all peers except
1338     // the originator
1339     if (msg.flags() & Message::F_RELAY)
1340     {
1341         gu_trace(push_header(relay_msg, relay_dg));
1342         for (SegmentMap::iterator segment_i(segment_map_.begin());
1343              segment_i != segment_map_.end(); ++segment_i)
1344         {
1345             Segment& segment(segment_i->second);
1346             for (Segment::iterator target_i(segment.begin());
1347                  target_i != segment.end(); ++target_i)
1348             {
1349                 if ((*target_i).socket->id() != exclude_id)
1350                 {
1351                     send(*target_i, msg.segment_id(), relay_dg);
1352                 }
1353             }
1354         }
1355     }
1356     else if (msg.flags() & Message::F_SEGMENT_RELAY)
1357     {
1358         if (relay_set_.empty() == false)
1359         {
1360             // send message to all nodes in relay set to reach
1361             // nodes in local segment that are not directly reachable
1362             relay_msg.set_flags(relay_msg.flags() | Message::F_RELAY);
1363             gu_trace(push_header(relay_msg, relay_dg));
1364             for (RelaySet::iterator relay_i(relay_set_.begin());
1365                  relay_i != relay_set_.end(); ++relay_i)
1366             {
1367                 if ((*relay_i).socket->id() != exclude_id)
1368                 {
1369                     send(*relay_i, msg.segment_id(), relay_dg);
1370                 }
1371             }
1372             gu_trace(pop_header(relay_msg, relay_dg));
1373             relay_msg.set_flags(relay_msg.flags() & ~Message::F_RELAY);
1374         }
1375 
1376         if (msg.segment_id() == segment_)
1377         {
1378             log_warn << "message with F_SEGMENT_RELAY from own segment, "
1379                      << "source " << msg.source_uuid();
1380         }
1381 
1382         // Relay to local segment
1383         gu_trace(push_header(relay_msg, relay_dg));
1384         Segment& segment(segment_map_[segment_]);
1385         for (Segment::iterator i(segment.begin()); i != segment.end(); ++i)
1386         {
1387             send(*i, msg.segment_id(), relay_dg);
1388         }
1389     }
1390     else
1391     {
1392         log_warn << "GMCast::relay() called without relay flags set";
1393     }
1394 }
1395 
/*
 * Dispatch an event coming from one of the transport sockets, identified
 * by id. Nothing is done until listener_ is set.
 *
 * - id of listener_: gmcast_accept() is called.
 * - id of mcast_: the gmcast header is parsed. User messages
 *   (type >= GMCAST_T_USER_BASE) are passed up; anything else is logged
 *   and dropped.
 * - id of an entry in proto_map_:
 *   - non-empty datagram: the header is parsed (parse errors fail the
 *     proto). User messages from evicted sources are dropped. Otherwise
 *     they are relayed if a relay flag is set, the receive timestamp is
 *     refreshed, and they are passed up. Protocol messages go to
 *     Proto::handle_message(). When check_changed_and_reset() reports a
 *     change, addresses, liveness and reconnects are re-evaluated. A
 *     transition into S_OK calls handle_established().
 *   - empty datagram, socket connected and proto in S_INIT or
 *     S_HANDSHAKE_WAIT: handle_connected() is called.
 *   - any other empty datagram: the proto is marked S_FAILED and
 *     handle_failed() is called.
 * - any other id: ignored.
 */
void gcomm::GMCast::handle_up(const void*        id,
                       const Datagram&    dg,
                       const ProtoUpMeta& um)
{
    ProtoMap::iterator i;

    if (listener_ == 0) { return; }

    if (id == listener_->id())
    {
        gmcast_accept();
    }
    else if (mcast_ && id == mcast_->id())
    {
        Message msg;

        try
        {
            // The gmcast header may still lie in the datagram's header
            // buffer or already in its payload, depending on the offset.
            if (dg.offset() < dg.header_len())
            {
                gu_trace(msg.unserialize(dg.header(), dg.header_size(),
                                         dg.header_offset() +
                                         dg.offset()));
            }
            else
            {
                gu_trace(msg.unserialize(dg.payload().data(),
                                         dg.len(),
                                         dg.offset()));
            }
        }
        catch (gu::Exception& e)
        {
            GU_TRACE(e);
            log_warn << e.what();
            return;
        }

        if (msg.type() >= Message::GMCAST_T_USER_BASE)
        {
            gu_trace(send_up(Datagram(dg, dg.offset() + msg.serial_size()),
                             ProtoUpMeta(msg.source_uuid())));
        }
        else
        {
            log_warn << "non-user message " << msg.type()
                     << " from multicast socket";
        }
    }
    else if ((i = proto_map_->find(id)) != proto_map_->end())
    {
        Proto* p(ProtoMap::value(i));

        if (dg.len() > 0)
        {
            // Remembered to detect a transition into S_OK below.
            const Proto::State prev_state(p->state());

            if (prev_state == Proto::S_FAILED)
            {
                log_warn << "unhandled failed proto";
                handle_failed(p);
                return;
            }

            Message msg;

            try
            {
                msg.unserialize(dg.payload().data(), dg.len(),
                                dg.offset());
            }
            catch (gu::Exception& e)
            {
                // Unparseable input fails the connection.
                GU_TRACE(e);
                log_warn << e.what();
                p->set_state(Proto::S_FAILED);
                handle_failed(p);
                return;
            }

            if (msg.type() >= Message::GMCAST_T_USER_BASE)
            {
                // Drop user messages originating from evicted nodes.
                if (evict_list().empty() == false &&
                    evict_list().find(msg.source_uuid()) != evict_list().end())
                {
                    return;
                }
                if (msg.flags() &
                    (Message::F_RELAY | Message::F_SEGMENT_RELAY))
                {
                    relay(msg,
                          Datagram(dg, dg.offset() + msg.serial_size()),
                          id);
                }
                p->set_recv_tstamp(gu::datetime::Date::monotonic());
                send_up(Datagram(dg, dg.offset() + msg.serial_size()),
                        ProtoUpMeta(msg.source_uuid()));
                return;
            }
            else
            {
                try
                {
                    p->set_recv_tstamp(gu::datetime::Date::monotonic());
                    gu_trace(p->handle_message(msg));
                }
                catch (const gu::Exception& e)
                {
                    handle_failed(p);
                    // ENOTRECOVERABLE is propagated to the caller; other
                    // errors only fail this connection.
                    if (e.get_errno() == ENOTRECOVERABLE)
                    {
                        throw;
                    }
                    log_warn
                        << "handling gmcast protocol message failed: "
                        << e.what();
                    return;
                }

                if (p->state() == Proto::S_FAILED)
                {
                    handle_failed(p);
                    return;
                }
                else if (p->check_changed_and_reset() == true)
                {
                    update_addresses();
                    check_liveness();
                    reconnect();
                }
            }

            if (prev_state != Proto::S_OK && p->state() == Proto::S_OK)
            {
                handle_established(p);
            }
        }
        else if (p->socket()->state() == Socket::S_CONNECTED &&
                 (p->state() == Proto::S_HANDSHAKE_WAIT ||
                  p->state() == Proto::S_INIT))
        {
            handle_connected(p);
        }
        else if (p->socket()->state() == Socket::S_CONNECTED)
        {
            log_warn << "connection " << p->socket()->id()
                     << " closed by peer";
            p->set_state(Proto::S_FAILED);
            handle_failed(p);
        }
        else
        {
            log_debug << "socket in state " << p->socket()->state();
            p->set_state(Proto::S_FAILED);
            handle_failed(p);
        }
    }
    else
    {
        // log_info << "proto entry " << id << " not found";
    }
}
1558 
/*
 * Return the first proto in proto_map whose remote UUID equals uuid, or
 * a null pointer if there is none. proto_map is keyed by socket id, so
 * this is a linear scan.
 */
static gcomm::gmcast::Proto* find_by_remote_uuid(
    const gcomm::gmcast::ProtoMap& proto_map,
    const gcomm::UUID& uuid)
{
    gcomm::gmcast::ProtoMap::const_iterator it(proto_map.begin());
    while (it != proto_map.end() && !(it->second->remote_uuid() == uuid))
    {
        ++it;
    }

    if (it == proto_map.end())
    {
        return 0;
    }
    return it->second;
}
1573 
handle_down(Datagram & dg,const ProtoDownMeta & dm)1574 int gcomm::GMCast::handle_down(Datagram& dg, const ProtoDownMeta& dm)
1575 {
1576     Message msg(version_, Message::GMCAST_T_USER_BASE, uuid(), 1, segment_);
1577 
1578     // If target is set and proto entry for target is found,
1579     // send a direct message. Otherwise fall back for broadcast
1580     // to ensure message delivery via relay
1581     if (dm.target() != UUID::nil())
1582     {
1583         Proto* target_proto(find_by_remote_uuid(*proto_map_, dm.target()));
1584         if (target_proto && target_proto->state() == Proto::S_OK)
1585         {
1586             gu_trace(push_header(msg, dg));
1587             int err;
1588             if ((err = target_proto->socket()->send(msg.segment_id(), dg)) != 0)
1589             {
1590                 log_debug << "failed to send to "
1591                           << target_proto->socket()->remote_addr()
1592                           << ": (" << err << ") " << strerror(err);
1593             }
1594             else
1595             {
1596                 target_proto->set_send_tstamp(gu::datetime::Date::monotonic());
1597             }
1598             gu_trace(pop_header(msg, dg));
1599             if (err == 0)
1600             {
1601                 return 0;
1602             }
1603             // In case of error fall back to broadcast
1604         }
1605         else
1606         {
1607             log_debug << "Target " << dm.target() << " proto not found";
1608         }
1609     }
1610 
1611     // handle relay set first, skip these peers below
1612     if (relay_set_.empty() == false)
1613     {
1614         msg.set_flags(msg.flags() | Message::F_RELAY);
1615         gu_trace(push_header(msg, dg));
1616         for (RelaySet::iterator ri(relay_set_.begin());
1617              ri != relay_set_.end(); ++ri)
1618         {
1619             send(*ri, msg.segment_id(), dg);
1620         }
1621         gu_trace(pop_header(msg, dg));
1622         msg.set_flags(msg.flags() & ~Message::F_RELAY);
1623     }
1624 
1625     for (SegmentMap::iterator si(segment_map_.begin());
1626          si != segment_map_.end(); ++si)
1627     {
1628         uint8_t segment_id(si->first);
1629         Segment& segment(si->second);
1630 
1631         if (segment_id != segment_)
1632         {
1633             size_t target_idx((self_index_ + segment_id) % segment.size());
1634             msg.set_flags(msg.flags() | Message::F_SEGMENT_RELAY);
1635             // skip peers that are in relay set
1636             if (relay_set_.empty() == true ||
1637                 relay_set_.find(segment[target_idx]) == relay_set_.end())
1638             {
1639                 gu_trace(push_header(msg, dg));
1640                 send(segment[target_idx], msg.segment_id(), dg);
1641                 gu_trace(pop_header(msg, dg));
1642             }
1643         }
1644         else
1645         {
1646             msg.set_flags(msg.flags() & ~Message::F_SEGMENT_RELAY);
1647             gu_trace(push_header(msg, dg));
1648             for (Segment::iterator i(segment.begin());
1649                  i != segment.end(); ++i)
1650             {
1651                 // skip peers that are in relay set
1652                 if (relay_set_.empty() == true ||
1653                     relay_set_.find(*i) == relay_set_.end())
1654                 {
1655                     send(*i, msg.segment_id(), dg);
1656                 }
1657             }
1658             gu_trace(pop_header(msg, dg));
1659         }
1660     }
1661 
1662     return 0;
1663 }
1664 
handle_stable_view(const View & view)1665 void gcomm::GMCast::handle_stable_view(const View& view)
1666 {
1667     log_debug << "GMCast::handle_stable_view: " << view;
1668     if (view.type() == V_PRIM)
1669     {
1670         // discard addr list entries not in view
1671         std::set<UUID> gmcast_lst;
1672         for (AddrList::const_iterator i(remote_addrs_.begin());
1673              i != remote_addrs_.end(); ++i)
1674         {
1675             gmcast_lst.insert(i->second.uuid());
1676         }
1677         std::set<UUID> view_lst;
1678         for (NodeList::const_iterator i(view.members().begin());
1679              i != view.members().end(); ++i)
1680         {
1681             view_lst.insert(i->first);
1682         }
1683         std::list<UUID> diff;
1684         std::set_difference(gmcast_lst.begin(),
1685                             gmcast_lst.end(),
1686                             view_lst.begin(),
1687                             view_lst.end(),
1688                             std::back_inserter(diff));
1689 
1690         // Forget partitioned entries, allow them to reconnect
1691         // in time_wait_/2. Left nodes are given time_wait_ ban for
1692         // reconnecting when handling V_REG below.
1693         for (std::list<UUID>::const_iterator i(diff.begin());
1694              i != diff.end(); ++i)
1695         {
1696             gmcast_forget(*i, time_wait_/2);
1697         }
1698 
1699         // mark nodes in view as stable
1700         for (std::set<UUID>::const_iterator i(view_lst.begin());
1701              i != view_lst.end(); ++i)
1702         {
1703             AddrList::iterator ai;
1704             if ((ai = find_if(remote_addrs_.begin(), remote_addrs_.end(),
1705                               AddrListUUIDCmp(*i))) != remote_addrs_.end())
1706             {
1707                 ai->second.set_retry_cnt(-1);
1708                 ai->second.set_max_retries(max_retry_cnt_);
1709             }
1710         }
1711 
1712         // iterate over pending address list and discard entries without UUID
1713         for (AddrList::iterator i(pending_addrs_.begin());
1714              i != pending_addrs_.end(); )
1715         {
1716             AddrList::iterator i_next(i);
1717             ++i_next;
1718             const AddrEntry& ae(AddrList::value(i));
1719             if (ae.uuid() == UUID())
1720             {
1721                 const std::string addr(AddrList::key(i));
1722                 log_info << "discarding pending addr without UUID: "
1723                          << addr;
1724                 for (ProtoMap::iterator pi(proto_map_->begin());
1725                      pi != proto_map_->end();)
1726                 {
1727                     ProtoMap::iterator pi_next(pi);
1728                     ++pi_next;
1729                     Proto* p(ProtoMap::value(pi));
1730                     if (p->remote_addr() == addr)
1731                     {
1732                         log_info << "discarding pending addr proto entry " << p;
1733                         erase_proto(pi);
1734                     }
1735                     pi = pi_next;
1736                 }
1737                 pending_addrs_.erase(i);
1738             }
1739             i = i_next;
1740         }
1741         prim_view_reached_ = true;
1742     }
1743     else if (view.type() == V_REG)
1744     {
1745         for (NodeList::const_iterator i(view.members().begin());
1746              i != view.members().end(); ++i)
1747         {
1748             AddrList::iterator ai;
1749             if ((ai = find_if(remote_addrs_.begin(), remote_addrs_.end(),
1750                               AddrListUUIDCmp(NodeList::key(i))))
1751                 != remote_addrs_.end())
1752             {
1753                 log_info << "declaring " << NodeList::key(i)
1754                          << " at " << handle_get_address(NodeList::key(i))
1755                          << " stable";
1756                 ai->second.set_retry_cnt(-1);
1757                 ai->second.set_max_retries(max_retry_cnt_);
1758             }
1759         }
1760 
1761         // Forget left nodes
1762         for (NodeList::const_iterator i(view.left().begin());
1763              i != view.left().end(); ++i)
1764         {
1765             gmcast_forget(NodeList::key(i), time_wait_);
1766         }
1767     }
1768     check_liveness();
1769 
1770     for (ProtoMap::const_iterator i(proto_map_->begin()); i != proto_map_->end();
1771          ++i)
1772     {
1773         log_debug << "proto: " << *ProtoMap::value(i);
1774     }
1775 }
1776 
1777 
handle_evict(const UUID & uuid)1778 void gcomm::GMCast::handle_evict(const UUID& uuid)
1779 {
1780     if (is_evicted(uuid) == true)
1781     {
1782         return;
1783     }
1784     gmcast_forget(uuid, time_wait_);
1785 }
1786 
1787 
handle_get_address(const UUID & uuid) const1788 std::string gcomm::GMCast::handle_get_address(const UUID& uuid) const
1789 {
1790     AddrList::const_iterator ali(
1791         find_if(remote_addrs_.begin(),
1792                 remote_addrs_.end(),
1793                 AddrListUUIDCmp(uuid)));
1794     return (ali == remote_addrs_.end() ? "" : AddrList::key(ali));
1795 }
1796 
add_or_del_addr(const std::string & val)1797 void gcomm::GMCast::add_or_del_addr(const std::string& val)
1798 {
1799     if (val.compare(0, 4, "add:") == 0)
1800     {
1801         gu::URI uri(val.substr(4));
1802         std::string addr(gu::net::resolve(uri_string(get_scheme(use_ssl_),
1803                                                      uri.get_host(),
1804                                                      uri.get_port())).to_string());
1805         log_info << "inserting address '" << addr << "'";
1806         insert_address(addr, UUID(), remote_addrs_);
1807         AddrList::iterator ai(remote_addrs_.find(addr));
1808         AddrList::value(ai).set_max_retries(
1809             max_initial_reconnect_attempts_);
1810         AddrList::value(ai).set_retry_cnt(-1);
1811     }
1812     else if (val.compare(0, 4, "del:") == 0)
1813     {
1814         std::string addr(val.substr(4));
1815         AddrList::iterator ai(remote_addrs_.find(addr));
1816         if (ai != remote_addrs_.end())
1817         {
1818             ProtoMap::iterator pi, pi_next;
1819             for (pi = proto_map_->begin(); pi != proto_map_->end(); pi = pi_next)
1820             {
1821                 pi_next = pi, ++pi_next;
1822                 Proto* rp = ProtoMap::value(pi);
1823                 if (rp->remote_addr() == AddrList::key(ai))
1824                 {
1825                     log_info << "deleting entry " << AddrList::key(ai);
1826                     erase_proto(pi);
1827                 }
1828             }
1829             AddrEntry& ae(AddrList::value(ai));
1830             ae.set_max_retries(0);
1831             ae.set_retry_cnt(1);
1832             ae.set_next_reconnect(gu::datetime::Date::monotonic() + time_wait_);
1833             update_addresses();
1834         }
1835         else
1836         {
1837             log_info << "address '" << addr
1838                      << "' not found from remote addrs list";
1839         }
1840     }
1841     else
1842     {
1843         gu_throw_error(EINVAL) << "invalid addr spec '" << val << "'";
1844     }
1845 }
1846 
1847 
set_param(const std::string & key,const std::string & val,Protolay::sync_param_cb_t & sync_param_cb)1848 bool gcomm::GMCast::set_param(const std::string& key, const std::string& val,
1849                               Protolay::sync_param_cb_t& sync_param_cb)
1850 {
1851     try
1852     {
1853         if (key == Conf::GMCastMaxInitialReconnectAttempts)
1854         {
1855             max_initial_reconnect_attempts_ = gu::from_string<int>(val);
1856             return true;
1857         }
1858         else if (key == Conf::GMCastPeerAddr)
1859         {
1860             try
1861             {
1862                 add_or_del_addr(val);
1863             }
1864             catch (gu::NotFound& nf)
1865             {
1866                 gu_throw_error(EINVAL) << "invalid addr spec '" << val << "'";
1867             }
1868             catch (gu::NotSet& ns)
1869             {
1870                 gu_throw_error(EINVAL) << "invalid addr spec '" << val << "'";
1871             }
1872             return true;
1873         }
1874         else if (key == Conf::GMCastIsolate)
1875         {
1876             int tmpval = gu::from_string<int>(val);
1877             if (tmpval < 0 || tmpval > 2)
1878             {
1879                 gu_throw_error(EINVAL)
1880                     << "invalid value for gmacst.isolate: '"
1881                     << tmpval << "'";
1882             }
1883             isolate_ = tmpval;
1884             log_info << "turning isolation "
1885                      << (isolate_ == 1 ? "on" :
1886                          (isolate_ == 2 ? "force quit" : "off"));
1887             if (isolate_)
1888             {
1889                 // delete all entries in proto map
1890                 ProtoMap::iterator pi, pi_next;
1891                 for (pi = proto_map_->begin(); pi != proto_map_->end();
1892                      pi = pi_next)
1893                 {
1894                     pi_next = pi, ++pi_next;
1895                     erase_proto(pi);
1896                 }
1897                 segment_map_.clear();
1898             }
1899             return true;
1900         }
1901         else if (key == Conf::SocketRecvBufSize)
1902         {
1903             gu_trace(Conf::check_recv_buf_size(val));
1904             conf_.set(key, val);
1905 
1906             for (ProtoMap::iterator pi(proto_map_->begin());
1907                  pi != proto_map_->end(); ++pi)
1908             {
1909                 gu_trace(pi->second->socket()->set_option(key, val));
1910                 // erase_proto(pi++);
1911             }
1912             // segment_map_.clear();
1913             // reconnect();
1914             return true;
1915         }
1916         else if (key == Conf::GMCastGroup       ||
1917                  key == Conf::GMCastListenAddr  ||
1918                  key == Conf::GMCastMCastAddr   ||
1919                  key == Conf::GMCastMCastPort   ||
1920                  key == Conf::GMCastMCastTTL    ||
1921                  key == Conf::GMCastTimeWait    ||
1922                  key == Conf::GMCastPeerTimeout ||
1923                  key == Conf::GMCastSegment)
1924         {
1925             gu_throw_error(EPERM) << "can't change value during runtime";
1926         }
1927     }
1928     catch (gu::Exception& e)
1929     {
1930         GU_TRACE(e); throw;
1931     }
1932     catch (std::exception& e)
1933     {
1934         gu_throw_error(EINVAL) << e.what();
1935     }
1936     catch (...)
1937     {
1938         gu_throw_error(EINVAL) << "exception";
1939     }
1940 
1941     return false;
1942 }
1943