/*
Copyright (c) DataStax, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#include "cluster.hpp"

#include "constants.hpp"
#include "dc_aware_policy.hpp"
#include "external.hpp"
#include "logger.hpp"
#include "resolver.hpp"
#include "round_robin_policy.hpp"
#include "speculative_execution.hpp"
#include "utils.hpp"

using namespace datastax;
using namespace datastax::internal::core;

namespace datastax { namespace internal { namespace core {

/**
 * A task for initiating the cluster close process.
 */
class ClusterRunClose : public Task {
public:
  ClusterRunClose(const Cluster::Ptr& cluster)
      : cluster_(cluster) {}

  void run(EventLoop* event_loop) { cluster_->internal_close(); }

private:
  Cluster::Ptr cluster_;
};

/**
 * A task for marking a node as UP.
 */
class ClusterNotifyUp : public Task {
public:
  ClusterNotifyUp(const Cluster::Ptr& cluster, const Address& address)
      : cluster_(cluster)
      , address_(address) {}

  void run(EventLoop* event_loop) { cluster_->internal_notify_host_up(address_); }

private:
  Cluster::Ptr cluster_;
  Address address_;
};

/**
 * A task for marking a node as DOWN.
 */
class ClusterNotifyDown : public Task {
public:
  ClusterNotifyDown(const Cluster::Ptr& cluster, const Address& address)
      : cluster_(cluster)
      , address_(address) {}

  void run(EventLoop* event_loop) { cluster_->internal_notify_host_down(address_); }

private:
  Cluster::Ptr cluster_;
  Address address_;
};

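/**
 * A task for starting cluster event processing. It replays any events that
 * were recorded while events were disabled on startup.
 */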
class ClusterStartEvents : public Task {
public:
  ClusterStartEvents(const Cluster::Ptr& cluster)
      : cluster_(cluster) {}

  void run(EventLoop* event_loop) { cluster_->internal_start_events(); }

private:
  Cluster::Ptr cluster_;
};

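/**
 * A task for starting client monitor reporting on the event loop.
 */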
class ClusterStartClientMonitor : public Task {
public:
  ClusterStartClientMonitor(const Cluster::Ptr& cluster, const String& client_id,
                            const String& session_id, const Config& config)
      : cluster_(cluster)
      , client_id_(client_id)
      , session_id_(session_id)
      , config_(config) {}

  void run(EventLoop* event_loop) {
    cluster_->internal_start_monitor_reporting(client_id_, session_id_, config_);
  }

private:
  Cluster::Ptr cluster_;
  String client_id_;
  String session_id_;
  Config config_;
};

/**
 * A no-op cluster listener. This is used when a listener is not set.
 */
class NopClusterListener : public ClusterListener {
public:
  virtual void on_connect(Cluster* cluster) {}

  virtual void on_host_up(const Host::Ptr& host) {}
  virtual void on_host_down(const Host::Ptr& host) {}

  virtual void on_host_added(const Host::Ptr& host) {}
  virtual void on_host_removed(const Host::Ptr& host) {}

  virtual void on_token_map_updated(const TokenMap::Ptr& token_map) {}

  virtual void on_close(Cluster* cluster) {}
};

}}} // namespace datastax::internal::core

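// Dispatches a single cluster event to the corresponding listener callback.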
void ClusterEvent::process_event(const ClusterEvent& event, ClusterListener* listener) {
  switch (event.type) {
    case HOST_UP:
      listener->on_host_up(event.host);
      break;
    case HOST_DOWN:
      listener->on_host_down(event.host);
      break;
    case HOST_ADD:
      listener->on_host_added(event.host);
      break;
    case HOST_REMOVE:
      listener->on_host_removed(event.host);
      break;
    case HOST_MAYBE_UP:
      listener->on_host_maybe_up(event.host);
      break;
    case HOST_READY:
      listener->on_host_ready(event.host);
      break;
    case TOKEN_MAP_UPDATE:
      listener->on_token_map_updated(event.token_map);
      break;
  }
}

void ClusterEvent::process_events(const ClusterEvent::Vec& events, ClusterListener* listener) {
  for (ClusterEvent::Vec::const_iterator it = events.begin(), end = events.end(); it != end; ++it) {
    process_event(*it, listener);
  }
}

static NopClusterListener nop_cluster_listener__;

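// LockedHostMap guards the shared host map with a libuv mutex. Note that
// find() itself does not lock; get() takes the mutex before calling it, and
// other callers are assumed to synchronize likewise.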
LockedHostMap::LockedHostMap(const HostMap& hosts)
    : hosts_(hosts) {
  uv_mutex_init(&mutex_);
}

LockedHostMap::~LockedHostMap() { uv_mutex_destroy(&mutex_); }

LockedHostMap::const_iterator LockedHostMap::find(const Address& address) const {
  return hosts_.find(address);
}

Host::Ptr LockedHostMap::get(const Address& address) const {
  ScopedMutex l(&mutex_);
  const_iterator it = find(address);
  if (it == end()) return Host::Ptr();
  return it->second;
}

void LockedHostMap::erase(const Address& address) {
  ScopedMutex l(&mutex_);
  hosts_.erase(address);
}

Host::Ptr& LockedHostMap::operator[](const Address& address) {
  ScopedMutex l(&mutex_);
  return hosts_[address];
}

LockedHostMap& LockedHostMap::operator=(const HostMap& hosts) {
  ScopedMutex l(&mutex_);
  hosts_ = hosts;
  return *this;
}

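// Defaults: round-robin load balancing, exponential reconnection, and the
// default cluster metadata resolver.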
ClusterSettings::ClusterSettings()
    : load_balancing_policy(new RoundRobinPolicy())
    , port(CASS_DEFAULT_PORT)
    , reconnection_policy(new ExponentialReconnectionPolicy())
    , prepare_on_up_or_add_host(CASS_DEFAULT_PREPARE_ON_UP_OR_ADD_HOST)
    , max_prepares_per_flush(CASS_DEFAULT_MAX_PREPARES_PER_FLUSH)
    , disable_events_on_startup(false)
    , cluster_metadata_resolver_factory(new DefaultClusterMetadataResolverFactory()) {
  load_balancing_policies.push_back(load_balancing_policy);
}

ClusterSettings::ClusterSettings(const Config& config)
    : control_connection_settings(config)
    , load_balancing_policy(config.load_balancing_policy())
    , load_balancing_policies(config.load_balancing_policies())
    , port(config.port())
    , reconnection_policy(config.reconnection_policy())
    , prepare_on_up_or_add_host(config.prepare_on_up_or_add_host())
    , max_prepares_per_flush(CASS_DEFAULT_MAX_PREPARES_PER_FLUSH)
    , disable_events_on_startup(false)
    , cluster_metadata_resolver_factory(config.cluster_metadata_resolver_factory()) {}

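// The cluster registers itself as the control connection's listener and holds
// a self-reference (inc_ref()) that is released in handle_close().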
Cluster::Cluster(const ControlConnection::Ptr& connection, ClusterListener* listener,
                 EventLoop* event_loop, const Host::Ptr& connected_host, const HostMap& hosts,
                 const ControlConnectionSchema& schema,
                 const LoadBalancingPolicy::Ptr& load_balancing_policy,
                 const LoadBalancingPolicy::Vec& load_balancing_policies, const String& local_dc,
                 const StringMultimap& supported_options, const ClusterSettings& settings)
    : connection_(connection)
    , listener_(listener ? listener : &nop_cluster_listener__)
    , event_loop_(event_loop)
    , load_balancing_policy_(load_balancing_policy)
    , load_balancing_policies_(load_balancing_policies)
    , settings_(settings)
    , is_closing_(false)
    , connected_host_(connected_host)
    , hosts_(hosts)
    , local_dc_(local_dc)
    , supported_options_(supported_options)
    , is_recording_events_(settings.disable_events_on_startup) {
  inc_ref();
  connection_->set_listener(this);

  query_plan_.reset(load_balancing_policy_->new_query_plan("", NULL, NULL));

  update_schema(schema);
  update_token_map(hosts, connected_host_->partitioner(), schema);

  listener_->on_reconnect(this);
}

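// The notify/start methods below queue tasks on the cluster's event loop, so
// the actual work always runs on that loop. A hypothetical caller (e.g. a
// session reacting to connection pool state) might do:
//
//   cluster->notify_host_down(host->address()); // work runs on the event loop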
void Cluster::close() { event_loop_->add(new ClusterRunClose(Ptr(this))); }

void Cluster::notify_host_up(const Address& address) {
  event_loop_->add(new ClusterNotifyUp(Ptr(this), address));
}

void Cluster::notify_host_down(const Address& address) {
  event_loop_->add(new ClusterNotifyDown(Ptr(this), address));
}

void Cluster::start_events() { event_loop_->add(new ClusterStartEvents(Ptr(this))); }

void Cluster::start_monitor_reporting(const String& client_id, const String& session_id,
                                      const Config& config) {
  event_loop_->add(new ClusterStartClientMonitor(Ptr(this), client_id, session_id, config));
}

Metadata::SchemaSnapshot Cluster::schema_snapshot() { return metadata_.schema_snapshot(); }

Host::Ptr Cluster::find_host(const Address& address) const { return hosts_.get(address); }

PreparedMetadata::Entry::Ptr Cluster::prepared(const String& id) const {
  return prepared_metadata_.get(id);
}

void Cluster::prepared(const String& id, const PreparedMetadata::Entry::Ptr& entry) {
  prepared_metadata_.set(id, entry);
}

HostMap Cluster::available_hosts() const {
  HostMap available;
  for (HostMap::const_iterator it = hosts_.begin(), end = hosts_.end(); it != end; ++it) {
    if (!is_host_ignored(it->second)) {
      available[it->first] = it->second;
    }
  }
  return available;
}

void Cluster::set_listener(ClusterListener* listener) {
  listener_ = listener ? listener : &nop_cluster_listener__;
}

void Cluster::update_hosts(const HostMap& hosts) {
  // Update the hosts and properly notify the listener
  HostMap existing(hosts_);

  for (HostMap::const_iterator it = hosts.begin(), end = hosts.end(); it != end; ++it) {
    HostMap::iterator find_it = existing.find(it->first);
    if (find_it != existing.end()) {
      existing.erase(find_it); // Already exists; mark as visited
    } else {
      notify_host_add(it->second); // A new host has been added
    }
  }

  // Any hosts that existed before, but aren't in the new hosts,
  // need to be marked as removed.
  for (HostMap::const_iterator it = existing.begin(), end = existing.end(); it != end; ++it) {
    notify_host_remove(it->first);
  }
}

void Cluster::update_schema(const ControlConnectionSchema& schema) {
  metadata_.clear_and_update_back(connection_->server_version());

  if (schema.keyspaces) {
    metadata_.update_keyspaces(schema.keyspaces.get(), false);
  }

  if (schema.tables) {
    metadata_.update_tables(schema.tables.get());
  }

  if (schema.views) {
    metadata_.update_views(schema.views.get());
  }

  if (schema.columns) {
    metadata_.update_columns(schema.columns.get());
  }

  if (schema.indexes) {
    metadata_.update_indexes(schema.indexes.get());
  }

  if (schema.user_types) {
    metadata_.update_user_types(schema.user_types.get());
  }

  if (schema.functions) {
    metadata_.update_functions(schema.functions.get());
  }

  if (schema.aggregates) {
    metadata_.update_aggregates(schema.aggregates.get());
  }

  if (schema.virtual_keyspaces) {
    metadata_.update_keyspaces(schema.virtual_keyspaces.get(), true);
  }

  if (schema.virtual_tables) {
    metadata_.update_tables(schema.virtual_tables.get());
  }

  if (schema.virtual_columns) {
    metadata_.update_columns(schema.virtual_columns.get());
  }

  metadata_.swap_to_back_and_update_front();
}

void Cluster::update_token_map(const HostMap& hosts, const String& partitioner,
                               const ControlConnectionSchema& schema) {
  if (settings_.control_connection_settings.use_token_aware_routing && schema.keyspaces) {
    // Create a new token map and populate it
    token_map_ = TokenMap::from_partitioner(partitioner);
    if (!token_map_) {
      return; // Partitioner is not supported
    }
    token_map_->add_keyspaces(connection_->server_version(), schema.keyspaces.get());
    for (HostMap::const_iterator it = hosts.begin(), end = hosts.end(); it != end; ++it) {
      token_map_->add_host(it->second);
    }
    token_map_->build();
  }
}

// All hosts from the cluster are included in the host map and in the load
// balancing policies (LBP) so that LBPs return the correct host distance (esp.
// important for DC-aware). This method prevents connection pools from being
// created to ignored hosts.
bool Cluster::is_host_ignored(const Host::Ptr& host) const {
  return core::is_host_ignored(load_balancing_policies_, host);
}

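// Schedules a reconnection attempt using the delay from the reconnection
// policy's schedule; a zero delay attempts to reconnect immediately.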
void Cluster::schedule_reconnect() {
  if (!reconnection_schedule_) {
    reconnection_schedule_.reset(settings_.reconnection_policy->new_reconnection_schedule());
  }
  uint64_t delay_ms = reconnection_schedule_->next_delay_ms();
  if (delay_ms > 0) {
    timer_.start(connection_->loop(), delay_ms,
                 bind_callback(&Cluster::on_schedule_reconnect, this));
  } else {
    handle_schedule_reconnect();
  }
}

void Cluster::on_schedule_reconnect(Timer* timer) { handle_schedule_reconnect(); }

void Cluster::handle_schedule_reconnect() {
  const Host::Ptr& host = query_plan_->compute_next();
  if (host) {
    reconnector_.reset(new ControlConnector(host, connection_->protocol_version(),
                                            bind_callback(&Cluster::on_reconnect, this)));
    reconnector_->with_settings(settings_.control_connection_settings)
        ->connect(connection_->loop());
  } else {
    // No more hosts; reset the query plan and schedule a reconnection.
    LOG_TRACE("Control connection query plan has no more hosts. "
              "Reset query plan and schedule reconnect");
    query_plan_.reset(load_balancing_policy_->new_query_plan("", NULL, NULL));
    schedule_reconnect();
  }
}

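// Handles the result of a reconnection attempt: on success the new control
// connection is adopted and the hosts, schema, and token map are refreshed;
// on failure (unless canceled) another attempt is scheduled.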
void Cluster::on_reconnect(ControlConnector* connector) {
  reconnector_.reset();
  if (is_closing_) {
    handle_close();
    return;
  }

  if (connector->is_ok()) {
    connection_ = connector->release_connection();
    connection_->set_listener(this);

    // Incrementally update the hosts (notifying the listener)
    update_hosts(connector->hosts());

    // Get the newly connected host
    connected_host_ = hosts_[connection_->address()];
    assert(connected_host_ && "Connected host not found in hosts map");

    update_schema(connector->schema());
    update_token_map(connector->hosts(), connected_host_->partitioner(), connector->schema());

    // Notify the listener that we've built a new token map
    if (token_map_) {
      notify_or_record(ClusterEvent(token_map_));
    }

    LOG_INFO("Control connection connected to %s", connected_host_->address_string().c_str());

    listener_->on_reconnect(this);
    reconnection_schedule_.reset();
  } else if (!connector->is_canceled()) {
    LOG_ERROR(
        "Unable to reestablish a control connection to host %s because of the following error: %s",
        connector->address().to_string().c_str(), connector->error_message().c_str());
    schedule_reconnect();
  }
}

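// Initiates close: stops timers, cancels an in-flight reconnection attempt,
// or closes the active control connection, whichever applies.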
void Cluster::internal_close() {
  is_closing_ = true;
  monitor_reporting_timer_.stop();
  if (timer_.is_running()) {
    timer_.stop();
    handle_close();
  } else if (reconnector_) {
    reconnector_->cancel();
  } else if (connection_) {
    connection_->close();
  }
}

void Cluster::handle_close() {
  for (LoadBalancingPolicy::Vec::const_iterator it = load_balancing_policies_.begin(),
                                                end = load_balancing_policies_.end();
       it != end; ++it) {
    (*it)->close_handles();
  }
  connection_.reset();
  listener_->on_close(this);
  dec_ref();
}

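// Marks a host as UP. The load balancing policies are notified and, unless the
// host is ignored, statements may be prepared on it before the
// HOST_READY/HOST_UP events are delivered.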
void Cluster::internal_notify_host_up(const Address& address) {
  LockedHostMap::const_iterator it = hosts_.find(address);

  if (it == hosts_.end()) {
    LOG_WARN("Attempting to mark unknown host %s as UP", address.to_string().c_str());
    return;
  }

  Host::Ptr host(it->second);

  if (load_balancing_policy_->is_host_up(address)) {
    // Already marked up so don't send duplicate notifications.
    if (!is_host_ignored(host)) {
      notify_or_record(ClusterEvent(ClusterEvent::HOST_READY, host));
    }
    return;
  }

  for (LoadBalancingPolicy::Vec::const_iterator it = load_balancing_policies_.begin(),
                                                end = load_balancing_policies_.end();
       it != end; ++it) {
    (*it)->on_host_up(host);
  }

  if (is_host_ignored(host)) {
    return; // Ignore host
  }

  if (!prepare_host(host, bind_callback(&Cluster::on_prepare_host_up, Cluster::Ptr(this)))) {
    notify_host_up_after_prepare(host);
  }
}

void Cluster::notify_host_up_after_prepare(const Host::Ptr& host) {
  notify_or_record(ClusterEvent(ClusterEvent::HOST_READY, host));
  notify_or_record(ClusterEvent(ClusterEvent::HOST_UP, host));
}

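// Marks a host as DOWN, notifying the load balancing policies unless the host
// is already marked down.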
void Cluster::internal_notify_host_down(const Address& address) {
  LockedHostMap::const_iterator it = hosts_.find(address);

  if (it == hosts_.end()) {
    // Using DEBUG level here because this can happen normally as the result of
    // a remove event.
    LOG_DEBUG("Attempting to mark unknown host %s as DOWN", address.to_string().c_str());
    return;
  }

  Host::Ptr host(it->second);

  if (!load_balancing_policy_->is_host_up(address)) {
    // Already marked down so don't send duplicate notifications.
    return;
  }

  for (LoadBalancingPolicy::Vec::const_iterator it = load_balancing_policies_.begin(),
                                                end = load_balancing_policies_.end();
       it != end; ++it) {
    (*it)->on_host_down(address);
  }

  notify_or_record(ClusterEvent(ClusterEvent::HOST_DOWN, host));
}

void Cluster::internal_start_events() {
  // Ignore if closing or if events have already been processed.
  if (!is_closing_ && is_recording_events_) {
    is_recording_events_ = false;
    ClusterEvent::process_events(recorded_events_, listener_);
    recorded_events_.clear();
  }
}

void Cluster::internal_start_monitor_reporting(const String& client_id, const String& session_id,
                                               const Config& config) {
  monitor_reporting_.reset(create_monitor_reporting(client_id, session_id, config));

  if (!is_closing_ && monitor_reporting_->interval_ms(connection_->dse_server_version()) > 0) {
    monitor_reporting_->send_startup_message(connection_->connection(), config, available_hosts(),
                                             load_balancing_policies_);
    monitor_reporting_timer_.start(
        event_loop_->loop(), monitor_reporting_->interval_ms(connection_->dse_server_version()),
        bind_callback(&Cluster::on_monitor_reporting, this));
  }
}

void Cluster::on_monitor_reporting(Timer* timer) {
  if (!is_closing_) {
    monitor_reporting_->send_status_message(connection_->connection(), available_hosts());
    monitor_reporting_timer_.start(
        event_loop_->loop(), monitor_reporting_->interval_ms(connection_->dse_server_version()),
        bind_callback(&Cluster::on_monitor_reporting, this));
  }
}

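// Adds a host (replacing any existing entry for the same address) and notifies
// the load balancing policies; ignored hosts get no ADD event.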
void Cluster::notify_host_add(const Host::Ptr& host) {
  LockedHostMap::const_iterator host_it = hosts_.find(host->address());

  if (host_it != hosts_.end()) {
    LOG_WARN("Attempting to add host %s that we already have", host->address_string().c_str());
    // If an entry already exists, notify that the node has been removed
    // before re-adding it.
    for (LoadBalancingPolicy::Vec::const_iterator it = load_balancing_policies_.begin(),
                                                  end = load_balancing_policies_.end();
         it != end; ++it) {
      (*it)->on_host_removed(host_it->second);
    }
    notify_or_record(ClusterEvent(ClusterEvent::HOST_REMOVE, host));
  }

  hosts_[host->address()] = host;
  for (LoadBalancingPolicy::Vec::const_iterator it = load_balancing_policies_.begin(),
                                                end = load_balancing_policies_.end();
       it != end; ++it) {
    (*it)->on_host_added(host);
  }

  if (is_host_ignored(host)) {
    return; // Ignore host
  }

  if (!prepare_host(host, bind_callback(&Cluster::on_prepare_host_add, Cluster::Ptr(this)))) {
    notify_host_add_after_prepare(host);
  }
}

void Cluster::notify_host_add_after_prepare(const Host::Ptr& host) {
  if (token_map_) {
    token_map_ = token_map_->copy();
    token_map_->update_host_and_build(host);
    notify_or_record(ClusterEvent(token_map_));
  }
  notify_or_record(ClusterEvent(ClusterEvent::HOST_ADD, host));
}

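// Removes a host, rebuilding the token map and emitting a DOWN event first if
// the host was still marked up.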
void Cluster::notify_host_remove(const Address& address) {
  LockedHostMap::const_iterator it = hosts_.find(address);

  if (it == hosts_.end()) {
    LOG_WARN("Attempting to remove host %s that we don't have", address.to_string().c_str());
    return;
  }

  Host::Ptr host(it->second);

  if (token_map_) {
    token_map_ = token_map_->copy();
    token_map_->remove_host_and_build(host);
    notify_or_record(ClusterEvent(token_map_));
  }

  // If not marked down yet then explicitly trigger the event.
  if (load_balancing_policy_->is_host_up(address)) {
    notify_or_record(ClusterEvent(ClusterEvent::HOST_DOWN, host));
  }

  hosts_.erase(address);
  for (LoadBalancingPolicy::Vec::const_iterator it = load_balancing_policies_.begin(),
                                                end = load_balancing_policies_.end();
       it != end; ++it) {
    (*it)->on_host_removed(host);
  }

  notify_or_record(ClusterEvent(ClusterEvent::HOST_REMOVE, host));
}

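// Events are buffered while is_recording_events_ is set (startup with events
// disabled) and replayed later by internal_start_events(); otherwise they are
// delivered to the listener immediately.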
void Cluster::notify_or_record(const ClusterEvent& event) {
  if (is_recording_events_) {
    recorded_events_.push_back(event);
  } else {
    ClusterEvent::process_event(event, listener_);
  }
}

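// Returns true if statements are being prepared on the host (the callback
// fires when done); false means the caller should notify listeners directly.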
bool Cluster::prepare_host(const Host::Ptr& host, const PrepareHostHandler::Callback& callback) {
  if (connection_ && settings_.prepare_on_up_or_add_host) {
    PrepareHostHandler::Ptr prepare_host_handler(
        new PrepareHostHandler(host, prepared_metadata_.copy(), callback,
                               connection_->protocol_version(), settings_.max_prepares_per_flush));

    prepare_host_handler->prepare(connection_->loop(),
                                  settings_.control_connection_settings.connection_settings);
    return true;
  }
  return false;
}

void Cluster::on_prepare_host_add(const PrepareHostHandler* handler) {
  notify_host_add_after_prepare(handler->host());
}

void Cluster::on_prepare_host_up(const PrepareHostHandler* handler) {
  notify_host_up_after_prepare(handler->host());
}

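// Control connection schema callbacks: each result incrementally updates the
// cluster metadata, and keyspace changes also rebuild the token map.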
void Cluster::on_update_schema(SchemaType type, const ResultResponse::Ptr& result,
                               const String& keyspace_name, const String& target_name) {
  switch (type) {
    case KEYSPACE:
      // Virtual keyspaces are not updated here (the flag is always false)
      metadata_.update_keyspaces(result.get(), false);
      if (token_map_) {
        token_map_ = token_map_->copy();
        token_map_->update_keyspaces_and_build(connection_->server_version(), result.get());
        notify_or_record(ClusterEvent(token_map_));
      }
      break;
    case TABLE:
      metadata_.update_tables(result.get());
      break;
    case VIEW:
      metadata_.update_views(result.get());
      break;
    case COLUMN:
      metadata_.update_columns(result.get());
      break;
    case INDEX:
      metadata_.update_indexes(result.get());
      break;
    case USER_TYPE:
      metadata_.update_user_types(result.get());
      break;
    case FUNCTION:
      metadata_.update_functions(result.get());
      break;
    case AGGREGATE:
      metadata_.update_aggregates(result.get());
      break;
  }
}

void Cluster::on_drop_schema(SchemaType type, const String& keyspace_name,
                             const String& target_name) {
  switch (type) {
    case KEYSPACE:
      metadata_.drop_keyspace(keyspace_name);
      if (token_map_) {
        token_map_ = token_map_->copy();
        token_map_->drop_keyspace(keyspace_name);
        notify_or_record(ClusterEvent(token_map_));
      }
      break;
    case TABLE:
      metadata_.drop_table_or_view(keyspace_name, target_name);
      break;
    case VIEW:
      metadata_.drop_table_or_view(keyspace_name, target_name);
      break;
    case USER_TYPE:
      metadata_.drop_user_type(keyspace_name, target_name);
      break;
    case FUNCTION:
      metadata_.drop_function(keyspace_name, target_name);
      break;
    case AGGREGATE:
      metadata_.drop_aggregate(keyspace_name, target_name);
      break;
    default:
      break;
  }
}

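// An UP event from the control connection only marks the host as MAYBE_UP;
// the actual UP transition is expected to go through notify_host_up().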
void Cluster::on_up(const Address& address) {
  LockedHostMap::const_iterator it = hosts_.find(address);

  if (it == hosts_.end()) {
    LOG_WARN("Received UP event for an unknown host %s", address.to_string().c_str());
    return;
  }

  notify_or_record(ClusterEvent(ClusterEvent::HOST_MAYBE_UP, it->second));
}

void Cluster::on_down(const Address& address) {
  // Ignore down events from the control connection. Use the method
  // `notify_host_down()` to trigger the DOWN status.
}

void Cluster::on_add(const Host::Ptr& host) { notify_host_add(host); }

void Cluster::on_remove(const Address& address) { notify_host_remove(address); }

void Cluster::on_close(ControlConnection* connection) {
  if (!is_closing_) {
    LOG_WARN("Lost control connection to host %s", connection_->address_string().c_str());
    schedule_reconnect();
  } else {
    handle_close();
  }
}