1 /*
2 * Copyright (C) 2009-2019 Codership Oy <info@codership.com>
3 */
4
5 #include "gmcast.hpp"
6 #include "gmcast_proto.hpp"
7
8 #include "gcomm/common.hpp"
9 #include "gcomm/conf.hpp"
10 #include "gcomm/util.hpp"
11 #include "gcomm/map.hpp"
12 #include "defaults.hpp"
13 #include "gu_convert.hpp"
14 #include "gu_resolver.hpp"
15 #include "gu_asio.hpp" // gu::conf::use_ssl
16
17 using namespace std::rel_ops;
18
19 using gcomm::gmcast::Proto;
20 using gcomm::gmcast::ProtoMap;
21 using gcomm::gmcast::Link;
22 using gcomm::gmcast::LinkMap;
23 using gcomm::gmcast::Message;
24
// Upper bound on reconnect attempts. Used as the default value of
// gmcast.max_initial_reconnect_attempts and as the retry limit given to
// initial addresses in connect().
const long gcomm::GMCast::max_retry_cnt_(std::numeric_limits<int>::max());
26
set_tcp_defaults(gu::URI * uri)27 static void set_tcp_defaults (gu::URI* uri)
28 {
29 // what happens if there is already this parameter?
30 uri->set_option(gcomm::Conf::TcpNonBlocking, gu::to_string(1));
31 }
32
33
check_tcp_uri(const gu::URI & uri)34 static bool check_tcp_uri(const gu::URI& uri)
35 {
36 return (uri.get_scheme() == gu::scheme::tcp ||
37 uri.get_scheme() == gu::scheme::ssl);
38 }
39
get_scheme(bool use_ssl)40 static std::string get_scheme(bool use_ssl)
41 {
42 if (use_ssl == true)
43 {
44 return gu::scheme::ssl;
45 }
46 return gu::scheme::tcp;
47 }
48
49 //
50 // Check if the node should stay isolated.
51 // Possible outcomes:
52 // * Return false, node should continue reconnecting and accepting connections
53 // (isolate = 0)
54 // * Return true, node should remain isolated (isolate = 1)
55 // * Throw fatal exception to terminate the backend (isolate = 2)
56 //
is_isolated(int isolate)57 static inline bool is_isolated(int isolate)
58 {
59 switch (isolate)
60 {
61 case 1:
62 return true;
63 case 2:
64 gu_throw_fatal<< "Gcomm backend termination was "
65 << "requested by setting gmcast.isolate=2.";
66 break;
67 default:
68 break;
69 }
70 return false;
71 }
72
// Construct the GMCast transport.
//
// Reads the gmcast.* parameters from conf_/uri: version, segment, SSL
// usage, group name, listen address, multicast address and TTL, time
// wait, peer timeout and max initial reconnect attempts. Collects the
// initial peer addresses from the URI and resolves the listen and
// multicast addresses. Writes the effective values back into conf_.
//
// Throws via gu_throw_error(EINVAL) if no group name is defined or if the
// listen address does not use a tcp/ssl scheme.
gcomm::GMCast::GMCast(Protonet& net, const gu::URI& uri,
                      const UUID* my_uuid)
    :
    Transport     (net, uri),
    version_(check_range(Conf::GMCastVersion,
                         param<int>(conf_, uri, Conf::GMCastVersion, "0"),
                         0, max_version_ + 1)),
    segment_ (check_range(Conf::GMCastSegment,
                          param<int>(conf_, uri, Conf::GMCastSegment, "0"),
                          0, 255)),
    my_uuid_      (my_uuid ? *my_uuid : UUID(0, 0)),
    use_ssl_      (param<bool>(conf_, uri, gu::conf::use_ssl, "false")),
    // @todo: technically group name should be in path component
    group_name_   (param<std::string>(conf_, uri, Conf::GMCastGroup, "")),
    // NOTE(review): this initializer reads use_ssl_, so it relies on
    // use_ssl_ being declared before listen_addr_ in the class.
    listen_addr_  (
        param<std::string>(
            conf_, uri, Conf::GMCastListenAddr,
            get_scheme(use_ssl_) + "://0.0.0.0")), // how to make it IPv6 safe?
    initial_addrs_(),
    mcast_addr_   (param<std::string>(conf_, uri, Conf::GMCastMCastAddr, "")),
    bind_ip_      (""),
    mcast_ttl_    (check_range(
                       Conf::GMCastMCastTTL,
                       param<int>(conf_, uri, Conf::GMCastMCastTTL, "1"),
                       1, 256)),
    listener_     (0),
    mcast_        (),
    pending_addrs_(),
    remote_addrs_ (),
    addr_blacklist_(),
    relaying_     (false),
    isolate_      (0),
    prim_view_reached_(false),
    proto_map_    (new ProtoMap()),
    relay_set_    (),
    segment_map_  (),
    self_index_   (std::numeric_limits<size_t>::max()),
    time_wait_    (param<gu::datetime::Period>(
                       conf_, uri,
                       Conf::GMCastTimeWait, Defaults::GMCastTimeWait)),
    check_period_ ("PT0.5S"),
    peer_timeout_ (param<gu::datetime::Period>(
                       conf_, uri,
                       Conf::GMCastPeerTimeout, Defaults::GMCastPeerTimeout)),
    max_initial_reconnect_attempts_(
        param<int>(conf_, uri,
                   Conf::GMCastMaxInitialReconnectAttempts,
                   gu::to_string(max_retry_cnt_))),
    next_check_   (gu::datetime::Date::monotonic())
{
    log_info << "GMCast version " << version_;

    if (group_name_ == "")
    {
        gu_throw_error (EINVAL) << "Group not defined in URL: "
                                << uri_.to_string();
    }

    // Populate initial_addrs_ from the authority list of the URI.
    set_initial_addr(uri_);

    // A listen address given as a URI option overrides the value above.
    try
    {
        listen_addr_ = uri_.get_option (Conf::GMCastListenAddr);
    }
    catch (gu::NotFound&) {}

    try
    {
        gu::URI uri(listen_addr_); /* check validity of the address */
    }
    catch (gu::Exception&)
    {
        /* most probably no scheme, try to append one and see if it succeeds */
        listen_addr_ = uri_string(get_scheme(use_ssl_), listen_addr_);
        gu_trace(gu::URI uri(listen_addr_));
    }

    gu::URI listen_uri(listen_addr_);

    if (check_tcp_uri(listen_uri) == false)
    {
        gu_throw_error (EINVAL) << "listen addr '" << listen_addr_
                                << "' does not specify supported protocol";
    }

    if (gu::net::resolve(listen_uri).get_addr().is_anyaddr() == false)
    {
        // bind outgoing connections to the same address as listening.
        gu_trace(bind_ip_ = listen_uri.get_host());
    }

    // Port selection order: listen address port, then BASE_PORT_KEY from
    // configuration, then the port of the connection URI, and finally
    // Defaults::GMCastTcpPort.
    std::string port(Defaults::GMCastTcpPort);

    try
    {
        port = listen_uri.get_port();
    }
    catch (gu::NotSet&)
    {
        // if no listen port is set for listen address in the options,
        // see if base port was configured
        try
        {
            port = conf_.get(BASE_PORT_KEY);
        }
        catch (gu::NotSet&)
        {
            // if no base port configured, try port from the connection address
            try { port = uri_.get_port(); } catch (gu::NotSet&) {}
        }

        listen_addr_ += ":" + port;
    }

    // Publish the effective port as the base port.
    conf_.set(BASE_PORT_KEY, port);

    listen_addr_ = gu::net::resolve(listen_addr_).to_string();
    // resolving sets scheme to tcp, have to rewrite for ssl
    if (use_ssl_ == true)
    {
        listen_addr_.replace(0, 3, gu::scheme::ssl);
    }

    // Never keep our own listen address among the peers to connect to.
    std::set<std::string>::iterator iaself(initial_addrs_.find(listen_addr_));
    if (iaself != initial_addrs_.end())
    {
        log_debug << "removing own listen address '" << *iaself
                  << "' from initial address list";
        initial_addrs_.erase(iaself);
    }

    if (mcast_addr_ != "")
    {
        // Multicast port defaults to the port selected above.
        try
        {
            port = param<std::string>(conf_, uri_, Conf::GMCastMCastPort, port);
        }
        catch (gu::NotFound&) {}

        mcast_addr_ = gu::net::resolve(
            uri_string(gu::scheme::udp, mcast_addr_, port)).to_string();
    }

    log_info << self_string() << " listening at " << listen_addr_;
    log_info << self_string() << " multicast: " << mcast_addr_
             << ", ttl: " << mcast_ttl_;

    // Write the effective settings back to configuration.
    conf_.set(Conf::GMCastListenAddr, listen_addr_);
    conf_.set(Conf::GMCastMCastAddr, mcast_addr_);
    conf_.set(Conf::GMCastVersion, gu::to_string(version_));
    conf_.set(Conf::GMCastTimeWait, gu::to_string(time_wait_));
    conf_.set(Conf::GMCastMCastTTL, gu::to_string(mcast_ttl_));
    conf_.set(Conf::GMCastPeerTimeout, gu::to_string(peer_timeout_));
    conf_.set(Conf::GMCastSegment, gu::to_string<int>(segment_));
}
228
~GMCast()229 gcomm::GMCast::~GMCast()
230 {
231 if (listener_ != 0) close();
232
233 delete proto_map_;
234 }
235
set_initial_addr(const gu::URI & uri)236 void gcomm::GMCast::set_initial_addr(const gu::URI& uri)
237 {
238
239 const gu::URI::AuthorityList& al(uri.get_authority_list());
240
241 for (gu::URI::AuthorityList::const_iterator i(al.begin());
242 i != al.end(); ++i)
243 {
244 std::string host;
245 try
246 {
247 host = i->host();
248 }
249 catch (gu::NotSet& ns)
250 {
251 gu_throw_error(EINVAL) << "Unset host in URL " << uri;
252 }
253
254 if (host_is_any(host)) continue;
255
256 std::string port;
257 try
258 {
259 port = i->port();
260 }
261 catch (gu::NotSet&)
262 {
263 try
264 {
265 port = conf_.get(BASE_PORT_KEY);
266 }
267 catch (gu::NotFound&)
268 {
269 port = Defaults::GMCastTcpPort;
270 }
271 catch (gu::NotSet&)
272 {
273 port = Defaults::GMCastTcpPort;
274 }
275 }
276
277 std::string initial_uri = uri_string(get_scheme(use_ssl_), host, port);
278 std::string initial_addr;
279 try
280 {
281 initial_addr = gu::net::resolve(initial_uri).to_string();
282 }
283 catch (gu::Exception& )
284 {
285 log_warn << "Failed to resolve " << initial_uri;
286 continue;
287 }
288
289 // resolving sets scheme to tcp, have to rewrite for ssl
290 if (use_ssl_ == true)
291 {
292 initial_addr.replace(0, 3, gu::scheme::ssl);
293 }
294
295 if (check_tcp_uri(initial_addr) == false)
296 {
297 gu_throw_error (EINVAL) << "initial addr '" << initial_addr
298 << "' is not valid";
299 }
300
301 log_debug << self_string() << " initial addr: " << initial_addr;
302 initial_addrs_.insert(initial_addr);
303
304 }
305
306 }
307
connect_precheck(bool start_prim)308 void gcomm::GMCast::connect_precheck(bool start_prim)
309 {
310 if (!start_prim && initial_addrs_.empty()) {
311 gu_throw_fatal << "No address to connect";
312 }
313 }
314
connect()315 void gcomm::GMCast::connect()
316 {
317 pstack_.push_proto(this);
318 log_debug << "gmcast " << uuid() << " connect";
319
320 gu::URI listen_uri(listen_addr_);
321
322 set_tcp_defaults (&listen_uri);
323
324 listener_ = pnet().acceptor(listen_uri);
325 gu_trace (listener_->listen(listen_uri));
326
327 if (!mcast_addr_.empty())
328 {
329 gu::URI mcast_uri(
330 mcast_addr_ + '?'
331 + gcomm::Socket::OptIfAddr + '='
332 + gu::URI(listen_addr_).get_host()+'&'
333 + gcomm::Socket::OptNonBlocking + "=1&"
334 + gcomm::Socket::OptMcastTTL + '=' + gu::to_string(mcast_ttl_)
335 );
336
337 mcast_ = pnet().socket(mcast_uri);
338 gu_trace(mcast_->connect(mcast_uri));
339 }
340
341 if (!initial_addrs_.empty())
342 {
343 for (std::set<std::string>::const_iterator i(initial_addrs_.begin());
344 i != initial_addrs_.end(); ++i)
345 {
346 insert_address(*i, UUID(), pending_addrs_);
347 AddrList::iterator ai(pending_addrs_.find(*i));
348 AddrList::value(ai).set_max_retries(max_retry_cnt_);
349 gu_trace (gmcast_connect(*i));
350 }
351 }
352 }
353
354
connect(const gu::URI & uri)355 void gcomm::GMCast::connect(const gu::URI& uri)
356 {
357 set_initial_addr(uri);
358 connect();
359 }
360
361
362
close(bool force)363 void gcomm::GMCast::close(bool force)
364 {
365 log_debug << "gmcast " << uuid() << " close";
366 pstack_.pop_proto(this);
367 if (mcast_)
368 {
369 mcast_->close();
370 // delete mcast;
371 // mcast = 0;
372 }
373
374 gcomm_assert(listener_ != 0);
375 listener_->close();
376 delete listener_;
377 listener_ = 0;
378
379 segment_map_.clear();
380 for (ProtoMap::iterator
381 i = proto_map_->begin(); i != proto_map_->end(); ++i)
382 {
383 delete ProtoMap::value(i);
384 }
385
386 proto_map_->clear();
387 pending_addrs_.clear();
388 remote_addrs_.clear();
389 prim_view_reached_ = false;
390 }
391
392 //
393 // Private
394 //
395
396
397 // Find other local endpoint matching to proto
398 static const Proto*
find_other_local_endpoint(const gcomm::gmcast::ProtoMap & proto_map,const gcomm::gmcast::Proto * proto)399 find_other_local_endpoint(const gcomm::gmcast::ProtoMap& proto_map,
400 const gcomm::gmcast::Proto* proto)
401 {
402 for (gcomm::gmcast::ProtoMap::const_iterator i(proto_map.begin());
403 i != proto_map.end(); ++i)
404 {
405 if (i->second != proto &&
406 i->second->handshake_uuid() == proto->handshake_uuid())
407 {
408 return i->second;
409 }
410 }
411 return 0;
412 }
413
414 // Find other endpoint with same remote UUID
415 static const Proto*
find_other_endpoint_same_remote_uuid(const gcomm::gmcast::ProtoMap & proto_map,const gcomm::gmcast::Proto * proto)416 find_other_endpoint_same_remote_uuid(const gcomm::gmcast::ProtoMap& proto_map,
417 const gcomm::gmcast::Proto* proto)
418 {
419 for (gcomm::gmcast::ProtoMap::const_iterator i(proto_map.begin());
420 i != proto_map.end(); ++i)
421 {
422 if (i->second != proto &&
423 i->second->remote_uuid() == proto->remote_uuid())
424 {
425 return i->second;
426 }
427 }
428 return 0;
429 }
430
is_own(const gmcast::Proto * proto) const431 bool gcomm::GMCast::is_own(const gmcast::Proto* proto) const
432 {
433 assert(proto->remote_uuid() != gcomm::UUID::nil());
434 if (proto->remote_uuid() != uuid())
435 {
436 return false;
437 }
438 return find_other_local_endpoint(*proto_map_, proto);
439 }
440
blacklist(const gmcast::Proto * proto)441 void gcomm::GMCast::blacklist(const gmcast::Proto* proto)
442 {
443 initial_addrs_.erase(proto->remote_addr());
444 pending_addrs_.erase(proto->remote_addr());
445 addr_blacklist_.insert(std::make_pair(
446 proto->remote_addr(),
447 AddrEntry(gu::datetime::Date::monotonic(),
448 gu::datetime::Date::monotonic(),
449 proto->remote_uuid())));
450 }
451
is_not_own_and_duplicate_exists(const Proto * proto) const452 bool gcomm::GMCast::is_not_own_and_duplicate_exists(
453 const Proto* proto) const
454 {
455 assert(proto->remote_uuid() != gcomm::UUID::nil());
456 const Proto* other(find_other_local_endpoint(*proto_map_, proto));
457 if (!other)
458 {
459 // Not own
460 // Check if remote UUID matches to self
461 if (proto->remote_uuid() == uuid())
462 {
463 return true;
464 }
465 // Check if other proto entry with same remote
466 // UUID but different remote address exists.
467 other = find_other_endpoint_same_remote_uuid(*proto_map_, proto);
468 if (other && other->remote_addr() != proto->remote_addr())
469 {
470 return true;
471 }
472 }
473 return false;
474 }
475
476 // Erase proto entry in safe manner
477 // 1) Erase from relay_set_
478 // 2) Erase from proto_map_
479 // 3) Delete proto entry
erase_proto(gmcast::ProtoMap::iterator i)480 void gcomm::GMCast::erase_proto(gmcast::ProtoMap::iterator i)
481 {
482 Proto* p(ProtoMap::value(i));
483 RelayEntry e(p, p->socket().get());
484 RelaySet::iterator si(relay_set_.find(e));
485 if (si != relay_set_.end())
486 {
487 relay_set_.erase(si);
488 }
489 proto_map_->erase(i);
490 delete p;
491 }
492
gmcast_accept()493 void gcomm::GMCast::gmcast_accept()
494 {
495 SocketPtr tp;
496
497 try
498 {
499 tp = listener_->accept();
500 }
501 catch (gu::Exception& e)
502 {
503 log_warn << e.what();
504 return;
505 }
506
507 if (is_isolated(isolate_))
508 {
509 log_debug << "dropping accepted socket due to isolation";
510 tp->close();
511 return;
512 }
513
514 Proto* peer = new Proto (
515 *this,
516 version_,
517 tp,
518 listener_->listen_addr(),
519 "",
520 mcast_addr_,
521 segment_,
522 group_name_);
523 std::pair<ProtoMap::iterator, bool> ret =
524 proto_map_->insert(std::make_pair(tp->id(), peer));
525
526 if (ret.second == false)
527 {
528 delete peer;
529 gu_throw_fatal << "Failed to add peer to map";
530 }
531 if (tp->state() == Socket::S_CONNECTED)
532 {
533 peer->send_handshake();
534 }
535 else
536 {
537 log_debug << "accepted socket is connecting";
538 }
539 log_debug << "handshake sent";
540 }
541
542
gmcast_connect(const std::string & remote_addr)543 void gcomm::GMCast::gmcast_connect(const std::string& remote_addr)
544 {
545 if (remote_addr == listen_addr_) return;
546
547 gu::URI connect_uri(remote_addr);
548
549 set_tcp_defaults (&connect_uri);
550
551 if (!bind_ip_.empty())
552 {
553 connect_uri.set_option(gcomm::Socket::OptIfAddr, bind_ip_);
554 }
555
556 SocketPtr tp = pnet().socket(connect_uri);
557
558 try
559 {
560 tp->connect(connect_uri);
561 }
562 catch (gu::Exception& e)
563 {
564 log_debug << "Connect failed: " << e.what();
565 // delete tp;
566 return;
567 }
568
569 Proto* peer = new Proto (
570 *this,
571 version_,
572 tp,
573 listener_->listen_addr(),
574 remote_addr,
575 mcast_addr_,
576 segment_,
577 group_name_);
578
579 std::pair<ProtoMap::iterator, bool> ret =
580 proto_map_->insert(std::make_pair(tp->id(), peer));
581
582 if (ret.second == false)
583 {
584 delete peer;
585 gu_throw_fatal << "Failed to add peer to map";
586 }
587
588 ret.first->second->wait_handshake();
589 }
590
591
gmcast_forget(const UUID & uuid,const gu::datetime::Period & wait_period)592 void gcomm::GMCast::gmcast_forget(const UUID& uuid,
593 const gu::datetime::Period& wait_period)
594 {
595 /* Close all proto entries corresponding to uuid */
596
597 ProtoMap::iterator pi, pi_next;
598 for (pi = proto_map_->begin(); pi != proto_map_->end(); pi = pi_next)
599 {
600 pi_next = pi, ++pi_next;
601 Proto* rp = ProtoMap::value(pi);
602 if (rp->remote_uuid() == uuid)
603 {
604 erase_proto(pi);
605 }
606 }
607
608 /* Set all corresponding entries in address list to have retry cnt
609 * greater than max retries and next reconnect time after some period */
610 AddrList::iterator ai;
611 for (ai = remote_addrs_.begin(); ai != remote_addrs_.end(); ++ai)
612 {
613 AddrEntry& ae(AddrList::value(ai));
614 if (ae.uuid() == uuid)
615 {
616 log_info << "forgetting " << uuid
617 << " (" << AddrList::key(ai) << ")";
618
619 ProtoMap::iterator pi, pi_next;
620 for (pi = proto_map_->begin(); pi != proto_map_->end(); pi = pi_next)
621 {
622 pi_next = pi, ++pi_next;
623 if (ProtoMap::value(pi)->remote_addr() == AddrList::key(ai))
624 {
625 log_info << "deleting entry " << AddrList::key(ai);
626 erase_proto(pi);
627 }
628 }
629 ae.set_max_retries(0);
630 ae.set_retry_cnt(1);
631 gu::datetime::Date now(gu::datetime::Date::monotonic());
632 // Don't reduce next reconnect time if it is set greater than
633 // requested
634 if ((now + wait_period > ae.next_reconnect()) ||
635 (ae.next_reconnect() == gu::datetime::Date::max()))
636 {
637 ae.set_next_reconnect(gu::datetime::Date::monotonic() + wait_period);
638 }
639 else
640 {
641 log_debug << "not decreasing next reconnect for " << uuid;
642 }
643 }
644 }
645
646 /* Update state */
647 update_addresses();
648 }
649
handle_connected(Proto * rp)650 void gcomm::GMCast::handle_connected(Proto* rp)
651 {
652 const SocketPtr tp(rp->socket());
653 assert(tp->state() == Socket::S_CONNECTED);
654 log_debug << "transport " << tp << " connected";
655 if (rp->state() == Proto::S_INIT)
656 {
657 log_debug << "sending handshake";
658 // accepted socket was waiting for underlying transport
659 // handshake to finish
660 rp->send_handshake();
661 }
662 }
663
handle_established(Proto * est)664 void gcomm::GMCast::handle_established(Proto* est)
665 {
666 log_info << self_string() << " connection established to "
667 << est->remote_uuid() << " "
668 << est->remote_addr();
669 // UUID checks are handled during protocol handshake
670 assert(est->remote_uuid() != uuid());
671
672 if (is_evicted(est->remote_uuid()))
673 {
674 log_warn << "Closing connection to evicted node " << est->remote_uuid();
675 erase_proto(proto_map_->find_checked(est->socket()->id()));
676 update_addresses();
677 return;
678 }
679
680 // If address is found from pending_addrs_, move it to remote_addrs list
681 // and set retry cnt to -1
682 const std::string& remote_addr(est->remote_addr());
683 AddrList::iterator i(pending_addrs_.find(remote_addr));
684
685 if (i != pending_addrs_.end())
686 {
687 log_debug << "Erasing " << remote_addr << " from panding list";
688 pending_addrs_.erase(i);
689 }
690
691 if ((i = remote_addrs_.find(remote_addr)) == remote_addrs_.end())
692 {
693 log_debug << "Inserting " << remote_addr << " to remote list";
694
695 insert_address (remote_addr, est->remote_uuid(), remote_addrs_);
696 i = remote_addrs_.find(remote_addr);
697 }
698 else if (AddrList::value(i).uuid() != est->remote_uuid())
699 {
700 log_info << "remote endpoint " << est->remote_addr()
701 << " changed identity " << AddrList::value(i).uuid()
702 << " -> " << est->remote_uuid();
703 remote_addrs_.erase(i);
704 i = remote_addrs_.insert_unique(
705 make_pair(est->remote_addr(),
706 AddrEntry(gu::datetime::Date::monotonic(),
707 gu::datetime::Date::monotonic(),
708 est->remote_uuid())));
709 }
710
711 if (AddrList::value(i).retry_cnt() >
712 AddrList::value(i).max_retries())
713 {
714 log_warn << "discarding established (time wait) "
715 << est->remote_uuid()
716 << " (" << est->remote_addr() << ") ";
717 erase_proto(proto_map_->find(est->socket()->id()));
718 update_addresses();
719 return;
720 }
721
722 // send_up(Datagram(), p->remote_uuid());
723
724 // init retry cnt to -1 to avoid unnecessary logging at first attempt
725 // max retries will be readjusted in handle stable view
726 AddrList::value(i).set_retry_cnt(-1);
727 AddrList::value(i).set_max_retries(max_initial_reconnect_attempts_);
728
729 // Cleanup all previously established entries with same
730 // remote uuid. It is assumed that the most recent connection
731 // is usually the healthiest one.
732 ProtoMap::iterator j, j_next;
733 for (j = proto_map_->begin(); j != proto_map_->end(); j = j_next)
734 {
735 j_next = j, ++j_next;
736
737 Proto* p(ProtoMap::value(j));
738
739 if (p->remote_uuid() == est->remote_uuid())
740 {
741 if (p->handshake_uuid() < est->handshake_uuid())
742 {
743 log_debug << self_string()
744 << " cleaning up duplicate "
745 << p->socket()
746 << " after established "
747 << est->socket();
748 erase_proto(j);
749 }
750 else if (p->handshake_uuid() > est->handshake_uuid())
751 {
752 log_debug << self_string()
753 << " cleaning up established "
754 << est->socket()
755 << " which is duplicate of "
756 << p->socket();
757 erase_proto(proto_map_->find_checked(est->socket()->id()));
758 update_addresses();
759 return;
760 }
761 else
762 {
763 assert(p == est);
764 }
765 }
766 }
767
768 AddrList::iterator ali(find_if(remote_addrs_.begin(),
769 remote_addrs_.end(),
770 AddrListUUIDCmp(est->remote_uuid())));
771 if (ali != remote_addrs_.end())
772 {
773 AddrList::value(ali).set_last_connect();
774 }
775 else
776 {
777 log_warn << "peer " << est->remote_addr()
778 << " not found from remote addresses";
779 }
780
781 update_addresses();
782 }
783
handle_failed(Proto * failed)784 void gcomm::GMCast::handle_failed(Proto* failed)
785 {
786 log_debug << "handle failed: " << *failed;
787 const std::string& remote_addr = failed->remote_addr();
788
789 bool found_ok(false);
790 for (ProtoMap::const_iterator i = proto_map_->begin();
791 i != proto_map_->end(); ++i)
792 {
793 Proto* p(ProtoMap::value(i));
794 if (p != failed &&
795 p->state() <= Proto::S_OK &&
796 p->remote_addr() == failed->remote_addr())
797 {
798 log_debug << "found live " << *p;
799 found_ok = true;
800 break;
801 }
802 }
803
804 if (found_ok == false && remote_addr != "")
805 {
806 AddrList::iterator i;
807
808 if ((i = pending_addrs_.find(remote_addr)) != pending_addrs_.end() ||
809 (i = remote_addrs_.find(remote_addr)) != remote_addrs_.end())
810 {
811 AddrEntry& ae(AddrList::value(i));
812 ae.set_retry_cnt(ae.retry_cnt() + 1);
813
814 gu::datetime::Date rtime = gu::datetime::Date::monotonic() + gu::datetime::Period("PT1S");
815 log_debug << self_string()
816 << " setting next reconnect time to "
817 << rtime << " for " << remote_addr;
818 ae.set_next_reconnect(rtime);
819 }
820 }
821
822 erase_proto(proto_map_->find_checked(failed->socket()->id()));
823 update_addresses();
824 }
825
is_connected(const std::string & addr,const UUID & uuid) const826 bool gcomm::GMCast::is_connected(const std::string& addr, const UUID& uuid) const
827 {
828 for (ProtoMap::const_iterator i = proto_map_->begin();
829 i != proto_map_->end(); ++i)
830 {
831 Proto* conn = ProtoMap::value(i);
832
833 if (addr == conn->remote_addr() ||
834 uuid == conn->remote_uuid())
835 {
836 return true;
837 }
838 }
839
840 return false;
841 }
842
843
insert_address(const std::string & addr,const UUID & uuid,AddrList & alist)844 void gcomm::GMCast::insert_address (const std::string& addr,
845 const UUID& uuid,
846 AddrList& alist)
847 {
848 if (addr == listen_addr_)
849 {
850 gu_throw_fatal << "Trying to add self addr " << addr << " to addr list";
851 }
852
853 if (alist.insert(make_pair(addr,
854 AddrEntry(gu::datetime::Date::monotonic(),
855 gu::datetime::Date::monotonic(), uuid))).second == false)
856 {
857 log_warn << "Duplicate entry: " << addr;
858 }
859 else
860 {
861 log_debug << self_string() << ": new address entry " << uuid << ' '
862 << addr;
863 }
864 }
865
866
// Recompute connection/address state after a topology change. Runs in
// these passes:
//  1. Collect established (S_OK) connections into a link map, adding any
//     missing remote_addrs_ entry and dropping duplicate connections to
//     an already-seen remote UUID.
//  2. Send a topology change with that link map over each S_OK connection.
//  3. Add addresses reported in peers' link maps that are not yet known
//     (and not blacklisted or our own UUID) to remote_addrs_, scheduling
//     a first reconnect within a random delay of up to 100 ms.
//  4. Rebuild segment_map_ (the multicast relay tree) and self_index_.
// Throws fatal if an S_OK connection has an empty remote address or nil
// remote UUID.
void gcomm::GMCast::update_addresses()
{
    LinkMap link_map;
    std::set<UUID> uuids;
    /* Add all established connections into uuid_map and update
     * list of remote addresses */

    ProtoMap::iterator i, i_next;
    for (i = proto_map_->begin(); i != proto_map_->end(); i = i_next)
    {
        // Save the successor first: erase_proto() below invalidates i.
        i_next = i, ++i_next;

        Proto* rp = ProtoMap::value(i);

        if (rp->state() == Proto::S_OK)
        {
            if (rp->remote_addr() == "" ||
                rp->remote_uuid() == UUID::nil())
            {
                gu_throw_fatal << "Protocol error: local: (" << my_uuid_
                               << ", '" << listen_addr_
                               << "'), remote: (" << rp->remote_uuid()
                               << ", '" << rp->remote_addr() << "')";
            }

            if (remote_addrs_.find(rp->remote_addr()) == remote_addrs_.end())
            {
                log_warn << "Connection exists but no addr on addr list for "
                         << rp->remote_addr();
                insert_address(rp->remote_addr(), rp->remote_uuid(),
                               remote_addrs_);
            }

            if (uuids.insert(rp->remote_uuid()).second == false)
            {
                // Duplicate entry, drop this one
                // @todo Deeper inspection about the connection states
                log_debug << self_string() << " dropping duplicate entry";
                erase_proto(i);
            }
            else
            {
                link_map.insert(Link(rp->remote_uuid(),
                                     rp->remote_addr(),
                                     rp->mcast_addr()));
            }
        }
    }

    /* Send topology change message containing only established
     * connections */
    for (ProtoMap::iterator i = proto_map_->begin(); i != proto_map_->end(); ++i)
    {
        Proto* gp = ProtoMap::value(i);

        // @todo: a lot of stuff here is done for each connection, including
        //        message creation and serialization. Need a mcast_msg() call
        //        and move this loop in there.
        if (gp->state() == Proto::S_OK)
            gp->send_topology_change(link_map);
    }

    /* Add entries reported by all other nodes to address list to
     * get complete view of existing uuids/addresses */
    for (ProtoMap::iterator i = proto_map_->begin(); i != proto_map_->end(); ++i)
    {
        Proto* rp = ProtoMap::value(i);

        if (rp->state() == Proto::S_OK)
        {
            for (LinkMap::const_iterator j = rp->link_map().begin();
                 j != rp->link_map().end(); ++j)
            {
                const UUID& link_uuid(LinkMap::key(j));
                const std::string& link_addr(LinkMap::value(j).addr());
                gcomm_assert(link_uuid != UUID::nil() && link_addr != "");

                if (addr_blacklist_.find(link_addr) != addr_blacklist_.end())
                {
                    log_debug << self_string()
                              << " address '" << link_addr
                              << "' pointing to uuid " << link_uuid
                              << " is blacklisted, skipping";
                    continue;
                }

                if (link_uuid != uuid() &&
                    remote_addrs_.find(link_addr) == remote_addrs_.end() &&
                    pending_addrs_.find(link_addr) == pending_addrs_.end())
                {
                    log_debug << self_string()
                              << " conn refers to but no addr in addr list for "
                              << link_addr;
                    insert_address(link_addr, link_uuid, remote_addrs_);

                    AddrList::iterator pi(remote_addrs_.find(link_addr));

                    assert(pi != remote_addrs_.end());

                    AddrEntry& ae(AddrList::value(pi));

                    // init retry cnt to -1 to avoid unnecessary logging
                    // at first attempt
                    // max retries will be readjusted in handle stable view
                    ae.set_retry_cnt(-1);
                    ae.set_max_retries(max_initial_reconnect_attempts_);

                    // Add some randomness for first reconnect to avoid
                    // simultaneous connects
                    gu::datetime::Date rtime(gu::datetime::Date::monotonic());

                    rtime = rtime + ::rand() % (100*gu::datetime::MSec);
                    ae.set_next_reconnect(rtime);
                    // Make sure the next periodic check is not later than
                    // this scheduled reconnect.
                    next_check_ = std::min(next_check_, rtime);
                }
            }
        }
    }

    // Build multicast tree
    log_debug << self_string() << " --- mcast tree begin ---";
    segment_map_.clear();

    Segment& local_segment(segment_map_[segment_]);

    // The multicast socket, if present, is the first local relay entry
    // (with no Proto attached).
    if (mcast_)
    {
        log_debug << mcast_addr_;
        local_segment.push_back(RelayEntry(0, mcast_.get()));
    }

    // self_index_ counts the local-segment relay entries whose remote UUID
    // is smaller than ours.
    self_index_ = 0;
    for (ProtoMap::const_iterator i(proto_map_->begin()); i != proto_map_->end();
         ++i)
    {
        Proto* p(ProtoMap::value(i));

        log_debug << "Proto: " << *p;

        if (p->remote_segment() == segment_)
        {
            // Local segment: peers that share our multicast address are
            // presumably reached via the mcast socket instead, so only
            // add the others.
            if (p->state() == Proto::S_OK &&
                (p->mcast_addr() == "" ||
                 p->mcast_addr() != mcast_addr_))
            {
                local_segment.push_back(RelayEntry(p, p->socket().get()));
                if (p->remote_uuid() < uuid())
                {
                    ++self_index_;
                }
            }
        }
        else
        {
            if (p->state() == Proto::S_OK)
            {
                Segment& remote_segment(segment_map_[p->remote_segment()]);
                remote_segment.push_back(RelayEntry(p, p->socket().get()));
            }
        }
    }
    log_debug << self_string() << " self index: " << self_index_;
    log_debug << self_string() << " --- mcast tree end ---";
}
1031
1032
reconnect()1033 void gcomm::GMCast::reconnect()
1034 {
1035 if (is_isolated(isolate_))
1036 {
1037 log_debug << "skipping reconnect due to isolation";
1038 return;
1039 }
1040
1041 /* Loop over known remote addresses and connect if proto entry
1042 * does not exist */
1043 gu::datetime::Date now = gu::datetime::Date::monotonic();
1044 AddrList::iterator i, i_next;
1045
1046 for (i = pending_addrs_.begin(); i != pending_addrs_.end(); i = i_next)
1047 {
1048 i_next = i, ++i_next;
1049
1050 const std::string& pending_addr(AddrList::key(i));
1051 const AddrEntry& ae(AddrList::value(i));
1052
1053 if (is_connected (pending_addr, UUID::nil()) == false &&
1054 ae.next_reconnect() <= now)
1055 {
1056 if (ae.retry_cnt() > ae.max_retries())
1057 {
1058 log_info << "cleaning up pending addr " << pending_addr;
1059 pending_addrs_.erase(i);
1060 continue; // no reference to pending_addr after this
1061 }
1062 else if (ae.next_reconnect() <= now)
1063 {
1064 log_debug << "connecting to pending " << pending_addr;
1065 gmcast_connect (pending_addr);
1066 }
1067 }
1068 }
1069
1070 for (i = remote_addrs_.begin(); i != remote_addrs_.end(); i = i_next)
1071 {
1072 i_next = i, ++i_next;
1073
1074 const std::string& remote_addr(AddrList::key(i));
1075 const AddrEntry& ae(AddrList::value(i));
1076 const UUID& remote_uuid(ae.uuid());
1077
1078 gcomm_assert(remote_uuid != uuid());
1079
1080 if (is_connected(remote_addr, remote_uuid) == false &&
1081 ae.next_reconnect() <= now)
1082 {
1083 if (ae.retry_cnt() > ae.max_retries())
1084 {
1085 log_info << " cleaning up " << remote_uuid << " ("
1086 << remote_addr << ")";
1087 remote_addrs_.erase(i);
1088 continue;//no reference to remote_addr or remote_uuid after this
1089 }
1090 else if (ae.next_reconnect() <= now)
1091 {
1092 if (ae.retry_cnt() % 30 == 0)
1093 {
1094 log_info << self_string() << " reconnecting to "
1095 << remote_uuid << " (" << remote_addr
1096 << "), attempt " << ae.retry_cnt();
1097 }
1098
1099 gmcast_connect(remote_addr);
1100 }
1101 else
1102 {
1103 //
1104 }
1105 }
1106 }
1107 }
1108
1109 namespace
1110 {
1111 class CmpUuidCounts
1112 {
1113 public:
CmpUuidCounts(const std::set<gcomm::UUID> & uuids,gcomm::SegmentId preferred_segment)1114 CmpUuidCounts(const std::set<gcomm::UUID>& uuids,
1115 gcomm::SegmentId preferred_segment)
1116 :
1117 uuids_(uuids),
1118 preferred_segment_(preferred_segment)
1119 { }
1120
count(const gcomm::gmcast::Proto * p) const1121 size_t count(const gcomm::gmcast::Proto* p) const
1122 {
1123 size_t cnt(0);
1124 for (std::set<gcomm::UUID>::const_iterator i(uuids_.begin());
1125 i != uuids_.end(); ++i)
1126 {
1127 for (gcomm::gmcast::LinkMap::const_iterator
1128 lm_i(p->link_map().begin());
1129 lm_i != p->link_map().end(); ++lm_i)
1130 {
1131 if (lm_i->uuid() == *i)
1132 {
1133 ++cnt;
1134 break;
1135 }
1136 }
1137 }
1138 return cnt;
1139 }
operator ()(const gcomm::gmcast::Proto * a,const gcomm::gmcast::Proto * b) const1140 bool operator()(const gcomm::gmcast::Proto* a,
1141 const gcomm::gmcast::Proto* b) const
1142 {
1143 size_t ac(count(a));
1144 size_t bc(count(b));
1145 // if counts are equal, prefer peer from the same segment
1146 return (ac < bc ||
1147 (ac == bc && a->remote_segment() != preferred_segment_));
1148 }
1149
1150 private:
1151 const std::set<gcomm::UUID>& uuids_;
1152 gcomm::SegmentId preferred_segment_;
1153 };
1154 }
1155
1156
check_liveness()1157 void gcomm::GMCast::check_liveness()
1158 {
1159 std::set<UUID> live_uuids;
1160
1161 // iterate over proto map and mark all timed out entries as failed
1162 gu::datetime::Date now(gu::datetime::Date::monotonic());
1163 for (ProtoMap::iterator i(proto_map_->begin()); i != proto_map_->end(); )
1164 {
1165 // Store next iterator into temporary, handle_failed() may remove
1166 // the entry proto_map_.
1167 ProtoMap::iterator i_next(i);
1168 ++i_next;
1169 Proto* p(ProtoMap::value(i));
1170 if (p->state() > Proto::S_INIT &&
1171 p->state() < Proto::S_FAILED &&
1172 p->recv_tstamp() + peer_timeout_ < now)
1173 {
1174 gcomm::SocketStats stats(p->socket()->stats());
1175 log_info << self_string()
1176 << " connection to peer "
1177 << p->remote_uuid() << " with addr "
1178 << p->remote_addr()
1179 << " timed out, no messages seen in " << peer_timeout_
1180 << ", socket stats: "
1181 << stats;
1182 p->set_state(Proto::S_FAILED);
1183 handle_failed(p);
1184 }
1185 else if (p->state() == Proto::S_OK)
1186 {
1187 gcomm::SocketStats stats(p->socket()->stats());
1188 if (stats.send_queue_length >= 1024)
1189 {
1190 log_debug << self_string()
1191 << " socket send queue to "
1192 << " peer "
1193 << p->remote_uuid() << " with addr "
1194 << p->remote_addr()
1195 << ", socket stats: "
1196 << stats;
1197 }
1198 if ((p->recv_tstamp() + peer_timeout_*2/3 < now) ||
1199 (p->send_tstamp() + peer_timeout_*1/3 < now))
1200 {
1201 p->send_keepalive();
1202 }
1203
1204 if (p->state() == Proto::S_FAILED)
1205 {
1206 handle_failed(p);
1207 }
1208 else
1209 {
1210 live_uuids.insert(p->remote_uuid());
1211 }
1212 }
1213 i = i_next;
1214 }
1215
1216 bool should_relay(false);
1217
1218 // iterate over addr list and check if there is at least one live
1219 // proto entry associated to each addr entry
1220
1221 std::set<UUID> nonlive_uuids;
1222 std::string nonlive_peers;
1223 for (AddrList::const_iterator i(remote_addrs_.begin());
1224 i != remote_addrs_.end(); ++i)
1225 {
1226 const AddrEntry& ae(AddrList::value(i));
1227 if (ae.retry_cnt() <= ae.max_retries() &&
1228 live_uuids.find(ae.uuid()) == live_uuids.end())
1229 {
1230 // log_info << self_string()
1231 // << " missing live proto entry for " << ae.uuid();
1232 nonlive_uuids.insert(ae.uuid());
1233 nonlive_peers += AddrList::key(i) + " ";
1234 should_relay = true;
1235 }
1236 else if (ae.last_connect() + peer_timeout_ > now)
1237 {
1238 log_debug << "continuing relaying for "
1239 << (ae.last_connect() + peer_timeout_ - now);
1240 should_relay = true;
1241 }
1242 }
1243
1244 if (should_relay == true)
1245 {
1246 if (relaying_ == false)
1247 {
1248 log_info << self_string()
1249 << " turning message relay requesting on, nonlive peers: "
1250 << nonlive_peers;
1251 relaying_ = true;
1252 }
1253 relay_set_.clear();
1254 // build set of protos having OK status
1255 std::set<Proto*> proto_set;
1256 for (ProtoMap::iterator i(proto_map_->begin()); i != proto_map_->end();
1257 ++i)
1258 {
1259 Proto* p(ProtoMap::value(i));
1260 if (p->state() == Proto::S_OK)
1261 {
1262 proto_set.insert(p);
1263 }
1264 }
1265 // find minimal set of proto entries required to reach maximum set
1266 // of nonlive peers
1267 while (nonlive_uuids.empty() == false &&
1268 proto_set.empty() == false)
1269 {
1270 std::set<Proto*>::iterator maxel(
1271 std::max_element(proto_set.begin(),
1272 proto_set.end(), CmpUuidCounts(nonlive_uuids, segment_)));
1273 Proto* p(*maxel);
1274 log_debug << "relay set maxel :" << *p << " count: "
1275 << CmpUuidCounts(nonlive_uuids, segment_).count(p);
1276
1277 relay_set_.insert(RelayEntry(p, p->socket().get()));
1278 const LinkMap& lm(p->link_map());
1279 for (LinkMap::const_iterator lm_i(lm.begin()); lm_i != lm.end();
1280 ++lm_i)
1281 {
1282 nonlive_uuids.erase((*lm_i).uuid());
1283 }
1284 proto_set.erase(maxel);
1285 }
1286 }
1287 else if (relaying_ == true && should_relay == false)
1288 {
1289 log_info << self_string() << " turning message relay requesting off";
1290 relay_set_.clear();
1291 relaying_ = false;
1292 }
1293 }
1294
1295
handle_timers()1296 gu::datetime::Date gcomm::GMCast::handle_timers()
1297 {
1298 const gu::datetime::Date now(gu::datetime::Date::monotonic());
1299
1300 if (now >= next_check_)
1301 {
1302 check_liveness();
1303 reconnect();
1304 next_check_ = now + check_period_;
1305 }
1306
1307 return next_check_;
1308 }
1309
1310
send(const RelayEntry & re,int segment,gcomm::Datagram & dg)1311 void gcomm::GMCast::send(const RelayEntry& re, int segment, gcomm::Datagram& dg)
1312 {
1313 int err;
1314 if ((err = re.socket->send(segment, dg)) != 0)
1315 {
1316 log_debug << "failed to send to " << re.socket->remote_addr()
1317 << ": (" << err << ") " << strerror(err);
1318 }
1319 else if (re.proto)
1320 {
1321 re.proto->set_send_tstamp(gu::datetime::Date::monotonic());
1322 }
1323 }
1324
relay(const Message & msg,const Datagram & dg,const void * exclude_id)1325 void gcomm::GMCast::relay(const Message& msg,
1326 const Datagram& dg,
1327 const void* exclude_id)
1328 {
1329 Datagram relay_dg(dg);
1330 relay_dg.normalize();
1331 Message relay_msg(msg);
1332
1333 // reset all relay flags from message to be relayed
1334 relay_msg.set_flags(relay_msg.flags() &
1335 ~(Message::F_RELAY | Message::F_SEGMENT_RELAY));
1336
1337 // if F_RELAY is set in received message, relay to all peers except
1338 // the originator
1339 if (msg.flags() & Message::F_RELAY)
1340 {
1341 gu_trace(push_header(relay_msg, relay_dg));
1342 for (SegmentMap::iterator segment_i(segment_map_.begin());
1343 segment_i != segment_map_.end(); ++segment_i)
1344 {
1345 Segment& segment(segment_i->second);
1346 for (Segment::iterator target_i(segment.begin());
1347 target_i != segment.end(); ++target_i)
1348 {
1349 if ((*target_i).socket->id() != exclude_id)
1350 {
1351 send(*target_i, msg.segment_id(), relay_dg);
1352 }
1353 }
1354 }
1355 }
1356 else if (msg.flags() & Message::F_SEGMENT_RELAY)
1357 {
1358 if (relay_set_.empty() == false)
1359 {
1360 // send message to all nodes in relay set to reach
1361 // nodes in local segment that are not directly reachable
1362 relay_msg.set_flags(relay_msg.flags() | Message::F_RELAY);
1363 gu_trace(push_header(relay_msg, relay_dg));
1364 for (RelaySet::iterator relay_i(relay_set_.begin());
1365 relay_i != relay_set_.end(); ++relay_i)
1366 {
1367 if ((*relay_i).socket->id() != exclude_id)
1368 {
1369 send(*relay_i, msg.segment_id(), relay_dg);
1370 }
1371 }
1372 gu_trace(pop_header(relay_msg, relay_dg));
1373 relay_msg.set_flags(relay_msg.flags() & ~Message::F_RELAY);
1374 }
1375
1376 if (msg.segment_id() == segment_)
1377 {
1378 log_warn << "message with F_SEGMENT_RELAY from own segment, "
1379 << "source " << msg.source_uuid();
1380 }
1381
1382 // Relay to local segment
1383 gu_trace(push_header(relay_msg, relay_dg));
1384 Segment& segment(segment_map_[segment_]);
1385 for (Segment::iterator i(segment.begin()); i != segment.end(); ++i)
1386 {
1387 send(*i, msg.segment_id(), relay_dg);
1388 }
1389 }
1390 else
1391 {
1392 log_warn << "GMCast::relay() called without relay flags set";
1393 }
1394 }
1395
handle_up(const void * id,const Datagram & dg,const ProtoUpMeta & um)1396 void gcomm::GMCast::handle_up(const void* id,
1397 const Datagram& dg,
1398 const ProtoUpMeta& um)
1399 {
1400 ProtoMap::iterator i;
1401
1402 if (listener_ == 0) { return; }
1403
1404 if (id == listener_->id())
1405 {
1406 gmcast_accept();
1407 }
1408 else if (mcast_ && id == mcast_->id())
1409 {
1410 Message msg;
1411
1412 try
1413 {
1414 if (dg.offset() < dg.header_len())
1415 {
1416 gu_trace(msg.unserialize(dg.header(), dg.header_size(),
1417 dg.header_offset() +
1418 dg.offset()));
1419 }
1420 else
1421 {
1422 gu_trace(msg.unserialize(dg.payload().data(),
1423 dg.len(),
1424 dg.offset()));
1425 }
1426 }
1427 catch (gu::Exception& e)
1428 {
1429 GU_TRACE(e);
1430 log_warn << e.what();
1431 return;
1432 }
1433
1434 if (msg.type() >= Message::GMCAST_T_USER_BASE)
1435 {
1436 gu_trace(send_up(Datagram(dg, dg.offset() + msg.serial_size()),
1437 ProtoUpMeta(msg.source_uuid())));
1438 }
1439 else
1440 {
1441 log_warn << "non-user message " << msg.type()
1442 << " from multicast socket";
1443 }
1444 }
1445 else if ((i = proto_map_->find(id)) != proto_map_->end())
1446 {
1447 Proto* p(ProtoMap::value(i));
1448
1449 if (dg.len() > 0)
1450 {
1451 const Proto::State prev_state(p->state());
1452
1453 if (prev_state == Proto::S_FAILED)
1454 {
1455 log_warn << "unhandled failed proto";
1456 handle_failed(p);
1457 return;
1458 }
1459
1460 Message msg;
1461
1462 try
1463 {
1464 msg.unserialize(dg.payload().data(), dg.len(),
1465 dg.offset());
1466 }
1467 catch (gu::Exception& e)
1468 {
1469 GU_TRACE(e);
1470 log_warn << e.what();
1471 p->set_state(Proto::S_FAILED);
1472 handle_failed(p);
1473 return;
1474 }
1475
1476 if (msg.type() >= Message::GMCAST_T_USER_BASE)
1477 {
1478 if (evict_list().empty() == false &&
1479 evict_list().find(msg.source_uuid()) != evict_list().end())
1480 {
1481 return;
1482 }
1483 if (msg.flags() &
1484 (Message::F_RELAY | Message::F_SEGMENT_RELAY))
1485 {
1486 relay(msg,
1487 Datagram(dg, dg.offset() + msg.serial_size()),
1488 id);
1489 }
1490 p->set_recv_tstamp(gu::datetime::Date::monotonic());
1491 send_up(Datagram(dg, dg.offset() + msg.serial_size()),
1492 ProtoUpMeta(msg.source_uuid()));
1493 return;
1494 }
1495 else
1496 {
1497 try
1498 {
1499 p->set_recv_tstamp(gu::datetime::Date::monotonic());
1500 gu_trace(p->handle_message(msg));
1501 }
1502 catch (const gu::Exception& e)
1503 {
1504 handle_failed(p);
1505 if (e.get_errno() == ENOTRECOVERABLE)
1506 {
1507 throw;
1508 }
1509 log_warn
1510 << "handling gmcast protocol message failed: "
1511 << e.what();
1512 return;
1513 }
1514
1515 if (p->state() == Proto::S_FAILED)
1516 {
1517 handle_failed(p);
1518 return;
1519 }
1520 else if (p->check_changed_and_reset() == true)
1521 {
1522 update_addresses();
1523 check_liveness();
1524 reconnect();
1525 }
1526 }
1527
1528 if (prev_state != Proto::S_OK && p->state() == Proto::S_OK)
1529 {
1530 handle_established(p);
1531 }
1532 }
1533 else if (p->socket()->state() == Socket::S_CONNECTED &&
1534 (p->state() == Proto::S_HANDSHAKE_WAIT ||
1535 p->state() == Proto::S_INIT))
1536 {
1537 handle_connected(p);
1538 }
1539 else if (p->socket()->state() == Socket::S_CONNECTED)
1540 {
1541 log_warn << "connection " << p->socket()->id()
1542 << " closed by peer";
1543 p->set_state(Proto::S_FAILED);
1544 handle_failed(p);
1545 }
1546 else
1547 {
1548 log_debug << "socket in state " << p->socket()->state();
1549 p->set_state(Proto::S_FAILED);
1550 handle_failed(p);
1551 }
1552 }
1553 else
1554 {
1555 // log_info << "proto entry " << id << " not found";
1556 }
1557 }
1558
//
// Linear search of proto_map for an entry whose remote UUID equals uuid.
// Returns the first match in iteration order, or a null pointer if there is
// no such entry.
//
static gcomm::gmcast::Proto* find_by_remote_uuid(
    const gcomm::gmcast::ProtoMap& proto_map,
    const gcomm::UUID& uuid)
{
    typedef gcomm::gmcast::ProtoMap::const_iterator ProtoCIter;

    ProtoCIter pos(proto_map.begin());
    while (pos != proto_map.end() && !(pos->second->remote_uuid() == uuid))
    {
        ++pos;
    }
    if (pos == proto_map.end())
    {
        return 0;
    }
    return pos->second;
}
1573
handle_down(Datagram & dg,const ProtoDownMeta & dm)1574 int gcomm::GMCast::handle_down(Datagram& dg, const ProtoDownMeta& dm)
1575 {
1576 Message msg(version_, Message::GMCAST_T_USER_BASE, uuid(), 1, segment_);
1577
1578 // If target is set and proto entry for target is found,
1579 // send a direct message. Otherwise fall back for broadcast
1580 // to ensure message delivery via relay
1581 if (dm.target() != UUID::nil())
1582 {
1583 Proto* target_proto(find_by_remote_uuid(*proto_map_, dm.target()));
1584 if (target_proto && target_proto->state() == Proto::S_OK)
1585 {
1586 gu_trace(push_header(msg, dg));
1587 int err;
1588 if ((err = target_proto->socket()->send(msg.segment_id(), dg)) != 0)
1589 {
1590 log_debug << "failed to send to "
1591 << target_proto->socket()->remote_addr()
1592 << ": (" << err << ") " << strerror(err);
1593 }
1594 else
1595 {
1596 target_proto->set_send_tstamp(gu::datetime::Date::monotonic());
1597 }
1598 gu_trace(pop_header(msg, dg));
1599 if (err == 0)
1600 {
1601 return 0;
1602 }
1603 // In case of error fall back to broadcast
1604 }
1605 else
1606 {
1607 log_debug << "Target " << dm.target() << " proto not found";
1608 }
1609 }
1610
1611 // handle relay set first, skip these peers below
1612 if (relay_set_.empty() == false)
1613 {
1614 msg.set_flags(msg.flags() | Message::F_RELAY);
1615 gu_trace(push_header(msg, dg));
1616 for (RelaySet::iterator ri(relay_set_.begin());
1617 ri != relay_set_.end(); ++ri)
1618 {
1619 send(*ri, msg.segment_id(), dg);
1620 }
1621 gu_trace(pop_header(msg, dg));
1622 msg.set_flags(msg.flags() & ~Message::F_RELAY);
1623 }
1624
1625 for (SegmentMap::iterator si(segment_map_.begin());
1626 si != segment_map_.end(); ++si)
1627 {
1628 uint8_t segment_id(si->first);
1629 Segment& segment(si->second);
1630
1631 if (segment_id != segment_)
1632 {
1633 size_t target_idx((self_index_ + segment_id) % segment.size());
1634 msg.set_flags(msg.flags() | Message::F_SEGMENT_RELAY);
1635 // skip peers that are in relay set
1636 if (relay_set_.empty() == true ||
1637 relay_set_.find(segment[target_idx]) == relay_set_.end())
1638 {
1639 gu_trace(push_header(msg, dg));
1640 send(segment[target_idx], msg.segment_id(), dg);
1641 gu_trace(pop_header(msg, dg));
1642 }
1643 }
1644 else
1645 {
1646 msg.set_flags(msg.flags() & ~Message::F_SEGMENT_RELAY);
1647 gu_trace(push_header(msg, dg));
1648 for (Segment::iterator i(segment.begin());
1649 i != segment.end(); ++i)
1650 {
1651 // skip peers that are in relay set
1652 if (relay_set_.empty() == true ||
1653 relay_set_.find(*i) == relay_set_.end())
1654 {
1655 send(*i, msg.segment_id(), dg);
1656 }
1657 }
1658 gu_trace(pop_header(msg, dg));
1659 }
1660 }
1661
1662 return 0;
1663 }
1664
handle_stable_view(const View & view)1665 void gcomm::GMCast::handle_stable_view(const View& view)
1666 {
1667 log_debug << "GMCast::handle_stable_view: " << view;
1668 if (view.type() == V_PRIM)
1669 {
1670 // discard addr list entries not in view
1671 std::set<UUID> gmcast_lst;
1672 for (AddrList::const_iterator i(remote_addrs_.begin());
1673 i != remote_addrs_.end(); ++i)
1674 {
1675 gmcast_lst.insert(i->second.uuid());
1676 }
1677 std::set<UUID> view_lst;
1678 for (NodeList::const_iterator i(view.members().begin());
1679 i != view.members().end(); ++i)
1680 {
1681 view_lst.insert(i->first);
1682 }
1683 std::list<UUID> diff;
1684 std::set_difference(gmcast_lst.begin(),
1685 gmcast_lst.end(),
1686 view_lst.begin(),
1687 view_lst.end(),
1688 std::back_inserter(diff));
1689
1690 // Forget partitioned entries, allow them to reconnect
1691 // in time_wait_/2. Left nodes are given time_wait_ ban for
1692 // reconnecting when handling V_REG below.
1693 for (std::list<UUID>::const_iterator i(diff.begin());
1694 i != diff.end(); ++i)
1695 {
1696 gmcast_forget(*i, time_wait_/2);
1697 }
1698
1699 // mark nodes in view as stable
1700 for (std::set<UUID>::const_iterator i(view_lst.begin());
1701 i != view_lst.end(); ++i)
1702 {
1703 AddrList::iterator ai;
1704 if ((ai = find_if(remote_addrs_.begin(), remote_addrs_.end(),
1705 AddrListUUIDCmp(*i))) != remote_addrs_.end())
1706 {
1707 ai->second.set_retry_cnt(-1);
1708 ai->second.set_max_retries(max_retry_cnt_);
1709 }
1710 }
1711
1712 // iterate over pending address list and discard entries without UUID
1713 for (AddrList::iterator i(pending_addrs_.begin());
1714 i != pending_addrs_.end(); )
1715 {
1716 AddrList::iterator i_next(i);
1717 ++i_next;
1718 const AddrEntry& ae(AddrList::value(i));
1719 if (ae.uuid() == UUID())
1720 {
1721 const std::string addr(AddrList::key(i));
1722 log_info << "discarding pending addr without UUID: "
1723 << addr;
1724 for (ProtoMap::iterator pi(proto_map_->begin());
1725 pi != proto_map_->end();)
1726 {
1727 ProtoMap::iterator pi_next(pi);
1728 ++pi_next;
1729 Proto* p(ProtoMap::value(pi));
1730 if (p->remote_addr() == addr)
1731 {
1732 log_info << "discarding pending addr proto entry " << p;
1733 erase_proto(pi);
1734 }
1735 pi = pi_next;
1736 }
1737 pending_addrs_.erase(i);
1738 }
1739 i = i_next;
1740 }
1741 prim_view_reached_ = true;
1742 }
1743 else if (view.type() == V_REG)
1744 {
1745 for (NodeList::const_iterator i(view.members().begin());
1746 i != view.members().end(); ++i)
1747 {
1748 AddrList::iterator ai;
1749 if ((ai = find_if(remote_addrs_.begin(), remote_addrs_.end(),
1750 AddrListUUIDCmp(NodeList::key(i))))
1751 != remote_addrs_.end())
1752 {
1753 log_info << "declaring " << NodeList::key(i)
1754 << " at " << handle_get_address(NodeList::key(i))
1755 << " stable";
1756 ai->second.set_retry_cnt(-1);
1757 ai->second.set_max_retries(max_retry_cnt_);
1758 }
1759 }
1760
1761 // Forget left nodes
1762 for (NodeList::const_iterator i(view.left().begin());
1763 i != view.left().end(); ++i)
1764 {
1765 gmcast_forget(NodeList::key(i), time_wait_);
1766 }
1767 }
1768 check_liveness();
1769
1770 for (ProtoMap::const_iterator i(proto_map_->begin()); i != proto_map_->end();
1771 ++i)
1772 {
1773 log_debug << "proto: " << *ProtoMap::value(i);
1774 }
1775 }
1776
1777
handle_evict(const UUID & uuid)1778 void gcomm::GMCast::handle_evict(const UUID& uuid)
1779 {
1780 if (is_evicted(uuid) == true)
1781 {
1782 return;
1783 }
1784 gmcast_forget(uuid, time_wait_);
1785 }
1786
1787
handle_get_address(const UUID & uuid) const1788 std::string gcomm::GMCast::handle_get_address(const UUID& uuid) const
1789 {
1790 AddrList::const_iterator ali(
1791 find_if(remote_addrs_.begin(),
1792 remote_addrs_.end(),
1793 AddrListUUIDCmp(uuid)));
1794 return (ali == remote_addrs_.end() ? "" : AddrList::key(ali));
1795 }
1796
add_or_del_addr(const std::string & val)1797 void gcomm::GMCast::add_or_del_addr(const std::string& val)
1798 {
1799 if (val.compare(0, 4, "add:") == 0)
1800 {
1801 gu::URI uri(val.substr(4));
1802 std::string addr(gu::net::resolve(uri_string(get_scheme(use_ssl_),
1803 uri.get_host(),
1804 uri.get_port())).to_string());
1805 log_info << "inserting address '" << addr << "'";
1806 insert_address(addr, UUID(), remote_addrs_);
1807 AddrList::iterator ai(remote_addrs_.find(addr));
1808 AddrList::value(ai).set_max_retries(
1809 max_initial_reconnect_attempts_);
1810 AddrList::value(ai).set_retry_cnt(-1);
1811 }
1812 else if (val.compare(0, 4, "del:") == 0)
1813 {
1814 std::string addr(val.substr(4));
1815 AddrList::iterator ai(remote_addrs_.find(addr));
1816 if (ai != remote_addrs_.end())
1817 {
1818 ProtoMap::iterator pi, pi_next;
1819 for (pi = proto_map_->begin(); pi != proto_map_->end(); pi = pi_next)
1820 {
1821 pi_next = pi, ++pi_next;
1822 Proto* rp = ProtoMap::value(pi);
1823 if (rp->remote_addr() == AddrList::key(ai))
1824 {
1825 log_info << "deleting entry " << AddrList::key(ai);
1826 erase_proto(pi);
1827 }
1828 }
1829 AddrEntry& ae(AddrList::value(ai));
1830 ae.set_max_retries(0);
1831 ae.set_retry_cnt(1);
1832 ae.set_next_reconnect(gu::datetime::Date::monotonic() + time_wait_);
1833 update_addresses();
1834 }
1835 else
1836 {
1837 log_info << "address '" << addr
1838 << "' not found from remote addrs list";
1839 }
1840 }
1841 else
1842 {
1843 gu_throw_error(EINVAL) << "invalid addr spec '" << val << "'";
1844 }
1845 }
1846
1847
set_param(const std::string & key,const std::string & val,Protolay::sync_param_cb_t & sync_param_cb)1848 bool gcomm::GMCast::set_param(const std::string& key, const std::string& val,
1849 Protolay::sync_param_cb_t& sync_param_cb)
1850 {
1851 try
1852 {
1853 if (key == Conf::GMCastMaxInitialReconnectAttempts)
1854 {
1855 max_initial_reconnect_attempts_ = gu::from_string<int>(val);
1856 return true;
1857 }
1858 else if (key == Conf::GMCastPeerAddr)
1859 {
1860 try
1861 {
1862 add_or_del_addr(val);
1863 }
1864 catch (gu::NotFound& nf)
1865 {
1866 gu_throw_error(EINVAL) << "invalid addr spec '" << val << "'";
1867 }
1868 catch (gu::NotSet& ns)
1869 {
1870 gu_throw_error(EINVAL) << "invalid addr spec '" << val << "'";
1871 }
1872 return true;
1873 }
1874 else if (key == Conf::GMCastIsolate)
1875 {
1876 int tmpval = gu::from_string<int>(val);
1877 if (tmpval < 0 || tmpval > 2)
1878 {
1879 gu_throw_error(EINVAL)
1880 << "invalid value for gmacst.isolate: '"
1881 << tmpval << "'";
1882 }
1883 isolate_ = tmpval;
1884 log_info << "turning isolation "
1885 << (isolate_ == 1 ? "on" :
1886 (isolate_ == 2 ? "force quit" : "off"));
1887 if (isolate_)
1888 {
1889 // delete all entries in proto map
1890 ProtoMap::iterator pi, pi_next;
1891 for (pi = proto_map_->begin(); pi != proto_map_->end();
1892 pi = pi_next)
1893 {
1894 pi_next = pi, ++pi_next;
1895 erase_proto(pi);
1896 }
1897 segment_map_.clear();
1898 }
1899 return true;
1900 }
1901 else if (key == Conf::SocketRecvBufSize)
1902 {
1903 gu_trace(Conf::check_recv_buf_size(val));
1904 conf_.set(key, val);
1905
1906 for (ProtoMap::iterator pi(proto_map_->begin());
1907 pi != proto_map_->end(); ++pi)
1908 {
1909 gu_trace(pi->second->socket()->set_option(key, val));
1910 // erase_proto(pi++);
1911 }
1912 // segment_map_.clear();
1913 // reconnect();
1914 return true;
1915 }
1916 else if (key == Conf::GMCastGroup ||
1917 key == Conf::GMCastListenAddr ||
1918 key == Conf::GMCastMCastAddr ||
1919 key == Conf::GMCastMCastPort ||
1920 key == Conf::GMCastMCastTTL ||
1921 key == Conf::GMCastTimeWait ||
1922 key == Conf::GMCastPeerTimeout ||
1923 key == Conf::GMCastSegment)
1924 {
1925 gu_throw_error(EPERM) << "can't change value during runtime";
1926 }
1927 }
1928 catch (gu::Exception& e)
1929 {
1930 GU_TRACE(e); throw;
1931 }
1932 catch (std::exception& e)
1933 {
1934 gu_throw_error(EINVAL) << e.what();
1935 }
1936 catch (...)
1937 {
1938 gu_throw_error(EINVAL) << "exception";
1939 }
1940
1941 return false;
1942 }
1943