1 /*
2   Copyright (c) DataStax, Inc.
3 
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7 
8   http://www.apache.org/licenses/LICENSE-2.0
9 
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15 */
16 
17 #include "cluster.hpp"
18 
19 #include "constants.hpp"
20 #include "dc_aware_policy.hpp"
21 #include "external.hpp"
22 #include "logger.hpp"
23 #include "resolver.hpp"
24 #include "round_robin_policy.hpp"
25 #include "speculative_execution.hpp"
26 #include "utils.hpp"
27 
28 using namespace datastax;
29 using namespace datastax::internal::core;
30 
31 namespace datastax { namespace internal { namespace core {
32 
33 /**
34  * A task for initiating the cluster close process.
35  */
36 class ClusterRunClose : public Task {
37 public:
ClusterRunClose(const Cluster::Ptr & cluster)38   ClusterRunClose(const Cluster::Ptr& cluster)
39       : cluster_(cluster) {}
40 
run(EventLoop * event_loop)41   void run(EventLoop* event_loop) { cluster_->internal_close(); }
42 
43 private:
44   Cluster::Ptr cluster_;
45 };
46 
47 /**
48  * A task for marking a node as UP.
49  */
50 class ClusterNotifyUp : public Task {
51 public:
ClusterNotifyUp(const Cluster::Ptr & cluster,const Address & address)52   ClusterNotifyUp(const Cluster::Ptr& cluster, const Address& address)
53       : cluster_(cluster)
54       , address_(address) {}
55 
run(EventLoop * event_loop)56   void run(EventLoop* event_loop) { cluster_->internal_notify_host_up(address_); }
57 
58 private:
59   Cluster::Ptr cluster_;
60   Address address_;
61 };
62 
63 /**
64  * A task for marking a node as DOWN.
65  */
66 class ClusterNotifyDown : public Task {
67 public:
ClusterNotifyDown(const Cluster::Ptr & cluster,const Address & address)68   ClusterNotifyDown(const Cluster::Ptr& cluster, const Address& address)
69       : cluster_(cluster)
70       , address_(address) {}
71 
run(EventLoop * event_loop)72   void run(EventLoop* event_loop) { cluster_->internal_notify_host_down(address_); }
73 
74 private:
75   Cluster::Ptr cluster_;
76   Address address_;
77 };
78 
79 class ClusterStartEvents : public Task {
80 public:
ClusterStartEvents(const Cluster::Ptr & cluster)81   ClusterStartEvents(const Cluster::Ptr& cluster)
82       : cluster_(cluster) {}
83 
run(EventLoop * event_loop)84   void run(EventLoop* event_loop) { cluster_->internal_start_events(); }
85 
86 private:
87   Cluster::Ptr cluster_;
88 };
89 
90 class ClusterStartClientMonitor : public Task {
91 public:
ClusterStartClientMonitor(const Cluster::Ptr & cluster,const String & client_id,const String & session_id,const Config & config)92   ClusterStartClientMonitor(const Cluster::Ptr& cluster, const String& client_id,
93                             const String& session_id, const Config& config)
94       : cluster_(cluster)
95       , client_id_(client_id)
96       , session_id_(session_id)
97       , config_(config) {}
98 
run(EventLoop * event_loop)99   void run(EventLoop* event_loop) {
100     cluster_->internal_start_monitor_reporting(client_id_, session_id_, config_);
101   }
102 
103 private:
104   Cluster::Ptr cluster_;
105   String client_id_;
106   String session_id_;
107   Config config_;
108 };
109 
110 /**
111  * A no operation cluster listener. This is used when a listener is not set.
112  */
113 class NopClusterListener : public ClusterListener {
114 public:
on_connect(Cluster * cluster)115   virtual void on_connect(Cluster* cluster) {}
116 
on_host_up(const Host::Ptr & host)117   virtual void on_host_up(const Host::Ptr& host) {}
on_host_down(const Host::Ptr & host)118   virtual void on_host_down(const Host::Ptr& host) {}
119 
on_host_added(const Host::Ptr & host)120   virtual void on_host_added(const Host::Ptr& host) {}
on_host_removed(const Host::Ptr & host)121   virtual void on_host_removed(const Host::Ptr& host) {}
122 
on_token_map_updated(const TokenMap::Ptr & token_map)123   virtual void on_token_map_updated(const TokenMap::Ptr& token_map) {}
124 
on_close(Cluster * cluster)125   virtual void on_close(Cluster* cluster) {}
126 };
127 
128 }}} // namespace datastax::internal::core
129 
process_event(const ClusterEvent & event,ClusterListener * listener)130 void ClusterEvent::process_event(const ClusterEvent& event, ClusterListener* listener) {
131   switch (event.type) {
132     case HOST_UP:
133       listener->on_host_up(event.host);
134       break;
135     case HOST_DOWN:
136       listener->on_host_down(event.host);
137       break;
138     case HOST_ADD:
139       listener->on_host_added(event.host);
140       break;
141     case HOST_REMOVE:
142       listener->on_host_removed(event.host);
143       break;
144     case HOST_MAYBE_UP:
145       listener->on_host_maybe_up(event.host);
146       break;
147     case HOST_READY:
148       listener->on_host_ready(event.host);
149       break;
150     case TOKEN_MAP_UPDATE:
151       listener->on_token_map_updated(event.token_map);
152       break;
153   }
154 }
155 
process_events(const ClusterEvent::Vec & events,ClusterListener * listener)156 void ClusterEvent::process_events(const ClusterEvent::Vec& events, ClusterListener* listener) {
157   for (ClusterEvent::Vec::const_iterator it = events.begin(), end = events.end(); it != end; ++it) {
158     process_event(*it, listener);
159   }
160 }
161 
162 static NopClusterListener nop_cluster_listener__;
163 
LockedHostMap(const HostMap & hosts)164 LockedHostMap::LockedHostMap(const HostMap& hosts)
165     : hosts_(hosts) {
166   uv_mutex_init(&mutex_);
167 }
168 
~LockedHostMap()169 LockedHostMap::~LockedHostMap() { uv_mutex_destroy(&mutex_); }
170 
find(const Address & address) const171 LockedHostMap::const_iterator LockedHostMap::find(const Address& address) const {
172   return hosts_.find(address);
173 }
174 
get(const Address & address) const175 Host::Ptr LockedHostMap::get(const Address& address) const {
176   ScopedMutex l(&mutex_);
177   const_iterator it = find(address);
178   if (it == end()) return Host::Ptr();
179   return it->second;
180 }
181 
erase(const Address & address)182 void LockedHostMap::erase(const Address& address) {
183   ScopedMutex l(&mutex_);
184   hosts_.erase(address);
185 }
186 
operator [](const Address & address)187 Host::Ptr& LockedHostMap::operator[](const Address& address) {
188   ScopedMutex l(&mutex_);
189   return hosts_[address];
190 }
191 
operator =(const HostMap & hosts)192 LockedHostMap& LockedHostMap::operator=(const HostMap& hosts) {
193   ScopedMutex l(&mutex_);
194   hosts_ = hosts;
195   return *this;
196 }
197 
ClusterSettings()198 ClusterSettings::ClusterSettings()
199     : load_balancing_policy(new RoundRobinPolicy())
200     , port(CASS_DEFAULT_PORT)
201     , reconnection_policy(new ExponentialReconnectionPolicy())
202     , prepare_on_up_or_add_host(CASS_DEFAULT_PREPARE_ON_UP_OR_ADD_HOST)
203     , max_prepares_per_flush(CASS_DEFAULT_MAX_PREPARES_PER_FLUSH)
204     , disable_events_on_startup(false)
205     , cluster_metadata_resolver_factory(new DefaultClusterMetadataResolverFactory()) {
206   load_balancing_policies.push_back(load_balancing_policy);
207 }
208 
ClusterSettings(const Config & config)209 ClusterSettings::ClusterSettings(const Config& config)
210     : control_connection_settings(config)
211     , load_balancing_policy(config.load_balancing_policy())
212     , load_balancing_policies(config.load_balancing_policies())
213     , port(config.port())
214     , reconnection_policy(config.reconnection_policy())
215     , prepare_on_up_or_add_host(config.prepare_on_up_or_add_host())
216     , max_prepares_per_flush(CASS_DEFAULT_MAX_PREPARES_PER_FLUSH)
217     , disable_events_on_startup(false)
218     , cluster_metadata_resolver_factory(config.cluster_metadata_resolver_factory()) {}
219 
Cluster(const ControlConnection::Ptr & connection,ClusterListener * listener,EventLoop * event_loop,const Host::Ptr & connected_host,const HostMap & hosts,const ControlConnectionSchema & schema,const LoadBalancingPolicy::Ptr & load_balancing_policy,const LoadBalancingPolicy::Vec & load_balancing_policies,const String & local_dc,const StringMultimap & supported_options,const ClusterSettings & settings)220 Cluster::Cluster(const ControlConnection::Ptr& connection, ClusterListener* listener,
221                  EventLoop* event_loop, const Host::Ptr& connected_host, const HostMap& hosts,
222                  const ControlConnectionSchema& schema,
223                  const LoadBalancingPolicy::Ptr& load_balancing_policy,
224                  const LoadBalancingPolicy::Vec& load_balancing_policies, const String& local_dc,
225                  const StringMultimap& supported_options, const ClusterSettings& settings)
226     : connection_(connection)
227     , listener_(listener ? listener : &nop_cluster_listener__)
228     , event_loop_(event_loop)
229     , load_balancing_policy_(load_balancing_policy)
230     , load_balancing_policies_(load_balancing_policies)
231     , settings_(settings)
232     , is_closing_(false)
233     , connected_host_(connected_host)
234     , hosts_(hosts)
235     , local_dc_(local_dc)
236     , supported_options_(supported_options)
237     , is_recording_events_(settings.disable_events_on_startup) {
238   inc_ref();
239   connection_->set_listener(this);
240 
241   query_plan_.reset(load_balancing_policy_->new_query_plan("", NULL, NULL));
242 
243   update_schema(schema);
244   update_token_map(hosts, connected_host_->partitioner(), schema);
245 
246   listener_->on_reconnect(this);
247 }
248 
close()249 void Cluster::close() { event_loop_->add(new ClusterRunClose(Ptr(this))); }
250 
notify_host_up(const Address & address)251 void Cluster::notify_host_up(const Address& address) {
252   event_loop_->add(new ClusterNotifyUp(Ptr(this), address));
253 }
254 
notify_host_down(const Address & address)255 void Cluster::notify_host_down(const Address& address) {
256   event_loop_->add(new ClusterNotifyDown(Ptr(this), address));
257 }
258 
start_events()259 void Cluster::start_events() { event_loop_->add(new ClusterStartEvents(Ptr(this))); }
260 
start_monitor_reporting(const String & client_id,const String & session_id,const Config & config)261 void Cluster::start_monitor_reporting(const String& client_id, const String& session_id,
262                                       const Config& config) {
263   event_loop_->add(new ClusterStartClientMonitor(Ptr(this), client_id, session_id, config));
264 }
265 
schema_snapshot()266 Metadata::SchemaSnapshot Cluster::schema_snapshot() { return metadata_.schema_snapshot(); }
267 
find_host(const Address & address) const268 Host::Ptr Cluster::find_host(const Address& address) const { return hosts_.get(address); }
269 
prepared(const String & id) const270 PreparedMetadata::Entry::Ptr Cluster::prepared(const String& id) const {
271   return prepared_metadata_.get(id);
272 }
273 
prepared(const String & id,const PreparedMetadata::Entry::Ptr & entry)274 void Cluster::prepared(const String& id, const PreparedMetadata::Entry::Ptr& entry) {
275   prepared_metadata_.set(id, entry);
276 }
277 
available_hosts() const278 HostMap Cluster::available_hosts() const {
279   HostMap available;
280   for (HostMap::const_iterator it = hosts_.begin(), end = hosts_.end(); it != end; ++it) {
281     if (!is_host_ignored(it->second)) {
282       available[it->first] = it->second;
283     }
284   }
285   return available;
286 }
287 
set_listener(ClusterListener * listener)288 void Cluster::set_listener(ClusterListener* listener) {
289   listener_ = listener ? listener : &nop_cluster_listener__;
290 }
291 
update_hosts(const HostMap & hosts)292 void Cluster::update_hosts(const HostMap& hosts) {
293   // Update the hosts and properly notify the listener
294   HostMap existing(hosts_);
295 
296   for (HostMap::const_iterator it = hosts.begin(), end = hosts.end(); it != end; ++it) {
297     HostMap::iterator find_it = existing.find(it->first);
298     if (find_it != existing.end()) {
299       existing.erase(find_it); // Already exists mark as visited
300     } else {
301       notify_host_add(it->second); // A new host has been added
302     }
303   }
304 
305   // Any hosts that existed before, but aren't in the new hosts
306   // need to be marked as removed.
307   for (HostMap::const_iterator it = existing.begin(), end = existing.end(); it != end; ++it) {
308     notify_host_remove(it->first);
309   }
310 }
311 
update_schema(const ControlConnectionSchema & schema)312 void Cluster::update_schema(const ControlConnectionSchema& schema) {
313   metadata_.clear_and_update_back(connection_->server_version());
314 
315   if (schema.keyspaces) {
316     metadata_.update_keyspaces(schema.keyspaces.get(), false);
317   }
318 
319   if (schema.tables) {
320     metadata_.update_tables(schema.tables.get());
321   }
322 
323   if (schema.views) {
324     metadata_.update_views(schema.views.get());
325   }
326 
327   if (schema.columns) {
328     metadata_.update_columns(schema.columns.get());
329   }
330 
331   if (schema.indexes) {
332     metadata_.update_indexes(schema.indexes.get());
333   }
334 
335   if (schema.user_types) {
336     metadata_.update_user_types(schema.user_types.get());
337   }
338 
339   if (schema.functions) {
340     metadata_.update_functions(schema.functions.get());
341   }
342 
343   if (schema.aggregates) {
344     metadata_.update_aggregates(schema.aggregates.get());
345   }
346 
347   if (schema.virtual_keyspaces) {
348     metadata_.update_keyspaces(schema.virtual_keyspaces.get(), true);
349   }
350 
351   if (schema.virtual_tables) {
352     metadata_.update_tables(schema.virtual_tables.get());
353   }
354 
355   if (schema.virtual_columns) {
356     metadata_.update_columns(schema.virtual_columns.get());
357   }
358 
359   metadata_.swap_to_back_and_update_front();
360 }
361 
update_token_map(const HostMap & hosts,const String & partitioner,const ControlConnectionSchema & schema)362 void Cluster::update_token_map(const HostMap& hosts, const String& partitioner,
363                                const ControlConnectionSchema& schema) {
364   if (settings_.control_connection_settings.use_token_aware_routing && schema.keyspaces) {
365     // Create a new token map and populate it
366     token_map_ = TokenMap::from_partitioner(partitioner);
367     if (!token_map_) {
368       return; // Partition is not supported
369     }
370     token_map_->add_keyspaces(connection_->server_version(), schema.keyspaces.get());
371     for (HostMap::const_iterator it = hosts.begin(), end = hosts.end(); it != end; ++it) {
372       token_map_->add_host(it->second);
373     }
374     token_map_->build();
375   }
376 }
377 
378 // All hosts from the cluster are included in the host map and in the load
379 // balancing policies (LBP) so that LBPs return the correct host distance (esp.
380 // important for DC-aware). This method prevents connection pools from being
381 // created to ignored hosts.
is_host_ignored(const Host::Ptr & host) const382 bool Cluster::is_host_ignored(const Host::Ptr& host) const {
383   return core::is_host_ignored(load_balancing_policies_, host);
384 }
385 
schedule_reconnect()386 void Cluster::schedule_reconnect() {
387   if (!reconnection_schedule_) {
388     reconnection_schedule_.reset(settings_.reconnection_policy->new_reconnection_schedule());
389   }
390   uint64_t delay_ms = reconnection_schedule_->next_delay_ms();
391   if (delay_ms > 0) {
392     timer_.start(connection_->loop(), delay_ms,
393                  bind_callback(&Cluster::on_schedule_reconnect, this));
394   } else {
395     handle_schedule_reconnect();
396   }
397 }
398 
on_schedule_reconnect(Timer * timer)399 void Cluster::on_schedule_reconnect(Timer* timer) { handle_schedule_reconnect(); }
400 
handle_schedule_reconnect()401 void Cluster::handle_schedule_reconnect() {
402   const Host::Ptr& host = query_plan_->compute_next();
403   if (host) {
404     reconnector_.reset(new ControlConnector(host, connection_->protocol_version(),
405                                             bind_callback(&Cluster::on_reconnect, this)));
406     reconnector_->with_settings(settings_.control_connection_settings)
407         ->connect(connection_->loop());
408   } else {
409     // No more hosts, refresh the query plan and schedule a re-connection
410     LOG_TRACE("Control connection query plan has no more hosts. "
411               "Reset query plan and schedule reconnect");
412     query_plan_.reset(load_balancing_policy_->new_query_plan("", NULL, NULL));
413     schedule_reconnect();
414   }
415 }
416 
on_reconnect(ControlConnector * connector)417 void Cluster::on_reconnect(ControlConnector* connector) {
418   reconnector_.reset();
419   if (is_closing_) {
420     handle_close();
421     return;
422   }
423 
424   if (connector->is_ok()) {
425     connection_ = connector->release_connection();
426     connection_->set_listener(this);
427 
428     // Incrementally update the hosts  (notifying the listener)
429     update_hosts(connector->hosts());
430 
431     // Get the newly connected host
432     connected_host_ = hosts_[connection_->address()];
433     assert(connected_host_ && "Connected host not found in hosts map");
434 
435     update_schema(connector->schema());
436     update_token_map(connector->hosts(), connected_host_->partitioner(), connector->schema());
437 
438     // Notify the listener that we've built a new token map
439     if (token_map_) {
440       notify_or_record(ClusterEvent(token_map_));
441     }
442 
443     LOG_INFO("Control connection connected to %s", connected_host_->address_string().c_str());
444 
445     listener_->on_reconnect(this);
446     reconnection_schedule_.reset();
447   } else if (!connector->is_canceled()) {
448     LOG_ERROR(
449         "Unable to reestablish a control connection to host %s because of the following error: %s",
450         connector->address().to_string().c_str(), connector->error_message().c_str());
451     schedule_reconnect();
452   }
453 }
454 
internal_close()455 void Cluster::internal_close() {
456   is_closing_ = true;
457   monitor_reporting_timer_.stop();
458   if (timer_.is_running()) {
459     timer_.stop();
460     handle_close();
461   } else if (reconnector_) {
462     reconnector_->cancel();
463   } else if (connection_) {
464     connection_->close();
465   }
466 }
467 
handle_close()468 void Cluster::handle_close() {
469   for (LoadBalancingPolicy::Vec::const_iterator it = load_balancing_policies_.begin(),
470                                                 end = load_balancing_policies_.end();
471        it != end; ++it) {
472     (*it)->close_handles();
473   }
474   connection_.reset();
475   listener_->on_close(this);
476   dec_ref();
477 }
478 
internal_notify_host_up(const Address & address)479 void Cluster::internal_notify_host_up(const Address& address) {
480   LockedHostMap::const_iterator it = hosts_.find(address);
481 
482   if (it == hosts_.end()) {
483     LOG_WARN("Attempting to mark host %s that we don't have as UP", address.to_string().c_str());
484     return;
485   }
486 
487   Host::Ptr host(it->second);
488 
489   if (load_balancing_policy_->is_host_up(address)) {
490     // Already marked up so don't repeat duplicate notifications.
491     if (!is_host_ignored(host)) {
492       notify_or_record(ClusterEvent(ClusterEvent::HOST_READY, host));
493     }
494     return;
495   }
496 
497   for (LoadBalancingPolicy::Vec::const_iterator it = load_balancing_policies_.begin(),
498                                                 end = load_balancing_policies_.end();
499        it != end; ++it) {
500     (*it)->on_host_up(host);
501   }
502 
503   if (is_host_ignored(host)) {
504     return; // Ignore host
505   }
506 
507   if (!prepare_host(host, bind_callback(&Cluster::on_prepare_host_up, Cluster::Ptr(this)))) {
508     notify_host_up_after_prepare(host);
509   }
510 }
511 
notify_host_up_after_prepare(const Host::Ptr & host)512 void Cluster::notify_host_up_after_prepare(const Host::Ptr& host) {
513   notify_or_record(ClusterEvent(ClusterEvent::HOST_READY, host));
514   notify_or_record(ClusterEvent(ClusterEvent::HOST_UP, host));
515 }
516 
internal_notify_host_down(const Address & address)517 void Cluster::internal_notify_host_down(const Address& address) {
518   LockedHostMap::const_iterator it = hosts_.find(address);
519 
520   if (it == hosts_.end()) {
521     // Using DEBUG level here because this can happen normally as the result of
522     // a remove event.
523     LOG_DEBUG("Attempting to mark host %s that we don't have as DOWN", address.to_string().c_str());
524     return;
525   }
526 
527   Host::Ptr host(it->second);
528 
529   if (!load_balancing_policy_->is_host_up(address)) {
530     // Already marked down so don't repeat duplicate notifications.
531     return;
532   }
533 
534   for (LoadBalancingPolicy::Vec::const_iterator it = load_balancing_policies_.begin(),
535                                                 end = load_balancing_policies_.end();
536        it != end; ++it) {
537     (*it)->on_host_down(address);
538   }
539 
540   notify_or_record(ClusterEvent(ClusterEvent::HOST_DOWN, host));
541 }
542 
internal_start_events()543 void Cluster::internal_start_events() {
544   // Ignore if closing or already processed events
545   if (!is_closing_ && is_recording_events_) {
546     is_recording_events_ = false;
547     ClusterEvent::process_events(recorded_events_, listener_);
548     recorded_events_.clear();
549   }
550 }
551 
internal_start_monitor_reporting(const String & client_id,const String & session_id,const Config & config)552 void Cluster::internal_start_monitor_reporting(const String& client_id, const String& session_id,
553                                                const Config& config) {
554   monitor_reporting_.reset(create_monitor_reporting(client_id, session_id, config));
555 
556   if (!is_closing_ && monitor_reporting_->interval_ms(connection_->dse_server_version()) > 0) {
557     monitor_reporting_->send_startup_message(connection_->connection(), config, available_hosts(),
558                                              load_balancing_policies_);
559     monitor_reporting_timer_.start(
560         event_loop_->loop(), monitor_reporting_->interval_ms(connection_->dse_server_version()),
561         bind_callback(&Cluster::on_monitor_reporting, this));
562   }
563 }
564 
on_monitor_reporting(Timer * timer)565 void Cluster::on_monitor_reporting(Timer* timer) {
566   if (!is_closing_) {
567     monitor_reporting_->send_status_message(connection_->connection(), available_hosts());
568     monitor_reporting_timer_.start(
569         event_loop_->loop(), monitor_reporting_->interval_ms(connection_->dse_server_version()),
570         bind_callback(&Cluster::on_monitor_reporting, this));
571   }
572 }
573 
notify_host_add(const Host::Ptr & host)574 void Cluster::notify_host_add(const Host::Ptr& host) {
575   LockedHostMap::const_iterator host_it = hosts_.find(host->address());
576 
577   if (host_it != hosts_.end()) {
578     LOG_WARN("Attempting to add host %s that we already have", host->address_string().c_str());
579     // If an entry already exists then notify that the node has been removed
580     // then re-add it.
581     for (LoadBalancingPolicy::Vec::const_iterator it = load_balancing_policies_.begin(),
582                                                   end = load_balancing_policies_.end();
583          it != end; ++it) {
584       (*it)->on_host_removed(host_it->second);
585     }
586     notify_or_record(ClusterEvent(ClusterEvent::HOST_REMOVE, host));
587   }
588 
589   hosts_[host->address()] = host;
590   for (LoadBalancingPolicy::Vec::const_iterator it = load_balancing_policies_.begin(),
591                                                 end = load_balancing_policies_.end();
592        it != end; ++it) {
593     (*it)->on_host_added(host);
594   }
595 
596   if (is_host_ignored(host)) {
597     return; // Ignore host
598   }
599 
600   if (!prepare_host(host, bind_callback(&Cluster::on_prepare_host_add, Cluster::Ptr(this)))) {
601     notify_host_add_after_prepare(host);
602   }
603 }
604 
notify_host_add_after_prepare(const Host::Ptr & host)605 void Cluster::notify_host_add_after_prepare(const Host::Ptr& host) {
606   if (token_map_) {
607     token_map_ = token_map_->copy();
608     token_map_->update_host_and_build(host);
609     notify_or_record(ClusterEvent(token_map_));
610   }
611   notify_or_record(ClusterEvent(ClusterEvent::HOST_ADD, host));
612 }
613 
notify_host_remove(const Address & address)614 void Cluster::notify_host_remove(const Address& address) {
615   LockedHostMap::const_iterator it = hosts_.find(address);
616 
617   if (it == hosts_.end()) {
618     LOG_WARN("Attempting removing host %s that we don't have", address.to_string().c_str());
619     return;
620   }
621 
622   Host::Ptr host(it->second);
623 
624   if (token_map_) {
625     token_map_ = token_map_->copy();
626     token_map_->remove_host_and_build(host);
627     notify_or_record(ClusterEvent(token_map_));
628   }
629 
630   // If not marked down yet then explicitly trigger the event.
631   if (load_balancing_policy_->is_host_up(address)) {
632     notify_or_record(ClusterEvent(ClusterEvent::HOST_DOWN, host));
633   }
634 
635   hosts_.erase(address);
636   for (LoadBalancingPolicy::Vec::const_iterator it = load_balancing_policies_.begin(),
637                                                 end = load_balancing_policies_.end();
638        it != end; ++it) {
639     (*it)->on_host_removed(host);
640   }
641 
642   notify_or_record(ClusterEvent(ClusterEvent::HOST_REMOVE, host));
643 }
644 
notify_or_record(const ClusterEvent & event)645 void Cluster::notify_or_record(const ClusterEvent& event) {
646   if (is_recording_events_) {
647     recorded_events_.push_back(event);
648   } else {
649     ClusterEvent::process_event(event, listener_);
650   }
651 }
652 
prepare_host(const Host::Ptr & host,const PrepareHostHandler::Callback & callback)653 bool Cluster::prepare_host(const Host::Ptr& host, const PrepareHostHandler::Callback& callback) {
654   if (connection_ && settings_.prepare_on_up_or_add_host) {
655     PrepareHostHandler::Ptr prepare_host_handler(
656         new PrepareHostHandler(host, prepared_metadata_.copy(), callback,
657                                connection_->protocol_version(), settings_.max_prepares_per_flush));
658 
659     prepare_host_handler->prepare(connection_->loop(),
660                                   settings_.control_connection_settings.connection_settings);
661     return true;
662   }
663   return false;
664 }
665 
on_prepare_host_add(const PrepareHostHandler * handler)666 void Cluster::on_prepare_host_add(const PrepareHostHandler* handler) {
667   notify_host_add_after_prepare(handler->host());
668 }
669 
on_prepare_host_up(const PrepareHostHandler * handler)670 void Cluster::on_prepare_host_up(const PrepareHostHandler* handler) {
671   notify_host_up_after_prepare(handler->host());
672 }
673 
on_update_schema(SchemaType type,const ResultResponse::Ptr & result,const String & keyspace_name,const String & target_name)674 void Cluster::on_update_schema(SchemaType type, const ResultResponse::Ptr& result,
675                                const String& keyspace_name, const String& target_name) {
676   switch (type) {
677     case KEYSPACE:
678       // Virtual keyspaces are not updated (always false)
679       metadata_.update_keyspaces(result.get(), false);
680       if (token_map_) {
681         token_map_ = token_map_->copy();
682         token_map_->update_keyspaces_and_build(connection_->server_version(), result.get());
683         notify_or_record(ClusterEvent(token_map_));
684       }
685       break;
686     case TABLE:
687       metadata_.update_tables(result.get());
688       break;
689     case VIEW:
690       metadata_.update_views(result.get());
691       break;
692     case COLUMN:
693       metadata_.update_columns(result.get());
694       break;
695     case INDEX:
696       metadata_.update_indexes(result.get());
697       break;
698     case USER_TYPE:
699       metadata_.update_user_types(result.get());
700       break;
701     case FUNCTION:
702       metadata_.update_functions(result.get());
703       break;
704     case AGGREGATE:
705       metadata_.update_aggregates(result.get());
706       break;
707   }
708 }
709 
on_drop_schema(SchemaType type,const String & keyspace_name,const String & target_name)710 void Cluster::on_drop_schema(SchemaType type, const String& keyspace_name,
711                              const String& target_name) {
712   switch (type) {
713     case KEYSPACE:
714       metadata_.drop_keyspace(keyspace_name);
715       if (token_map_) {
716         token_map_ = token_map_->copy();
717         token_map_->drop_keyspace(keyspace_name);
718         notify_or_record(ClusterEvent(token_map_));
719       }
720       break;
721     case TABLE:
722       metadata_.drop_table_or_view(keyspace_name, target_name);
723       break;
724     case VIEW:
725       metadata_.drop_table_or_view(keyspace_name, target_name);
726       break;
727     case USER_TYPE:
728       metadata_.drop_user_type(keyspace_name, target_name);
729       break;
730     case FUNCTION:
731       metadata_.drop_function(keyspace_name, target_name);
732       break;
733     case AGGREGATE:
734       metadata_.drop_aggregate(keyspace_name, target_name);
735       break;
736     default:
737       break;
738   }
739 }
740 
on_up(const Address & address)741 void Cluster::on_up(const Address& address) {
742   LockedHostMap::const_iterator it = hosts_.find(address);
743 
744   if (it == hosts_.end()) {
745     LOG_WARN("Received UP event for an unknown host %s", address.to_string().c_str());
746     return;
747   }
748 
749   notify_or_record(ClusterEvent(ClusterEvent::HOST_MAYBE_UP, it->second));
750 }
751 
on_down(const Address & address)752 void Cluster::on_down(const Address& address) {
753   // Ignore down events from the control connection. Use the method
754   // `notify_host_down()` to trigger the DOWN status.
755 }
756 
on_add(const Host::Ptr & host)757 void Cluster::on_add(const Host::Ptr& host) { notify_host_add(host); }
758 
on_remove(const Address & address)759 void Cluster::on_remove(const Address& address) { notify_host_remove(address); }
760 
on_close(ControlConnection * connection)761 void Cluster::on_close(ControlConnection* connection) {
762   if (!is_closing_) {
763     LOG_WARN("Lost control connection to host %s", connection_->address_string().c_str());
764     schedule_reconnect();
765   } else {
766     handle_close();
767   }
768 }
769