1 /* 2 Copyright (c) DataStax, Inc. 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef DATASTAX_INTERNAL_CLUSTER_HPP 18 #define DATASTAX_INTERNAL_CLUSTER_HPP 19 20 #include "config.hpp" 21 #include "control_connector.hpp" 22 #include "event_loop.hpp" 23 #include "external.hpp" 24 #include "metadata.hpp" 25 #include "monitor_reporting.hpp" 26 #include "prepare_host_handler.hpp" 27 #include "prepared.hpp" 28 29 #include <uv.h> 30 31 namespace datastax { namespace internal { namespace core { 32 33 class Cluster; 34 35 class LockedHostMap { 36 public: 37 typedef HostMap::iterator iterator; 38 typedef HostMap::const_iterator const_iterator; 39 40 LockedHostMap(const HostMap& hosts); 41 ~LockedHostMap(); 42 operator const HostMap&() const43 operator const HostMap&() const { return hosts_; } begin() const44 const_iterator begin() const { return hosts_.begin(); } end() const45 const_iterator end() const { return hosts_.end(); } 46 47 const_iterator find(const Address& address) const; 48 49 Host::Ptr get(const Address& address) const; 50 51 void erase(const Address& address); 52 53 Host::Ptr& operator[](const Address& address); 54 LockedHostMap& operator=(const HostMap& hosts); 55 56 private: 57 mutable uv_mutex_t mutex_; 58 HostMap hosts_; 59 }; 60 61 /** 62 * A listener that handles token map updates. 63 */ 64 class TokenMapListener { 65 public: ~TokenMapListener()66 virtual ~TokenMapListener() {} 67 68 /** 69 * A callback that's called when the token map has changed. This happens as 70 * a result of the token map being rebuilt which can happen if keyspace metadata 71 * has changed or if node is added/removed from a cluster. 72 * 73 * @param token_map The updated token map. 74 */ 75 virtual void on_token_map_updated(const TokenMap::Ptr& token_map) = 0; 76 }; 77 78 /** 79 * A listener that handles cluster events. 80 */ 81 class ClusterListener 82 : public HostListener 83 , public TokenMapListener { 84 public: 85 typedef Vector<ClusterListener*> Vec; 86 ~ClusterListener()87 virtual ~ClusterListener() {} 88 89 /** 90 * A callback that's called when the control connection receives an up event. 91 * It means that the host might be available to handle queries, but not 92 * necessarily. 93 * 94 * @param host A host that may be available. 95 */ on_host_maybe_up(const Host::Ptr & host)96 virtual void on_host_maybe_up(const Host::Ptr& host) {} 97 98 /** 99 * A callback that's called as the result of `Cluster::notify_host_up()`. 100 * It's *always* called for a valid (not ignored) host that's ready to 101 * receive queries. The ready state means the host has had any previously 102 * prepared queries setup on the newly available server. If the host was 103 * previously ready the callback is just called. 104 * 105 * @param host A host that's ready to receive queries. 106 */ on_host_ready(const Host::Ptr & host)107 virtual void on_host_ready(const Host::Ptr& host) {} 108 109 /** 110 * A callback that's called when the cluster connects or reconnects to a host. 111 * 112 * Note: This is mostly for testing. 113 * 114 * @param cluster The cluster object. 115 */ on_reconnect(Cluster * cluster)116 virtual void on_reconnect(Cluster* cluster) {} 117 118 /** 119 * A callback that's called when the cluster has closed. 120 * 121 * @param cluster The cluster object. 122 */ 123 virtual void on_close(Cluster* cluster) = 0; 124 }; 125 126 /** 127 * A class for recording host and token map events so they can be replayed. 128 */ 129 struct ClusterEvent { 130 typedef Vector<ClusterEvent> Vec; 131 enum Type { 132 HOST_UP, 133 HOST_DOWN, 134 HOST_ADD, 135 HOST_REMOVE, 136 HOST_MAYBE_UP, 137 HOST_READY, 138 TOKEN_MAP_UPDATE 139 }; 140 ClusterEventdatastax::internal::core::ClusterEvent141 ClusterEvent(Type type, const Host::Ptr& host) 142 : type(type) 143 , host(host) {} 144 ClusterEventdatastax::internal::core::ClusterEvent145 ClusterEvent(const TokenMap::Ptr& token_map) 146 : type(TOKEN_MAP_UPDATE) 147 , token_map(token_map) {} 148 149 static void process_event(const ClusterEvent& event, ClusterListener* listener); 150 static void process_events(const Vec& events, ClusterListener* listener); 151 152 Type type; 153 Host::Ptr host; 154 TokenMap::Ptr token_map; 155 }; 156 157 /** 158 * Cluster settings. 159 */ 160 struct ClusterSettings { 161 /** 162 * Constructor. Initialize with default settings. 163 */ 164 ClusterSettings(); 165 166 /** 167 * Constructor. Initialize with a config object. 168 * 169 * @param config The config object. 170 */ 171 ClusterSettings(const Config& config); 172 173 /** 174 * The settings for the underlying control connection. 175 */ 176 ControlConnectionSettings control_connection_settings; 177 178 /** 179 * The load balancing policy to use for reconnecting the control 180 * connection. 181 */ 182 LoadBalancingPolicy::Ptr load_balancing_policy; 183 184 /** 185 * Load balancing policies for all profiles. 186 */ 187 LoadBalancingPolicy::Vec load_balancing_policies; 188 189 /** 190 * The port to use for the contact points. This setting is spread to 191 * the other hosts using the contact point hosts. 192 */ 193 int port; 194 195 /** 196 * Reconnection policy to use when attempting to reconnect the control connection. 197 */ 198 ReconnectionPolicy::Ptr reconnection_policy; 199 200 /** 201 * If true then cached prepared statements are prepared when a host is brought 202 * up or is added. 203 */ 204 bool prepare_on_up_or_add_host; 205 206 /** 207 * Max number of requests to be written out to the socket per write system call. 208 */ 209 unsigned max_prepares_per_flush; 210 211 /** 212 * If true then events are disabled on startup. Events can be explicitly 213 * started by calling `Cluster::start_events()`. 214 */ 215 bool disable_events_on_startup; 216 217 /** 218 * A factory for creating cluster metadata resolvers. A cluster metadata resolver is used to 219 * determine contact points and retrieve other metadata required to connect the 220 * cluster. 221 */ 222 ClusterMetadataResolverFactory::Ptr cluster_metadata_resolver_factory; 223 }; 224 225 /** 226 * A cluster connection. This wraps and maintains a control connection to a 227 * cluster. If a host in the cluster fails then it re-establishes a new control 228 * connection to a different host. A cluster will never close without an 229 * explicit call to close because it repeatedly tries to re-establish its 230 * connection even if no hosts are available. 231 */ 232 class Cluster 233 : public RefCounted<Cluster> 234 , public ControlConnectionListener { 235 public: 236 typedef SharedRefPtr<Cluster> Ptr; 237 238 /** 239 * Constructor. Don't use directly. 240 * 241 * @param connection The current control connection. 242 * @param listener A listener to handle cluster events. 243 * @param event_loop The event loop. 244 * @param connected_host The currently connected host. 245 * @param hosts Available hosts for the cluster (based on load balancing 246 * policies). 247 * @param schema Current schema metadata. 248 * @param load_balancing_policy The default load balancing policy to use for 249 * determining the next control connection host. 250 * @param load_balancing_policies 251 * @param local_dc The local datacenter determined by the metadata service for initializing the 252 * load balancing policies. 253 * @param supported_options Supported options discovered during control connection. 254 * @param settings The control connection settings to use for reconnecting the 255 * control connection. 256 */ 257 Cluster(const ControlConnection::Ptr& connection, ClusterListener* listener, 258 EventLoop* event_loop, const Host::Ptr& connected_host, const HostMap& hosts, 259 const ControlConnectionSchema& schema, 260 const LoadBalancingPolicy::Ptr& load_balancing_policy, 261 const LoadBalancingPolicy::Vec& load_balancing_policies, const String& local_dc, 262 const StringMultimap& supported_options, const ClusterSettings& settings); 263 264 /** 265 * Set the listener that will handle events for the cluster 266 * (*NOT* thread-safe). 267 * 268 * @param listener The cluster listener. 269 */ 270 void set_listener(ClusterListener* listener = NULL); 271 272 /** 273 * Close the current connection and stop the re-connection process (thread-safe). 274 */ 275 void close(); 276 277 /** 278 * Notify that a node has been determined to be available via an external 279 * source (thread-safe). 280 * 281 * @param address The address of the host that is now available. 282 */ 283 void notify_host_up(const Address& address); 284 285 /** 286 * Notify that a node has been determined to be down via an external source. 287 * DOWN events from the control connection are ignored so it is up to other 288 * sources to determine a host is unavailable (thread-safe). 289 * 290 * @param address That address of the host that is now unavailable. 291 */ 292 void notify_host_down(const Address& address); 293 294 /** 295 * Start host and token map events. Events that occurred during startup will be 296 * replayed (thread-safe). 297 */ 298 void start_events(); 299 300 /** 301 * Start the client monitor events (thread-safe). 302 * 303 * @param client_id Client ID associated with the session. 304 * @param session_id Session ID associated with the session. 305 * @param config The config object. 306 */ 307 void start_monitor_reporting(const String& client_id, const String& session_id, 308 const Config& config); 309 310 /** 311 * Get the latest snapshot of the schema metadata (thread-safe). 312 * 313 * @return A schema metadata snapshot. 314 */ 315 Metadata::SchemaSnapshot schema_snapshot(); 316 317 /** 318 * Look up a host by address (thread-safe). 319 * 320 * @param address The address of the host. 321 * @return The host object for the specified address or a null object pointer 322 * if the host doesn't exist. 323 */ 324 Host::Ptr find_host(const Address& address) const; 325 326 /** 327 * Get a prepared metadata entry for a prepared ID (thread-safe). 328 * 329 * @param id A prepared ID 330 * @return The prepare metadata object for the specified ID or a null object 331 * pointer if the entry doesn't exist. 332 */ 333 PreparedMetadata::Entry::Ptr prepared(const String& id) const; 334 335 /** 336 * Set the prepared metadata for a given prepared ID (thread-safe). 337 * 338 * @param id A prepared ID. 339 * @param entry A prepared metadata entry. 340 */ 341 void prepared(const String& id, const PreparedMetadata::Entry::Ptr& entry); 342 343 /** 344 * Get available hosts (determined by host distance). This filters out ignored 345 * hosts (*NOT* thread-safe). 346 * 347 * @return A mapping of available hosts. 348 */ 349 HostMap available_hosts() const; 350 351 public: protocol_version() const352 ProtocolVersion protocol_version() const { return connection_->protocol_version(); } connected_host() const353 const Host::Ptr& connected_host() const { return connected_host_; } token_map() const354 const TokenMap::Ptr& token_map() const { return token_map_; } local_dc() const355 const String& local_dc() const { return local_dc_; } dse_server_version() const356 const VersionNumber& dse_server_version() const { return connection_->dse_server_version(); } supported_options() const357 const StringMultimap& supported_options() const { return supported_options_; } 358 359 private: 360 friend class ClusterRunClose; 361 friend class ClusterNotifyUp; 362 friend class ClusterNotifyDown; 363 friend class ClusterStartEvents; 364 friend class ClusterStartClientMonitor; 365 366 private: 367 void update_hosts(const HostMap& hosts); 368 void update_schema(const ControlConnectionSchema& schema); 369 void update_token_map(const HostMap& hosts, const String& partitioner, 370 const ControlConnectionSchema& schema); 371 372 bool is_host_ignored(const Host::Ptr& host) const; 373 374 void schedule_reconnect(); 375 376 void on_schedule_reconnect(Timer* timer); 377 void handle_schedule_reconnect(); 378 379 void on_reconnect(ControlConnector* connector); 380 381 private: 382 void internal_close(); 383 void handle_close(); 384 385 void internal_notify_host_up(const Address& address); 386 void notify_host_up_after_prepare(const Host::Ptr& host); 387 388 void internal_notify_host_down(const Address& address); 389 390 void internal_start_events(); 391 void internal_start_monitor_reporting(const String& client_id, const String& session_id, 392 const Config& config); 393 394 void on_monitor_reporting(Timer* timer); 395 396 void notify_host_add(const Host::Ptr& host); 397 void notify_host_add_after_prepare(const Host::Ptr& host); 398 399 void notify_host_remove(const Address& address); 400 401 private: 402 void notify_or_record(const ClusterEvent& event); 403 404 private: 405 bool prepare_host(const Host::Ptr& host, const PrepareHostHandler::Callback& callback); 406 407 void on_prepare_host_add(const PrepareHostHandler* handler); 408 void on_prepare_host_up(const PrepareHostHandler* handler); 409 410 private: 411 // Control connection listener methods 412 413 virtual void on_update_schema(SchemaType type, const ResultResponse::Ptr& result, 414 const String& keyspace_name, const String& target_name); 415 416 virtual void on_drop_schema(SchemaType type, const String& keyspace_name, 417 const String& target_name); 418 419 virtual void on_up(const Address& address); 420 virtual void on_down(const Address& address); 421 422 virtual void on_add(const Host::Ptr& host); 423 virtual void on_remove(const Address& address); 424 425 virtual void on_close(ControlConnection* connection); 426 427 private: 428 ControlConnection::Ptr connection_; 429 ControlConnector::Ptr reconnector_; 430 ClusterListener* listener_; 431 EventLoop* const event_loop_; 432 const LoadBalancingPolicy::Ptr load_balancing_policy_; 433 LoadBalancingPolicy::Vec load_balancing_policies_; 434 const ClusterSettings settings_; 435 ScopedPtr<QueryPlan> query_plan_; 436 bool is_closing_; 437 Host::Ptr connected_host_; 438 LockedHostMap hosts_; 439 Metadata metadata_; 440 PreparedMetadata prepared_metadata_; 441 TokenMap::Ptr token_map_; 442 String local_dc_; 443 StringMultimap supported_options_; 444 Timer timer_; 445 bool is_recording_events_; 446 ClusterEvent::Vec recorded_events_; 447 ScopedPtr<MonitorReporting> monitor_reporting_; 448 Timer monitor_reporting_timer_; 449 ScopedPtr<ReconnectionSchedule> reconnection_schedule_; 450 }; 451 452 }}} // namespace datastax::internal::core 453 454 #endif 455