1 /*
2 * Copyright (C) 2009-2019 Codership Oy <info@codership.com>
3 */
4
5 #include "gmcast.hpp"
6 #include "gmcast_proto.hpp"
7
8 #include "gcomm/common.hpp"
9 #include "gcomm/conf.hpp"
10 #include "gcomm/util.hpp"
11 #include "gcomm/map.hpp"
12 #include "defaults.hpp"
13 #include "gu_convert.hpp"
14 #include "gu_resolver.hpp"
15 #include "gu_asio.hpp" // gu::conf::use_ssl
16
17 using namespace std::rel_ops;
18
19 using gcomm::gmcast::Proto;
20 using gcomm::gmcast::ProtoMap;
21 using gcomm::gmcast::Link;
22 using gcomm::gmcast::LinkMap;
23 using gcomm::gmcast::Message;
24
25 const long gcomm::GMCast::max_retry_cnt_(std::numeric_limits<int>::max());
26
set_tcp_defaults(gu::URI * uri)27 static void set_tcp_defaults (gu::URI* uri)
28 {
29 // what happens if there is already this parameter?
30 uri->set_option(gcomm::Conf::TcpNonBlocking, gu::to_string(1));
31 }
32
33
check_tcp_uri(const gu::URI & uri)34 static bool check_tcp_uri(const gu::URI& uri)
35 {
36 return (uri.get_scheme() == gu::scheme::tcp ||
37 uri.get_scheme() == gu::scheme::ssl);
38 }
39
get_scheme(bool use_ssl,bool dynamic_socket)40 static std::string get_scheme(bool use_ssl, bool dynamic_socket)
41 {
42 if (use_ssl == true && not dynamic_socket)
43 {
44 return gu::scheme::ssl;
45 }
46 return gu::scheme::tcp;
47 }
48
49 //
50 // Check if the node should stay isolated.
51 // Possible outcomes:
52 // * Return false, node should continue reconnecting and accepting connections
53 // (isolate = 0)
54 // * Return true, node should remain isolated (isolate = 1)
55 // * Throw fatal exception to terminate the backend (isolate = 2)
56 //
is_isolated(int isolate)57 static inline bool is_isolated(int isolate)
58 {
59 switch (isolate)
60 {
61 case 1:
62 return true;
63 case 2:
64 gu_throw_fatal<< "Gcomm backend termination was "
65 << "requested by setting gmcast.isolate=2.";
66 break;
67 default:
68 break;
69 }
70 return false;
71 }
72
//
// Construct the GMCast transport from conf_ and the options of `uri`.
//
// Reads and range-checks the GMCast parameters, then:
//  * collects initial peer addresses from the URI authority list,
//  * determines and resolves the listen address (scheme, host, port),
//  * removes this node's own listen address from the initial address list,
//  * resolves the optional multicast address,
//  * writes the effective values back into conf_.
//
// Throws (via gu_throw_error(EINVAL)) when no group name is configured or the
// listen address does not use a tcp/ssl scheme.
//
gcomm::GMCast::GMCast(Protonet& net, const gu::URI& uri,
                      const UUID* my_uuid)
    :
    Transport (net, uri),
    version_(check_range(Conf::GMCastVersion,
                         param<int>(conf_, uri, Conf::GMCastVersion, "0"),
                         0, max_version_ + 1)),
    segment_ (check_range(Conf::GMCastSegment,
                          param<int>(conf_, uri, Conf::GMCastSegment, "0"),
                          0, 255)),
    my_uuid_ (my_uuid ? *my_uuid : UUID(0, 0)),
    // The socket_dynamic option is only consulted if conf_ knows the key.
    dynamic_socket_ (conf_.has(gu::conf::socket_dynamic) ?
                     param<bool>(conf_, uri, gu::conf::socket_dynamic,
                                 "false") :
                     false),
#ifdef GALERA_HAVE_SSL
    use_ssl_ (param<bool>(conf_, uri, gu::conf::use_ssl, "false")),
#else
    use_ssl_(),
#endif // GALERA_HAVE_SSL
    // @todo: technically group name should be in path component
    group_name_ (param<std::string>(conf_, uri, Conf::GMCastGroup, "")),
    // The default listen address depends on use_ssl_ and dynamic_socket_,
    // so both members must be initialized (declared) before listen_addr_.
    listen_addr_ (
        param<std::string>(
            conf_, uri, Conf::GMCastListenAddr,
            get_scheme(use_ssl_, dynamic_socket_) + "://0.0.0.0")), // how to make it IPv6 safe?
    initial_addrs_(),
    mcast_addr_ (param<std::string>(conf_, uri, Conf::GMCastMCastAddr, "")),
    bind_ip_ (""),
    mcast_ttl_ (check_range(
                    Conf::GMCastMCastTTL,
                    param<int>(conf_, uri, Conf::GMCastMCastTTL, "1"),
                    1, 256)),
    listener_ (),
    mcast_ (),
    pending_addrs_(),
    remote_addrs_ (),
    addr_blacklist_(),
    relaying_ (false),
    isolate_ (0),
    prim_view_reached_(false),
    proto_map_ (new ProtoMap()),  // owned; deleted in ~GMCast()
    relay_set_ (),
    segment_map_ (),
    self_index_ (std::numeric_limits<size_t>::max()),
    time_wait_ (param<gu::datetime::Period>(
                    conf_, uri,
                    Conf::GMCastTimeWait, Defaults::GMCastTimeWait)),
    check_period_ ("PT0.5S"),
    peer_timeout_ (param<gu::datetime::Period>(
                       conf_, uri,
                       Conf::GMCastPeerTimeout, Defaults::GMCastPeerTimeout)),
    max_initial_reconnect_attempts_(
        param<int>(conf_, uri,
                   Conf::GMCastMaxInitialReconnectAttempts,
                   gu::to_string(max_retry_cnt_))),
    next_check_ (gu::datetime::Date::monotonic())
{
    log_info << "GMCast version " << version_;

    if (group_name_ == "")
    {
        gu_throw_error (EINVAL) << "Group not defined in URL: "
                                << uri_.to_string();
    }

    // Collect peer addresses from the URI; the own listen address is
    // removed below once listen_addr_ has its final, resolved form.
    set_initial_addr(uri_);

    // Use the listen address given as a URI option, if any
    // (gu::NotFound presumably means the option is absent).
    try
    {
        listen_addr_ = uri_.get_option (Conf::GMCastListenAddr);
    }
    catch (gu::NotFound&) {}

    try
    {
        gu::URI uri(listen_addr_); /* check validity of the address */
    }
    catch (gu::Exception&)
    {
        /* most probably no scheme, try to append one and see if it succeeds */
        listen_addr_ = uri_string(get_scheme(use_ssl_, dynamic_socket_), listen_addr_);
        gu_trace(gu::URI uri(listen_addr_));
    }

    gu::URI listen_uri(listen_addr_);

    if (check_tcp_uri(listen_uri) == false)
    {
        gu_throw_error (EINVAL) << "listen addr '" << listen_addr_
                                << "' does not specify supported protocol";
    }

    if (gu::net::resolve(listen_uri).get_addr().is_anyaddr() == false)
    {
        // bind outgoing connections to the same address as listening.
        gu_trace(bind_ip_ = listen_uri.get_host());
    }

    // Port lookup order: listen address -> BASE_PORT_KEY in conf_ ->
    // port of the connection URI -> Defaults::GMCastTcpPort.
    std::string port(Defaults::GMCastTcpPort);

    try
    {
        port = listen_uri.get_port();
    }
    catch (gu::NotSet&)
    {
        // if no listen port is set for listen address in the options,
        // see if base port was configured
        try
        {
            port = conf_.get(BASE_PORT_KEY);
        }
        catch (gu::NotSet&)
        {
            // if no base port configured, try port from the connection address
            try { port = uri_.get_port(); } catch (gu::NotSet&) {}
        }

        listen_addr_ += ":" + port;
    }

    // Publish the effective port so other components see the same base port.
    conf_.set(BASE_PORT_KEY, port);

    listen_addr_ = gu::net::resolve(listen_addr_).to_string();
    // resolving sets scheme to tcp, have to rewrite for ssl
    // (replaces the first 3 characters, i.e. the "tcp" scheme name)
    if (use_ssl_ == true && not dynamic_socket_)
    {
        listen_addr_.replace(0, 3, gu::scheme::ssl);
    }
    // Never try to connect to ourselves.
    std::set<std::string>::iterator iaself(initial_addrs_.find(listen_addr_));
    if (iaself != initial_addrs_.end())
    {
        log_debug << "removing own listen address '" << *iaself
                  << "' from initial address list";
        initial_addrs_.erase(iaself);
    }

    if (mcast_addr_ != "")
    {
        // Multicast port defaults to the TCP port determined above.
        try
        {
            port = param<std::string>(conf_, uri_, Conf::GMCastMCastPort, port);
        }
        catch (gu::NotFound&) {}

        mcast_addr_ = gu::net::resolve(
            uri_string(gu::scheme::udp, mcast_addr_, port)).to_string();
    }

    log_info << self_string() << " listening at " << listen_addr_;
    log_info << self_string() << " multicast: " << mcast_addr_
             << ", ttl: " << mcast_ttl_;

    // Write the effective (resolved / validated) values back to conf_.
    conf_.set(Conf::GMCastListenAddr, listen_addr_);
    conf_.set(Conf::GMCastMCastAddr, mcast_addr_);
    conf_.set(Conf::GMCastVersion, gu::to_string(version_));
    conf_.set(Conf::GMCastTimeWait, gu::to_string(time_wait_));
    conf_.set(Conf::GMCastMCastTTL, gu::to_string(mcast_ttl_));
    conf_.set(Conf::GMCastPeerTimeout, gu::to_string(peer_timeout_));
    conf_.set(Conf::GMCastSegment, gu::to_string<int>(segment_));
}
235
~GMCast()236 gcomm::GMCast::~GMCast()
237 {
238 if (listener_ != 0) close();
239
240 delete proto_map_;
241 }
242
set_initial_addr(const gu::URI & uri)243 void gcomm::GMCast::set_initial_addr(const gu::URI& uri)
244 {
245
246 const gu::URI::AuthorityList& al(uri.get_authority_list());
247
248 for (gu::URI::AuthorityList::const_iterator i(al.begin());
249 i != al.end(); ++i)
250 {
251 std::string host;
252 try
253 {
254 host = i->host();
255 }
256 catch (gu::NotSet& ns)
257 {
258 gu_throw_error(EINVAL) << "Unset host in URL " << uri;
259 }
260
261 if (host_is_any(host)) continue;
262
263 std::string port;
264 try
265 {
266 port = i->port();
267 }
268 catch (gu::NotSet&)
269 {
270 try
271 {
272 port = conf_.get(BASE_PORT_KEY);
273 }
274 catch (gu::NotFound&)
275 {
276 port = Defaults::GMCastTcpPort;
277 }
278 catch (gu::NotSet&)
279 {
280 port = Defaults::GMCastTcpPort;
281 }
282 }
283
284 std::string initial_uri = uri_string(get_scheme(use_ssl_, dynamic_socket_), host, port);
285 std::string initial_addr;
286 try
287 {
288 initial_addr = gu::net::resolve(initial_uri).to_string();
289 }
290 catch (gu::Exception& )
291 {
292 log_warn << "Failed to resolve " << initial_uri;
293 continue;
294 }
295 // resolving sets scheme to tcp, have to rewrite for ssl
296 if (use_ssl_ == true && not dynamic_socket_)
297 {
298 initial_addr.replace(0, 3, gu::scheme::ssl);
299 }
300
301 if (check_tcp_uri(initial_addr) == false)
302 {
303 gu_throw_error (EINVAL) << "initial addr '" << initial_addr
304 << "' is not valid";
305 }
306
307 log_debug << self_string() << " initial addr: " << initial_addr;
308 initial_addrs_.insert(initial_addr);
309
310 }
311
312 }
313
connect_precheck(bool start_prim)314 void gcomm::GMCast::connect_precheck(bool start_prim)
315 {
316 if (!start_prim && initial_addrs_.empty()) {
317 gu_throw_fatal << "No address to connect";
318 }
319 }
320
connect()321 void gcomm::GMCast::connect()
322 {
323 pstack_.push_proto(this);
324 log_debug << "gmcast " << uuid() << " connect";
325
326 gu::URI listen_uri(listen_addr_);
327
328 set_tcp_defaults (&listen_uri);
329
330 listener_ = pnet().acceptor(listen_uri);
331 gu_trace (listener_->listen(listen_uri));
332
333 if (!mcast_addr_.empty())
334 {
335 gu::URI mcast_uri(
336 mcast_addr_ + '?'
337 + gcomm::Socket::OptIfAddr + '='
338 + gu::URI(listen_addr_).get_host()+'&'
339 + gcomm::Socket::OptNonBlocking + "=1&"
340 + gcomm::Socket::OptMcastTTL + '=' + gu::to_string(mcast_ttl_)
341 );
342
343 mcast_ = pnet().socket(mcast_uri);
344 gu_trace(mcast_->connect(mcast_uri));
345 }
346
347 if (!initial_addrs_.empty())
348 {
349 for (std::set<std::string>::const_iterator i(initial_addrs_.begin());
350 i != initial_addrs_.end(); ++i)
351 {
352 insert_address(*i, UUID(), pending_addrs_);
353 AddrList::iterator ai(pending_addrs_.find(*i));
354 AddrList::value(ai).set_max_retries(max_retry_cnt_);
355 gu_trace (gmcast_connect(*i));
356 }
357 }
358 }
359
360
connect(const gu::URI & uri)361 void gcomm::GMCast::connect(const gu::URI& uri)
362 {
363 set_initial_addr(uri);
364 connect();
365 }
366
367
368
close(bool force)369 void gcomm::GMCast::close(bool force)
370 {
371 log_debug << "gmcast " << uuid() << " close";
372 pstack_.pop_proto(this);
373 if (mcast_)
374 {
375 mcast_->close();
376 // delete mcast;
377 // mcast = 0;
378 }
379
380 gcomm_assert(listener_ != 0);
381 listener_->close();
382 listener_.reset();
383
384 segment_map_.clear();
385 for (ProtoMap::iterator
386 i = proto_map_->begin(); i != proto_map_->end(); ++i)
387 {
388 delete ProtoMap::value(i);
389 }
390
391 proto_map_->clear();
392 pending_addrs_.clear();
393 remote_addrs_.clear();
394 prim_view_reached_ = false;
395 }
396
397 //
398 // Private
399 //
400
401
402 // Find other local endpoint matching to proto
403 static const Proto*
find_other_local_endpoint(const gcomm::gmcast::ProtoMap & proto_map,const gcomm::gmcast::Proto * proto)404 find_other_local_endpoint(const gcomm::gmcast::ProtoMap& proto_map,
405 const gcomm::gmcast::Proto* proto)
406 {
407 for (gcomm::gmcast::ProtoMap::const_iterator i(proto_map.begin());
408 i != proto_map.end(); ++i)
409 {
410 if (i->second != proto &&
411 i->second->handshake_uuid() == proto->handshake_uuid())
412 {
413 return i->second;
414 }
415 }
416 return 0;
417 }
418
419 // Find other endpoint with same remote UUID
420 static const Proto*
find_other_endpoint_same_remote_uuid(const gcomm::gmcast::ProtoMap & proto_map,const gcomm::gmcast::Proto * proto)421 find_other_endpoint_same_remote_uuid(const gcomm::gmcast::ProtoMap& proto_map,
422 const gcomm::gmcast::Proto* proto)
423 {
424 for (gcomm::gmcast::ProtoMap::const_iterator i(proto_map.begin());
425 i != proto_map.end(); ++i)
426 {
427 if (i->second != proto &&
428 i->second->remote_uuid() == proto->remote_uuid())
429 {
430 return i->second;
431 }
432 }
433 return 0;
434 }
435
is_own(const gmcast::Proto * proto) const436 bool gcomm::GMCast::is_own(const gmcast::Proto* proto) const
437 {
438 assert(proto->remote_uuid() != gcomm::UUID::nil());
439 if (proto->remote_uuid() != uuid())
440 {
441 return false;
442 }
443 return find_other_local_endpoint(*proto_map_, proto);
444 }
445
blacklist(const gmcast::Proto * proto)446 void gcomm::GMCast::blacklist(const gmcast::Proto* proto)
447 {
448 initial_addrs_.erase(proto->remote_addr());
449 pending_addrs_.erase(proto->remote_addr());
450 addr_blacklist_.insert(std::make_pair(
451 proto->remote_addr(),
452 AddrEntry(gu::datetime::Date::monotonic(),
453 gu::datetime::Date::monotonic(),
454 proto->remote_uuid())));
455 }
456
is_not_own_and_duplicate_exists(const Proto * proto) const457 bool gcomm::GMCast::is_not_own_and_duplicate_exists(
458 const Proto* proto) const
459 {
460 assert(proto->remote_uuid() != gcomm::UUID::nil());
461 const Proto* other(find_other_local_endpoint(*proto_map_, proto));
462 if (!other)
463 {
464 // Not own
465 // Check if remote UUID matches to self
466 if (proto->remote_uuid() == uuid())
467 {
468 return true;
469 }
470 // Check if other proto entry with same remote
471 // UUID but different remote address exists.
472 other = find_other_endpoint_same_remote_uuid(*proto_map_, proto);
473 if (other && other->remote_addr() != proto->remote_addr())
474 {
475 return true;
476 }
477 }
478 return false;
479 }
480
481 // Erase proto entry in safe manner
482 // 1) Erase from relay_set_
483 // 2) Erase from proto_map_
484 // 3) Delete proto entry
erase_proto(gmcast::ProtoMap::iterator i)485 void gcomm::GMCast::erase_proto(gmcast::ProtoMap::iterator i)
486 {
487 Proto* p(ProtoMap::value(i));
488 RelayEntry e(p, p->socket().get());
489 RelaySet::iterator si(relay_set_.find(e));
490 if (si != relay_set_.end())
491 {
492 relay_set_.erase(si);
493 }
494 proto_map_->erase(i);
495 delete p;
496 }
497
gmcast_accept()498 void gcomm::GMCast::gmcast_accept()
499 {
500 SocketPtr tp;
501
502 try
503 {
504 tp = listener_->accept();
505 }
506 catch (gu::Exception& e)
507 {
508 log_warn << e.what();
509 return;
510 }
511
512 if (is_isolated(isolate_))
513 {
514 log_debug << "dropping accepted socket due to isolation";
515 tp->close();
516 return;
517 }
518
519 Proto* peer = new Proto (
520 *this,
521 version_,
522 tp,
523 listener_->listen_addr(),
524 "",
525 mcast_addr_,
526 segment_,
527 group_name_);
528 std::pair<ProtoMap::iterator, bool> ret =
529 proto_map_->insert(std::make_pair(tp->id(), peer));
530
531 if (ret.second == false)
532 {
533 delete peer;
534 gu_throw_fatal << "Failed to add peer to map";
535 }
536 if (tp->state() == Socket::S_CONNECTED)
537 {
538 peer->send_handshake();
539 }
540 else
541 {
542 log_debug << "accepted socket is connecting";
543 }
544 log_debug << "handshake sent";
545 }
546
547
gmcast_connect(const std::string & remote_addr)548 void gcomm::GMCast::gmcast_connect(const std::string& remote_addr)
549 {
550 if (remote_addr == listen_addr_) return;
551
552 gu::URI connect_uri(remote_addr);
553
554 set_tcp_defaults (&connect_uri);
555
556 if (!bind_ip_.empty())
557 {
558 connect_uri.set_option(gcomm::Socket::OptIfAddr, bind_ip_);
559 }
560
561 SocketPtr tp = pnet().socket(connect_uri);
562
563 try
564 {
565 tp->connect(connect_uri);
566 }
567 catch (gu::Exception& e)
568 {
569 log_debug << "Connect failed: " << e.what();
570 // delete tp;
571 return;
572 }
573
574 Proto* peer = new Proto (
575 *this,
576 version_,
577 tp,
578 listener_->listen_addr(),
579 remote_addr,
580 mcast_addr_,
581 segment_,
582 group_name_);
583
584 std::pair<ProtoMap::iterator, bool> ret =
585 proto_map_->insert(std::make_pair(tp->id(), peer));
586
587 if (ret.second == false)
588 {
589 delete peer;
590 gu_throw_fatal << "Failed to add peer to map";
591 }
592
593 ret.first->second->wait_handshake();
594 }
595
596
gmcast_forget(const UUID & uuid,const gu::datetime::Period & wait_period)597 void gcomm::GMCast::gmcast_forget(const UUID& uuid,
598 const gu::datetime::Period& wait_period)
599 {
600 /* Close all proto entries corresponding to uuid */
601
602 ProtoMap::iterator pi, pi_next;
603 for (pi = proto_map_->begin(); pi != proto_map_->end(); pi = pi_next)
604 {
605 pi_next = pi, ++pi_next;
606 Proto* rp = ProtoMap::value(pi);
607 if (rp->remote_uuid() == uuid)
608 {
609 erase_proto(pi);
610 }
611 }
612
613 /* Set all corresponding entries in address list to have retry cnt
614 * greater than max retries and next reconnect time after some period */
615 AddrList::iterator ai;
616 for (ai = remote_addrs_.begin(); ai != remote_addrs_.end(); ++ai)
617 {
618 AddrEntry& ae(AddrList::value(ai));
619 if (ae.uuid() == uuid)
620 {
621 log_info << "forgetting " << uuid
622 << " (" << AddrList::key(ai) << ")";
623
624 ProtoMap::iterator pi, pi_next;
625 for (pi = proto_map_->begin(); pi != proto_map_->end(); pi = pi_next)
626 {
627 pi_next = pi, ++pi_next;
628 if (ProtoMap::value(pi)->remote_addr() == AddrList::key(ai))
629 {
630 log_info << "deleting entry " << AddrList::key(ai);
631 erase_proto(pi);
632 }
633 }
634 ae.set_max_retries(0);
635 ae.set_retry_cnt(1);
636 gu::datetime::Date now(gu::datetime::Date::monotonic());
637 // Don't reduce next reconnect time if it is set greater than
638 // requested
639 if ((now + wait_period > ae.next_reconnect()) ||
640 (ae.next_reconnect() == gu::datetime::Date::max()))
641 {
642 ae.set_next_reconnect(gu::datetime::Date::monotonic() + wait_period);
643 }
644 else
645 {
646 log_debug << "not decreasing next reconnect for " << uuid;
647 }
648 }
649 }
650
651 /* Update state */
652 update_addresses();
653 }
654
handle_connected(Proto * rp)655 void gcomm::GMCast::handle_connected(Proto* rp)
656 {
657 const SocketPtr tp(rp->socket());
658 assert(tp->state() == Socket::S_CONNECTED);
659 log_debug << "transport " << tp << " connected";
660 if (rp->state() == Proto::S_INIT)
661 {
662 log_debug << "sending handshake";
663 // accepted socket was waiting for underlying transport
664 // handshake to finish
665 rp->send_handshake();
666 }
667 }
668
handle_established(Proto * est)669 void gcomm::GMCast::handle_established(Proto* est)
670 {
671 log_info << self_string() << " connection established to "
672 << est->remote_uuid() << " "
673 << est->remote_addr();
674 // UUID checks are handled during protocol handshake
675 assert(est->remote_uuid() != uuid());
676
677 if (is_evicted(est->remote_uuid()))
678 {
679 log_warn << "Closing connection to evicted node " << est->remote_uuid();
680 erase_proto(proto_map_->find_checked(est->socket()->id()));
681 update_addresses();
682 return;
683 }
684
685 // If address is found from pending_addrs_, move it to remote_addrs list
686 // and set retry cnt to -1
687 const std::string& remote_addr(est->remote_addr());
688 AddrList::iterator i(pending_addrs_.find(remote_addr));
689
690 if (i != pending_addrs_.end())
691 {
692 log_debug << "Erasing " << remote_addr << " from panding list";
693 pending_addrs_.erase(i);
694 }
695
696 if ((i = remote_addrs_.find(remote_addr)) == remote_addrs_.end())
697 {
698 log_debug << "Inserting " << remote_addr << " to remote list";
699
700 insert_address (remote_addr, est->remote_uuid(), remote_addrs_);
701 i = remote_addrs_.find(remote_addr);
702 }
703 else if (AddrList::value(i).uuid() != est->remote_uuid())
704 {
705 log_info << "remote endpoint " << est->remote_addr()
706 << " changed identity " << AddrList::value(i).uuid().full_str()
707 << " -> " << est->remote_uuid().full_str();
708 remote_addrs_.erase(i);
709 i = remote_addrs_.insert_unique(
710 make_pair(est->remote_addr(),
711 AddrEntry(gu::datetime::Date::monotonic(),
712 gu::datetime::Date::monotonic(),
713 est->remote_uuid())));
714 }
715
716 if (AddrList::value(i).retry_cnt() >
717 AddrList::value(i).max_retries())
718 {
719 log_warn << "discarding established (time wait) "
720 << est->remote_uuid()
721 << " (" << est->remote_addr() << ") ";
722 erase_proto(proto_map_->find(est->socket()->id()));
723 update_addresses();
724 return;
725 }
726
727 // send_up(Datagram(), p->remote_uuid());
728
729 // init retry cnt to -1 to avoid unnecessary logging at first attempt
730 // max retries will be readjusted in handle stable view
731 AddrList::value(i).set_retry_cnt(-1);
732 AddrList::value(i).set_max_retries(max_initial_reconnect_attempts_);
733
734 // Cleanup all previously established entries with same
735 // remote uuid. It is assumed that the most recent connection
736 // is usually the healthiest one.
737 ProtoMap::iterator j, j_next;
738 for (j = proto_map_->begin(); j != proto_map_->end(); j = j_next)
739 {
740 j_next = j, ++j_next;
741
742 Proto* p(ProtoMap::value(j));
743
744 if (p->remote_uuid() == est->remote_uuid())
745 {
746 if (p->handshake_uuid() < est->handshake_uuid())
747 {
748 log_debug << self_string()
749 << " cleaning up duplicate "
750 << p->socket()
751 << " after established "
752 << est->socket();
753 erase_proto(j);
754 }
755 else if (p->handshake_uuid() > est->handshake_uuid())
756 {
757 log_debug << self_string()
758 << " cleaning up established "
759 << est->socket()
760 << " which is duplicate of "
761 << p->socket();
762 erase_proto(proto_map_->find_checked(est->socket()->id()));
763 update_addresses();
764 return;
765 }
766 else
767 {
768 assert(p == est);
769 }
770 }
771 }
772
773 AddrList::iterator ali(find_if(remote_addrs_.begin(),
774 remote_addrs_.end(),
775 AddrListUUIDCmp(est->remote_uuid())));
776 if (ali != remote_addrs_.end())
777 {
778 AddrList::value(ali).set_last_connect();
779 }
780 else
781 {
782 log_warn << "peer " << est->remote_addr()
783 << " not found from remote addresses";
784 }
785
786 update_addresses();
787 }
788
handle_failed(Proto * failed)789 void gcomm::GMCast::handle_failed(Proto* failed)
790 {
791 log_debug << "handle failed: " << *failed;
792 const std::string& remote_addr = failed->remote_addr();
793
794 bool found_ok(false);
795 for (ProtoMap::const_iterator i = proto_map_->begin();
796 i != proto_map_->end(); ++i)
797 {
798 Proto* p(ProtoMap::value(i));
799 if (p != failed &&
800 p->state() <= Proto::S_OK &&
801 p->remote_addr() == failed->remote_addr())
802 {
803 log_debug << "found live " << *p;
804 found_ok = true;
805 break;
806 }
807 }
808
809 if (found_ok == false && remote_addr != "")
810 {
811 AddrList::iterator i;
812
813 if ((i = pending_addrs_.find(remote_addr)) != pending_addrs_.end() ||
814 (i = remote_addrs_.find(remote_addr)) != remote_addrs_.end())
815 {
816 AddrEntry& ae(AddrList::value(i));
817 ae.set_retry_cnt(ae.retry_cnt() + 1);
818
819 gu::datetime::Date rtime = gu::datetime::Date::monotonic() + gu::datetime::Period("PT1S");
820 log_debug << self_string()
821 << " setting next reconnect time to "
822 << rtime << " for " << remote_addr;
823 ae.set_next_reconnect(rtime);
824 }
825 }
826
827 erase_proto(proto_map_->find_checked(failed->socket()->id()));
828 update_addresses();
829 }
830
is_connected(const std::string & addr,const UUID & uuid) const831 bool gcomm::GMCast::is_connected(const std::string& addr, const UUID& uuid) const
832 {
833 for (ProtoMap::const_iterator i = proto_map_->begin();
834 i != proto_map_->end(); ++i)
835 {
836 Proto* conn = ProtoMap::value(i);
837
838 if (addr == conn->remote_addr() ||
839 uuid == conn->remote_uuid())
840 {
841 return true;
842 }
843 }
844
845 return false;
846 }
847
848
insert_address(const std::string & addr,const UUID & uuid,AddrList & alist)849 void gcomm::GMCast::insert_address (const std::string& addr,
850 const UUID& uuid,
851 AddrList& alist)
852 {
853 if (addr == listen_addr_)
854 {
855 gu_throw_fatal << "Trying to add self addr " << addr << " to addr list";
856 }
857
858 if (alist.insert(make_pair(addr,
859 AddrEntry(gu::datetime::Date::monotonic(),
860 gu::datetime::Date::monotonic(), uuid))).second == false)
861 {
862 log_warn << "Duplicate entry: " << addr;
863 }
864 else
865 {
866 log_debug << self_string() << ": new address entry " << uuid << ' '
867 << addr;
868 }
869 }
870
871
//
// Recompute the node's view of peers after any connection change:
//  1. Build link_map from established (S_OK) connections, inserting missing
//     remote address entries and dropping duplicate connections to the same
//     UUID.
//  2. Send the resulting topology to every established peer.
//  3. Learn addresses reported in peers' link maps (skipping blacklisted
//     ones and our own UUID) and schedule a first reconnect with a small
//     random delay.
//  4. Rebuild segment_map_ (the relay/multicast tree) and self_index_.
//
void gcomm::GMCast::update_addresses()
{
    LinkMap link_map;
    std::set<UUID> uuids;
    /* Add all established connections into link_map and update
     * list of remote addresses */

    ProtoMap::iterator i, i_next;
    for (i = proto_map_->begin(); i != proto_map_->end(); i = i_next)
    {
        // erase_proto() below may invalidate i; advance via i_next.
        i_next = i, ++i_next;

        Proto* rp = ProtoMap::value(i);

        if (rp->state() == Proto::S_OK)
        {
            // An established connection must know both remote addr and UUID.
            if (rp->remote_addr() == "" ||
                rp->remote_uuid() == UUID::nil())
            {
                gu_throw_fatal << "Protocol error: local: (" << my_uuid_
                               << ", '" << listen_addr_
                               << "'), remote: (" << rp->remote_uuid()
                               << ", '" << rp->remote_addr() << "')";
            }

            if (remote_addrs_.find(rp->remote_addr()) == remote_addrs_.end())
            {
                log_warn << "Connection exists but no addr on addr list for "
                         << rp->remote_addr();
                insert_address(rp->remote_addr(), rp->remote_uuid(),
                               remote_addrs_);
            }

            if (uuids.insert(rp->remote_uuid()).second == false)
            {
                // Duplicate entry, drop this one
                // @todo Deeper inspection about the connection states
                log_debug << self_string() << " dropping duplicate entry";
                erase_proto(i);
            }
            else
            {
                link_map.insert(Link(rp->remote_uuid(),
                                     rp->remote_addr(),
                                     rp->mcast_addr()));
            }
        }
    }

    /* Send topology change message containing only established
     * connections */
    for (ProtoMap::iterator i = proto_map_->begin(); i != proto_map_->end(); ++i)
    {
        Proto* gp = ProtoMap::value(i);

        // @todo: a lot of stuff here is done for each connection, including
        //        message creation and serialization. Need a mcast_msg() call
        //        and move this loop in there.
        if (gp->state() == Proto::S_OK)
            gp->send_topology_change(link_map);
    }

    /* Add entries reported by all other nodes to address list to
     * get complete view of existing uuids/addresses */
    for (ProtoMap::iterator i = proto_map_->begin(); i != proto_map_->end(); ++i)
    {
        Proto* rp = ProtoMap::value(i);

        if (rp->state() == Proto::S_OK)
        {
            for (LinkMap::const_iterator j = rp->link_map().begin();
                 j != rp->link_map().end(); ++j)
            {
                const UUID& link_uuid(LinkMap::key(j));
                const std::string& link_addr(LinkMap::value(j).addr());
                gcomm_assert(link_uuid != UUID::nil() && link_addr != "");

                if (addr_blacklist_.find(link_addr) != addr_blacklist_.end())
                {
                    log_debug << self_string()
                              << " address '" << link_addr
                              << "' pointing to uuid " << link_uuid
                              << " is blacklisted, skipping";
                    continue;
                }

                // Only learn addresses that are neither ours nor already
                // tracked in the remote or pending lists.
                if (link_uuid != uuid() &&
                    remote_addrs_.find(link_addr) == remote_addrs_.end() &&
                    pending_addrs_.find(link_addr) == pending_addrs_.end())
                {
                    log_debug << self_string()
                              << " conn refers to but no addr in addr list for "
                              << link_addr;
                    insert_address(link_addr, link_uuid, remote_addrs_);

                    AddrList::iterator pi(remote_addrs_.find(link_addr));

                    assert(pi != remote_addrs_.end());

                    AddrEntry& ae(AddrList::value(pi));

                    // init retry cnt to -1 to avoid unnecessary logging
                    // at first attempt
                    // max retries will be readjusted in handle stable view
                    ae.set_retry_cnt(-1);
                    ae.set_max_retries(max_initial_reconnect_attempts_);

                    // Add some randomness for first reconnect to avoid
                    // simultaneous connects
                    // NOTE(review): the delay is rand() % 100 ms; with a small
                    // RAND_MAX the effective spread is much narrower.
                    gu::datetime::Date rtime(gu::datetime::Date::monotonic());

                    rtime = rtime + ::rand() % (100*gu::datetime::MSec);
                    ae.set_next_reconnect(rtime);
                    next_check_ = std::min(next_check_, rtime);
                }
            }
        }
    }

    // Build multicast tree
    log_debug << self_string() << " --- mcast tree begin ---";
    segment_map_.clear();

    Segment& local_segment(segment_map_[segment_]);

    // The multicast socket, if any, is the first relay of the local segment
    // (null Proto pointer).
    if (mcast_)
    {
        log_debug << mcast_addr_;
        local_segment.push_back(RelayEntry(0, mcast_.get()));
    }

    // self_index_ counts established local-segment peers with a smaller UUID
    // than ours.
    self_index_ = 0;
    for (ProtoMap::const_iterator i(proto_map_->begin()); i != proto_map_->end();
         ++i)
    {
        Proto* p(ProtoMap::value(i));

        log_debug << "Proto: " << *p;

        if (p->remote_segment() == segment_)
        {
            // Local peers reachable by our multicast address are skipped.
            if (p->state() == Proto::S_OK &&
                (p->mcast_addr() == "" ||
                 p->mcast_addr() != mcast_addr_))
            {
                local_segment.push_back(RelayEntry(p, p->socket().get()));
                if (p->remote_uuid() < uuid())
                {
                    ++self_index_;
                }
            }
        }
        else
        {
            if (p->state() == Proto::S_OK)
            {
                Segment& remote_segment(segment_map_[p->remote_segment()]);
                remote_segment.push_back(RelayEntry(p, p->socket().get()));
            }
        }
    }
    log_debug << self_string() << " self index: " << self_index_;
    log_debug << self_string() << " --- mcast tree end ---";
}
1036
1037
reconnect()1038 void gcomm::GMCast::reconnect()
1039 {
1040 if (is_isolated(isolate_))
1041 {
1042 log_debug << "skipping reconnect due to isolation";
1043 return;
1044 }
1045
1046 /* Loop over known remote addresses and connect if proto entry
1047 * does not exist */
1048 gu::datetime::Date now = gu::datetime::Date::monotonic();
1049 AddrList::iterator i, i_next;
1050
1051 for (i = pending_addrs_.begin(); i != pending_addrs_.end(); i = i_next)
1052 {
1053 i_next = i, ++i_next;
1054
1055 const std::string& pending_addr(AddrList::key(i));
1056 const AddrEntry& ae(AddrList::value(i));
1057
1058 if (is_connected (pending_addr, UUID::nil()) == false &&
1059 ae.next_reconnect() <= now)
1060 {
1061 if (ae.retry_cnt() > ae.max_retries())
1062 {
1063 log_info << "cleaning up pending addr " << pending_addr;
1064 pending_addrs_.erase(i);
1065 continue; // no reference to pending_addr after this
1066 }
1067 else if (ae.next_reconnect() <= now)
1068 {
1069 log_debug << "connecting to pending " << pending_addr;
1070 gmcast_connect (pending_addr);
1071 }
1072 }
1073 }
1074
1075 for (i = remote_addrs_.begin(); i != remote_addrs_.end(); i = i_next)
1076 {
1077 i_next = i, ++i_next;
1078
1079 const std::string& remote_addr(AddrList::key(i));
1080 const AddrEntry& ae(AddrList::value(i));
1081 const UUID& remote_uuid(ae.uuid());
1082
1083 gcomm_assert(remote_uuid != uuid());
1084
1085 if (is_connected(remote_addr, remote_uuid) == false &&
1086 ae.next_reconnect() <= now)
1087 {
1088 if (ae.retry_cnt() > ae.max_retries())
1089 {
1090 log_info << " cleaning up " << remote_uuid << " ("
1091 << remote_addr << ")";
1092 remote_addrs_.erase(i);
1093 continue;//no reference to remote_addr or remote_uuid after this
1094 }
1095 else if (ae.next_reconnect() <= now)
1096 {
1097 if (ae.retry_cnt() % 30 == 0)
1098 {
1099 log_info << self_string() << " reconnecting to "
1100 << remote_uuid << " (" << remote_addr
1101 << "), attempt " << ae.retry_cnt();
1102 }
1103
1104 gmcast_connect(remote_addr);
1105 }
1106 else
1107 {
1108 //
1109 }
1110 }
1111 }
1112 }
1113
1114 namespace
1115 {
1116 class CmpUuidCounts
1117 {
1118 public:
CmpUuidCounts(const std::set<gcomm::UUID> & uuids,gcomm::SegmentId preferred_segment)1119 CmpUuidCounts(const std::set<gcomm::UUID>& uuids,
1120 gcomm::SegmentId preferred_segment)
1121 :
1122 uuids_(uuids),
1123 preferred_segment_(preferred_segment)
1124 { }
1125
count(const gcomm::gmcast::Proto * p) const1126 size_t count(const gcomm::gmcast::Proto* p) const
1127 {
1128 size_t cnt(0);
1129 for (std::set<gcomm::UUID>::const_iterator i(uuids_.begin());
1130 i != uuids_.end(); ++i)
1131 {
1132 for (gcomm::gmcast::LinkMap::const_iterator
1133 lm_i(p->link_map().begin());
1134 lm_i != p->link_map().end(); ++lm_i)
1135 {
1136 if (lm_i->uuid() == *i)
1137 {
1138 ++cnt;
1139 break;
1140 }
1141 }
1142 }
1143 return cnt;
1144 }
operator ()(const gcomm::gmcast::Proto * a,const gcomm::gmcast::Proto * b) const1145 bool operator()(const gcomm::gmcast::Proto* a,
1146 const gcomm::gmcast::Proto* b) const
1147 {
1148 size_t ac(count(a));
1149 size_t bc(count(b));
1150 // if counts are equal, prefer peer from the same segment
1151 return (ac < bc ||
1152 (ac == bc && a->remote_segment() != preferred_segment_));
1153 }
1154
1155 private:
1156 const std::set<gcomm::UUID>& uuids_;
1157 gcomm::SegmentId preferred_segment_;
1158 };
1159 }
1160
1161
check_liveness()1162 void gcomm::GMCast::check_liveness()
1163 {
1164 std::set<UUID> live_uuids;
1165
1166 // iterate over proto map and mark all timed out entries as failed
1167 gu::datetime::Date now(gu::datetime::Date::monotonic());
1168 for (ProtoMap::iterator i(proto_map_->begin()); i != proto_map_->end(); )
1169 {
1170 // Store next iterator into temporary, handle_failed() may remove
1171 // the entry proto_map_.
1172 ProtoMap::iterator i_next(i);
1173 ++i_next;
1174 Proto* p(ProtoMap::value(i));
1175 if (p->state() > Proto::S_INIT &&
1176 p->state() < Proto::S_FAILED &&
1177 p->recv_tstamp() + peer_timeout_ < now)
1178 {
1179 gcomm::SocketStats stats(p->socket()->stats());
1180 log_info << self_string()
1181 << " connection to peer "
1182 << p->remote_uuid() << " with addr "
1183 << p->remote_addr()
1184 << " timed out, no messages seen in " << peer_timeout_
1185 << ", socket stats: "
1186 << stats;
1187 p->set_state(Proto::S_FAILED);
1188 handle_failed(p);
1189 }
1190 else if (p->state() == Proto::S_OK)
1191 {
1192 gcomm::SocketStats stats(p->socket()->stats());
1193 if (stats.send_queue_length >= 1024)
1194 {
1195 log_debug << self_string()
1196 << " socket send queue to "
1197 << " peer "
1198 << p->remote_uuid() << " with addr "
1199 << p->remote_addr()
1200 << ", socket stats: "
1201 << stats;
1202 }
1203 if ((p->recv_tstamp() + peer_timeout_*2/3 < now) ||
1204 (p->send_tstamp() + peer_timeout_*1/3 < now))
1205 {
1206 p->send_keepalive();
1207 }
1208
1209 if (p->state() == Proto::S_FAILED)
1210 {
1211 handle_failed(p);
1212 }
1213 else
1214 {
1215 live_uuids.insert(p->remote_uuid());
1216 }
1217 }
1218 i = i_next;
1219 }
1220
1221 bool should_relay(false);
1222
1223 // iterate over addr list and check if there is at least one live
1224 // proto entry associated to each addr entry
1225
1226 std::set<UUID> nonlive_uuids;
1227 std::string nonlive_peers;
1228 for (AddrList::const_iterator i(remote_addrs_.begin());
1229 i != remote_addrs_.end(); ++i)
1230 {
1231 const AddrEntry& ae(AddrList::value(i));
1232 if (ae.retry_cnt() <= ae.max_retries() &&
1233 live_uuids.find(ae.uuid()) == live_uuids.end())
1234 {
1235 // log_info << self_string()
1236 // << " missing live proto entry for " << ae.uuid();
1237 nonlive_uuids.insert(ae.uuid());
1238 nonlive_peers += AddrList::key(i) + " ";
1239 should_relay = true;
1240 }
1241 else if (ae.last_connect() + peer_timeout_ > now)
1242 {
1243 log_debug << "continuing relaying for "
1244 << (ae.last_connect() + peer_timeout_ - now);
1245 should_relay = true;
1246 }
1247 }
1248
1249 if (should_relay == true)
1250 {
1251 if (relaying_ == false)
1252 {
1253 log_info << self_string()
1254 << " turning message relay requesting on, nonlive peers: "
1255 << nonlive_peers;
1256 relaying_ = true;
1257 }
1258 relay_set_.clear();
1259 // build set of protos having OK status
1260 std::set<Proto*> proto_set;
1261 for (ProtoMap::iterator i(proto_map_->begin()); i != proto_map_->end();
1262 ++i)
1263 {
1264 Proto* p(ProtoMap::value(i));
1265 if (p->state() == Proto::S_OK)
1266 {
1267 proto_set.insert(p);
1268 }
1269 }
1270 // find minimal set of proto entries required to reach maximum set
1271 // of nonlive peers
1272 while (nonlive_uuids.empty() == false &&
1273 proto_set.empty() == false)
1274 {
1275 std::set<Proto*>::iterator maxel(
1276 std::max_element(proto_set.begin(),
1277 proto_set.end(), CmpUuidCounts(nonlive_uuids, segment_)));
1278 Proto* p(*maxel);
1279 log_debug << "relay set maxel :" << *p << " count: "
1280 << CmpUuidCounts(nonlive_uuids, segment_).count(p);
1281
1282 relay_set_.insert(RelayEntry(p, p->socket().get()));
1283 const LinkMap& lm(p->link_map());
1284 for (LinkMap::const_iterator lm_i(lm.begin()); lm_i != lm.end();
1285 ++lm_i)
1286 {
1287 nonlive_uuids.erase((*lm_i).uuid());
1288 }
1289 proto_set.erase(maxel);
1290 }
1291 }
1292 else if (relaying_ == true && should_relay == false)
1293 {
1294 log_info << self_string() << " turning message relay requesting off";
1295 relay_set_.clear();
1296 relaying_ = false;
1297 }
1298 }
1299
1300
handle_timers()1301 gu::datetime::Date gcomm::GMCast::handle_timers()
1302 {
1303 const gu::datetime::Date now(gu::datetime::Date::monotonic());
1304
1305 if (now >= next_check_)
1306 {
1307 check_liveness();
1308 reconnect();
1309 next_check_ = now + check_period_;
1310 }
1311
1312 return next_check_;
1313 }
1314
1315
send(const RelayEntry & re,int segment,gcomm::Datagram & dg)1316 void gcomm::GMCast::send(const RelayEntry& re, int segment, gcomm::Datagram& dg)
1317 {
1318 int err;
1319 if ((err = re.socket->send(segment, dg)) != 0)
1320 {
1321 log_debug << "failed to send to " << re.socket->remote_addr()
1322 << ": (" << err << ") " << strerror(err);
1323 }
1324 else if (re.proto)
1325 {
1326 re.proto->set_send_tstamp(gu::datetime::Date::monotonic());
1327 }
1328 }
1329
relay(const Message & msg,const Datagram & dg,const void * exclude_id)1330 void gcomm::GMCast::relay(const Message& msg,
1331 const Datagram& dg,
1332 const void* exclude_id)
1333 {
1334 Datagram relay_dg(dg);
1335 relay_dg.normalize();
1336 Message relay_msg(msg);
1337
1338 // reset all relay flags from message to be relayed
1339 relay_msg.set_flags(relay_msg.flags() &
1340 ~(Message::F_RELAY | Message::F_SEGMENT_RELAY));
1341
1342 // if F_RELAY is set in received message, relay to all peers except
1343 // the originator
1344 if (msg.flags() & Message::F_RELAY)
1345 {
1346 gu_trace(push_header(relay_msg, relay_dg));
1347 for (SegmentMap::iterator segment_i(segment_map_.begin());
1348 segment_i != segment_map_.end(); ++segment_i)
1349 {
1350 Segment& segment(segment_i->second);
1351 for (Segment::iterator target_i(segment.begin());
1352 target_i != segment.end(); ++target_i)
1353 {
1354 if ((*target_i).socket->id() != exclude_id)
1355 {
1356 send(*target_i, msg.segment_id(), relay_dg);
1357 }
1358 }
1359 }
1360 }
1361 else if (msg.flags() & Message::F_SEGMENT_RELAY)
1362 {
1363 if (relay_set_.empty() == false)
1364 {
1365 // send message to all nodes in relay set to reach
1366 // nodes in local segment that are not directly reachable
1367 relay_msg.set_flags(relay_msg.flags() | Message::F_RELAY);
1368 gu_trace(push_header(relay_msg, relay_dg));
1369 for (RelaySet::iterator relay_i(relay_set_.begin());
1370 relay_i != relay_set_.end(); ++relay_i)
1371 {
1372 if ((*relay_i).socket->id() != exclude_id)
1373 {
1374 send(*relay_i, msg.segment_id(), relay_dg);
1375 }
1376 }
1377 gu_trace(pop_header(relay_msg, relay_dg));
1378 relay_msg.set_flags(relay_msg.flags() & ~Message::F_RELAY);
1379 }
1380
1381 if (msg.segment_id() == segment_)
1382 {
1383 log_warn << "message with F_SEGMENT_RELAY from own segment, "
1384 << "source " << msg.source_uuid();
1385 }
1386
1387 // Relay to local segment
1388 gu_trace(push_header(relay_msg, relay_dg));
1389 Segment& segment(segment_map_[segment_]);
1390 for (Segment::iterator i(segment.begin()); i != segment.end(); ++i)
1391 {
1392 send(*i, msg.segment_id(), relay_dg);
1393 }
1394 }
1395 else
1396 {
1397 log_warn << "GMCast::relay() called without relay flags set";
1398 }
1399 }
1400
handle_up(const void * id,const Datagram & dg,const ProtoUpMeta & um)1401 void gcomm::GMCast::handle_up(const void* id,
1402 const Datagram& dg,
1403 const ProtoUpMeta& um)
1404 {
1405 ProtoMap::iterator i;
1406
1407 if (listener_ == 0) { return; }
1408
1409 if (id == listener_->id())
1410 {
1411 gmcast_accept();
1412 }
1413 else if (mcast_ && id == mcast_->id())
1414 {
1415 Message msg;
1416
1417 try
1418 {
1419 if (dg.offset() < dg.header_len())
1420 {
1421 gu_trace(msg.unserialize(dg.header(), dg.header_size(),
1422 dg.header_offset() +
1423 dg.offset()));
1424 }
1425 else
1426 {
1427 gu_trace(msg.unserialize(dg.payload().data(),
1428 dg.len(),
1429 dg.offset()));
1430 }
1431 }
1432 catch (gu::Exception& e)
1433 {
1434 GU_TRACE(e);
1435 log_warn << e.what();
1436 return;
1437 }
1438
1439 if (msg.type() >= Message::GMCAST_T_USER_BASE)
1440 {
1441 gu_trace(send_up(Datagram(dg, dg.offset() + msg.serial_size()),
1442 ProtoUpMeta(msg.source_uuid())));
1443 }
1444 else
1445 {
1446 log_warn << "non-user message " << msg.type()
1447 << " from multicast socket";
1448 }
1449 }
1450 else if ((i = proto_map_->find(id)) != proto_map_->end())
1451 {
1452 Proto* p(ProtoMap::value(i));
1453
1454 if (dg.len() > 0)
1455 {
1456 const Proto::State prev_state(p->state());
1457
1458 if (prev_state == Proto::S_FAILED)
1459 {
1460 log_warn << "unhandled failed proto";
1461 handle_failed(p);
1462 return;
1463 }
1464
1465 Message msg;
1466
1467 try
1468 {
1469 msg.unserialize(dg.payload().data(), dg.len(),
1470 dg.offset());
1471 }
1472 catch (gu::Exception& e)
1473 {
1474 GU_TRACE(e);
1475 log_warn << e.what();
1476 p->set_state(Proto::S_FAILED);
1477 handle_failed(p);
1478 return;
1479 }
1480
1481 if (msg.type() >= Message::GMCAST_T_USER_BASE)
1482 {
1483 if (evict_list().empty() == false &&
1484 evict_list().find(msg.source_uuid()) != evict_list().end())
1485 {
1486 return;
1487 }
1488 if (msg.flags() &
1489 (Message::F_RELAY | Message::F_SEGMENT_RELAY))
1490 {
1491 relay(msg,
1492 Datagram(dg, dg.offset() + msg.serial_size()),
1493 id);
1494 }
1495 p->set_recv_tstamp(gu::datetime::Date::monotonic());
1496 send_up(Datagram(dg, dg.offset() + msg.serial_size()),
1497 ProtoUpMeta(msg.source_uuid()));
1498 return;
1499 }
1500 else
1501 {
1502 try
1503 {
1504 p->set_recv_tstamp(gu::datetime::Date::monotonic());
1505 gu_trace(p->handle_message(msg));
1506 }
1507 catch (const gu::Exception& e)
1508 {
1509 handle_failed(p);
1510 if (e.get_errno() == ENOTRECOVERABLE)
1511 {
1512 throw;
1513 }
1514 log_warn
1515 << "handling gmcast protocol message failed: "
1516 << e.what();
1517 return;
1518 }
1519
1520 if (p->state() == Proto::S_FAILED)
1521 {
1522 handle_failed(p);
1523 return;
1524 }
1525 else if (p->check_changed_and_reset() == true)
1526 {
1527 update_addresses();
1528 check_liveness();
1529 reconnect();
1530 }
1531 }
1532
1533 if (prev_state != Proto::S_OK && p->state() == Proto::S_OK)
1534 {
1535 handle_established(p);
1536 }
1537 }
1538 else if (p->socket()->state() == Socket::S_CONNECTED &&
1539 (p->state() == Proto::S_HANDSHAKE_WAIT ||
1540 p->state() == Proto::S_INIT))
1541 {
1542 handle_connected(p);
1543 }
1544 else if (p->socket()->state() == Socket::S_CONNECTED)
1545 {
1546 log_warn << "connection " << p->socket()->id()
1547 << " closed by peer";
1548 p->set_state(Proto::S_FAILED);
1549 handle_failed(p);
1550 }
1551 else
1552 {
1553 log_debug << "socket in state " << p->socket()->state();
1554 p->set_state(Proto::S_FAILED);
1555 handle_failed(p);
1556 }
1557 }
1558 else
1559 {
1560 // log_info << "proto entry " << id << " not found";
1561 }
1562 }
1563
/*
 * Linear search of proto_map for the first proto whose remote UUID equals
 * `uuid`. Returns a null pointer when no entry matches.
 */
static gcomm::gmcast::Proto* find_by_remote_uuid(
    const gcomm::gmcast::ProtoMap& proto_map,
    const gcomm::UUID& uuid)
{
    gcomm::gmcast::Proto* found(0);
    gcomm::gmcast::ProtoMap::const_iterator it(proto_map.begin());
    while (found == 0 && it != proto_map.end())
    {
        if (it->second->remote_uuid() == uuid)
        {
            found = it->second;
        }
        ++it;
    }
    return found;
}
1578
handle_down(Datagram & dg,const ProtoDownMeta & dm)1579 int gcomm::GMCast::handle_down(Datagram& dg, const ProtoDownMeta& dm)
1580 {
1581 Message msg(version_, Message::GMCAST_T_USER_BASE, uuid(), 1, segment_);
1582
1583 // If target is set and proto entry for target is found,
1584 // send a direct message. Otherwise fall back for broadcast
1585 // to ensure message delivery via relay
1586 if (dm.target() != UUID::nil())
1587 {
1588 Proto* target_proto(find_by_remote_uuid(*proto_map_, dm.target()));
1589 if (target_proto && target_proto->state() == Proto::S_OK)
1590 {
1591 gu_trace(push_header(msg, dg));
1592 int err;
1593 if ((err = target_proto->socket()->send(msg.segment_id(), dg)) != 0)
1594 {
1595 log_debug << "failed to send to "
1596 << target_proto->socket()->remote_addr()
1597 << ": (" << err << ") " << strerror(err);
1598 }
1599 else
1600 {
1601 target_proto->set_send_tstamp(gu::datetime::Date::monotonic());
1602 }
1603 gu_trace(pop_header(msg, dg));
1604 if (err == 0)
1605 {
1606 return 0;
1607 }
1608 // In case of error fall back to broadcast
1609 }
1610 else
1611 {
1612 log_debug << "Target " << dm.target() << " proto not found";
1613 }
1614 }
1615
1616 // handle relay set first, skip these peers below
1617 if (relay_set_.empty() == false)
1618 {
1619 msg.set_flags(msg.flags() | Message::F_RELAY);
1620 gu_trace(push_header(msg, dg));
1621 for (RelaySet::iterator ri(relay_set_.begin());
1622 ri != relay_set_.end(); ++ri)
1623 {
1624 send(*ri, msg.segment_id(), dg);
1625 }
1626 gu_trace(pop_header(msg, dg));
1627 msg.set_flags(msg.flags() & ~Message::F_RELAY);
1628 }
1629
1630 for (SegmentMap::iterator si(segment_map_.begin());
1631 si != segment_map_.end(); ++si)
1632 {
1633 uint8_t segment_id(si->first);
1634 Segment& segment(si->second);
1635
1636 if (segment_id != segment_)
1637 {
1638 size_t target_idx((self_index_ + segment_id) % segment.size());
1639 msg.set_flags(msg.flags() | Message::F_SEGMENT_RELAY);
1640 // skip peers that are in relay set
1641 if (relay_set_.empty() == true ||
1642 relay_set_.find(segment[target_idx]) == relay_set_.end())
1643 {
1644 gu_trace(push_header(msg, dg));
1645 send(segment[target_idx], msg.segment_id(), dg);
1646 gu_trace(pop_header(msg, dg));
1647 }
1648 }
1649 else
1650 {
1651 msg.set_flags(msg.flags() & ~Message::F_SEGMENT_RELAY);
1652 gu_trace(push_header(msg, dg));
1653 for (Segment::iterator i(segment.begin());
1654 i != segment.end(); ++i)
1655 {
1656 // skip peers that are in relay set
1657 if (relay_set_.empty() == true ||
1658 relay_set_.find(*i) == relay_set_.end())
1659 {
1660 send(*i, msg.segment_id(), dg);
1661 }
1662 }
1663 gu_trace(pop_header(msg, dg));
1664 }
1665 }
1666
1667 return 0;
1668 }
1669
handle_stable_view(const View & view)1670 void gcomm::GMCast::handle_stable_view(const View& view)
1671 {
1672 log_debug << "GMCast::handle_stable_view: " << view;
1673 if (view.type() == V_PRIM)
1674 {
1675 // discard addr list entries not in view
1676 std::set<UUID> gmcast_lst;
1677 for (AddrList::const_iterator i(remote_addrs_.begin());
1678 i != remote_addrs_.end(); ++i)
1679 {
1680 gmcast_lst.insert(i->second.uuid());
1681 }
1682 std::set<UUID> view_lst;
1683 for (NodeList::const_iterator i(view.members().begin());
1684 i != view.members().end(); ++i)
1685 {
1686 view_lst.insert(i->first);
1687 }
1688 std::list<UUID> diff;
1689 std::set_difference(gmcast_lst.begin(),
1690 gmcast_lst.end(),
1691 view_lst.begin(),
1692 view_lst.end(),
1693 std::back_inserter(diff));
1694
1695 // Forget partitioned entries, allow them to reconnect
1696 // in time_wait_/2. Left nodes are given time_wait_ ban for
1697 // reconnecting when handling V_REG below.
1698 for (std::list<UUID>::const_iterator i(diff.begin());
1699 i != diff.end(); ++i)
1700 {
1701 gmcast_forget(*i, time_wait_/2);
1702 }
1703
1704 // mark nodes in view as stable
1705 for (std::set<UUID>::const_iterator i(view_lst.begin());
1706 i != view_lst.end(); ++i)
1707 {
1708 AddrList::iterator ai;
1709 if ((ai = find_if(remote_addrs_.begin(), remote_addrs_.end(),
1710 AddrListUUIDCmp(*i))) != remote_addrs_.end())
1711 {
1712 ai->second.set_retry_cnt(-1);
1713 ai->second.set_max_retries(max_retry_cnt_);
1714 }
1715 }
1716
1717 // iterate over pending address list and discard entries without UUID
1718 for (AddrList::iterator i(pending_addrs_.begin());
1719 i != pending_addrs_.end(); )
1720 {
1721 AddrList::iterator i_next(i);
1722 ++i_next;
1723 const AddrEntry& ae(AddrList::value(i));
1724 if (ae.uuid() == UUID())
1725 {
1726 const std::string addr(AddrList::key(i));
1727 log_info << "discarding pending addr without UUID: "
1728 << addr;
1729 for (ProtoMap::iterator pi(proto_map_->begin());
1730 pi != proto_map_->end();)
1731 {
1732 ProtoMap::iterator pi_next(pi);
1733 ++pi_next;
1734 Proto* p(ProtoMap::value(pi));
1735 if (p->remote_addr() == addr)
1736 {
1737 log_info << "discarding pending addr proto entry " << p;
1738 erase_proto(pi);
1739 }
1740 pi = pi_next;
1741 }
1742 pending_addrs_.erase(i);
1743 }
1744 i = i_next;
1745 }
1746 prim_view_reached_ = true;
1747 }
1748 else if (view.type() == V_REG)
1749 {
1750 for (NodeList::const_iterator i(view.members().begin());
1751 i != view.members().end(); ++i)
1752 {
1753 AddrList::iterator ai;
1754 if ((ai = find_if(remote_addrs_.begin(), remote_addrs_.end(),
1755 AddrListUUIDCmp(NodeList::key(i))))
1756 != remote_addrs_.end())
1757 {
1758 log_info << "declaring " << NodeList::key(i)
1759 << " at " << handle_get_address(NodeList::key(i))
1760 << " stable";
1761 ai->second.set_retry_cnt(-1);
1762 ai->second.set_max_retries(max_retry_cnt_);
1763 }
1764 }
1765
1766 // Forget left nodes
1767 for (NodeList::const_iterator i(view.left().begin());
1768 i != view.left().end(); ++i)
1769 {
1770 gmcast_forget(NodeList::key(i), time_wait_);
1771 }
1772 }
1773 check_liveness();
1774
1775 for (ProtoMap::const_iterator i(proto_map_->begin()); i != proto_map_->end();
1776 ++i)
1777 {
1778 log_debug << "proto: " << *ProtoMap::value(i);
1779 }
1780 }
1781
1782
handle_evict(const UUID & uuid)1783 void gcomm::GMCast::handle_evict(const UUID& uuid)
1784 {
1785 if (is_evicted(uuid) == true)
1786 {
1787 return;
1788 }
1789 gmcast_forget(uuid, time_wait_);
1790 }
1791
1792
handle_get_address(const UUID & uuid) const1793 std::string gcomm::GMCast::handle_get_address(const UUID& uuid) const
1794 {
1795 AddrList::const_iterator ali(
1796 find_if(remote_addrs_.begin(),
1797 remote_addrs_.end(),
1798 AddrListUUIDCmp(uuid)));
1799 return (ali == remote_addrs_.end() ? "" : AddrList::key(ali));
1800 }
1801
add_or_del_addr(const std::string & val)1802 void gcomm::GMCast::add_or_del_addr(const std::string& val)
1803 {
1804 if (val.compare(0, 4, "add:") == 0)
1805 {
1806 gu::URI uri(val.substr(4));
1807 std::string addr(gu::net::resolve(uri_string(get_scheme(use_ssl_, dynamic_socket_),
1808 uri.get_host(),
1809 uri.get_port())).to_string());
1810 log_info << "inserting address '" << addr << "'";
1811 insert_address(addr, UUID(), remote_addrs_);
1812 AddrList::iterator ai(remote_addrs_.find(addr));
1813 AddrList::value(ai).set_max_retries(
1814 max_initial_reconnect_attempts_);
1815 AddrList::value(ai).set_retry_cnt(-1);
1816 }
1817 else if (val.compare(0, 4, "del:") == 0)
1818 {
1819 std::string addr(val.substr(4));
1820 AddrList::iterator ai(remote_addrs_.find(addr));
1821 if (ai != remote_addrs_.end())
1822 {
1823 ProtoMap::iterator pi, pi_next;
1824 for (pi = proto_map_->begin(); pi != proto_map_->end(); pi = pi_next)
1825 {
1826 pi_next = pi, ++pi_next;
1827 Proto* rp = ProtoMap::value(pi);
1828 if (rp->remote_addr() == AddrList::key(ai))
1829 {
1830 log_info << "deleting entry " << AddrList::key(ai);
1831 erase_proto(pi);
1832 }
1833 }
1834 AddrEntry& ae(AddrList::value(ai));
1835 ae.set_max_retries(0);
1836 ae.set_retry_cnt(1);
1837 ae.set_next_reconnect(gu::datetime::Date::monotonic() + time_wait_);
1838 update_addresses();
1839 }
1840 else
1841 {
1842 log_info << "address '" << addr
1843 << "' not found from remote addrs list";
1844 }
1845 }
1846 else
1847 {
1848 gu_throw_error(EINVAL) << "invalid addr spec '" << val << "'";
1849 }
1850 }
1851
1852
set_param(const std::string & key,const std::string & val,Protolay::sync_param_cb_t & sync_param_cb)1853 bool gcomm::GMCast::set_param(const std::string& key, const std::string& val,
1854 Protolay::sync_param_cb_t& sync_param_cb)
1855 {
1856 try
1857 {
1858 if (key == Conf::GMCastMaxInitialReconnectAttempts)
1859 {
1860 max_initial_reconnect_attempts_ = gu::from_string<int>(val);
1861 return true;
1862 }
1863 else if (key == Conf::GMCastPeerAddr)
1864 {
1865 try
1866 {
1867 add_or_del_addr(val);
1868 }
1869 catch (gu::NotFound& nf)
1870 {
1871 gu_throw_error(EINVAL) << "invalid addr spec '" << val << "'";
1872 }
1873 catch (gu::NotSet& ns)
1874 {
1875 gu_throw_error(EINVAL) << "invalid addr spec '" << val << "'";
1876 }
1877 return true;
1878 }
1879 else if (key == Conf::GMCastIsolate)
1880 {
1881 int tmpval = gu::from_string<int>(val);
1882 if (tmpval < 0 || tmpval > 2)
1883 {
1884 gu_throw_error(EINVAL)
1885 << "invalid value for gmacst.isolate: '"
1886 << tmpval << "'";
1887 }
1888 isolate_ = tmpval;
1889 log_info << "turning isolation "
1890 << (isolate_ == 1 ? "on" :
1891 (isolate_ == 2 ? "force quit" : "off"));
1892 if (isolate_)
1893 {
1894 // delete all entries in proto map
1895 ProtoMap::iterator pi, pi_next;
1896 for (pi = proto_map_->begin(); pi != proto_map_->end();
1897 pi = pi_next)
1898 {
1899 pi_next = pi, ++pi_next;
1900 erase_proto(pi);
1901 }
1902 segment_map_.clear();
1903 }
1904 return true;
1905 }
1906 else if (key == Conf::SocketRecvBufSize)
1907 {
1908 gu_trace(Conf::check_recv_buf_size(val));
1909 conf_.set(key, val);
1910
1911 for (ProtoMap::iterator pi(proto_map_->begin());
1912 pi != proto_map_->end(); ++pi)
1913 {
1914 gu_trace(pi->second->socket()->set_option(key, val));
1915 // erase_proto(pi++);
1916 }
1917 // segment_map_.clear();
1918 // reconnect();
1919 return true;
1920 }
1921 else if (key == Conf::GMCastGroup ||
1922 key == Conf::GMCastListenAddr ||
1923 key == Conf::GMCastMCastAddr ||
1924 key == Conf::GMCastMCastPort ||
1925 key == Conf::GMCastMCastTTL ||
1926 key == Conf::GMCastTimeWait ||
1927 key == Conf::GMCastPeerTimeout ||
1928 key == Conf::GMCastSegment)
1929 {
1930 gu_throw_error(EPERM) << "can't change value during runtime";
1931 }
1932 }
1933 catch (gu::Exception& e)
1934 {
1935 GU_TRACE(e); throw;
1936 }
1937 catch (std::exception& e)
1938 {
1939 gu_throw_error(EINVAL) << e.what();
1940 }
1941 catch (...)
1942 {
1943 gu_throw_error(EINVAL) << "exception";
1944 }
1945
1946 return false;
1947 }
1948