1 /*
2   Copyright (c) DataStax, Inc.
3 
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7 
8   http://www.apache.org/licenses/LICENSE-2.0
9 
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15 */
16 
17 #ifndef DATASTAX_INTERNAL_CLUSTER_HPP
18 #define DATASTAX_INTERNAL_CLUSTER_HPP
19 
20 #include "config.hpp"
21 #include "control_connector.hpp"
22 #include "event_loop.hpp"
23 #include "external.hpp"
24 #include "metadata.hpp"
25 #include "monitor_reporting.hpp"
26 #include "prepare_host_handler.hpp"
27 #include "prepared.hpp"
28 
29 #include <uv.h>
30 
31 namespace datastax { namespace internal { namespace core {
32 
33 class Cluster;
34 
35 class LockedHostMap {
36 public:
37   typedef HostMap::iterator iterator;
38   typedef HostMap::const_iterator const_iterator;
39 
40   LockedHostMap(const HostMap& hosts);
41   ~LockedHostMap();
42 
operator const HostMap&() const43   operator const HostMap&() const { return hosts_; }
begin() const44   const_iterator begin() const { return hosts_.begin(); }
end() const45   const_iterator end() const { return hosts_.end(); }
46 
47   const_iterator find(const Address& address) const;
48 
49   Host::Ptr get(const Address& address) const;
50 
51   void erase(const Address& address);
52 
53   Host::Ptr& operator[](const Address& address);
54   LockedHostMap& operator=(const HostMap& hosts);
55 
56 private:
57   mutable uv_mutex_t mutex_;
58   HostMap hosts_;
59 };
60 
61 /**
62  * A listener that handles token map updates.
63  */
64 class TokenMapListener {
65 public:
~TokenMapListener()66   virtual ~TokenMapListener() {}
67 
68   /**
69    * A callback that's called when the token map has changed. This happens as
70    * a result of the token map being rebuilt which can happen if keyspace metadata
71    * has changed or if node is added/removed from a cluster.
72    *
73    * @param token_map The updated token map.
74    */
75   virtual void on_token_map_updated(const TokenMap::Ptr& token_map) = 0;
76 };
77 
78 /**
79  * A listener that handles cluster events.
80  */
81 class ClusterListener
82     : public HostListener
83     , public TokenMapListener {
84 public:
85   typedef Vector<ClusterListener*> Vec;
86 
~ClusterListener()87   virtual ~ClusterListener() {}
88 
89   /**
90    * A callback that's called when the control connection receives an up event.
91    * It means that the host might be available to handle queries, but not
92    * necessarily.
93    *
94    * @param host A host that may be available.
95    */
on_host_maybe_up(const Host::Ptr & host)96   virtual void on_host_maybe_up(const Host::Ptr& host) {}
97 
98   /**
99    * A callback that's called as the result of `Cluster::notify_host_up()`.
100    * It's *always* called for a valid (not ignored) host that's ready to
101    * receive queries. The ready state means the host has had any previously
102    * prepared queries setup on the newly available server. If the host was
103    * previously ready the callback is just called.
104    *
105    * @param host A host that's ready to receive queries.
106    */
on_host_ready(const Host::Ptr & host)107   virtual void on_host_ready(const Host::Ptr& host) {}
108 
109   /**
110    * A callback that's called when the cluster connects or reconnects to a host.
111    *
112    * Note: This is mostly for testing.
113    *
114    * @param cluster The cluster object.
115    */
on_reconnect(Cluster * cluster)116   virtual void on_reconnect(Cluster* cluster) {}
117 
118   /**
119    * A callback that's called when the cluster has closed.
120    *
121    * @param cluster The cluster object.
122    */
123   virtual void on_close(Cluster* cluster) = 0;
124 };
125 
126 /**
127  * A class for recording host and token map events so they can be replayed.
128  */
129 struct ClusterEvent {
130   typedef Vector<ClusterEvent> Vec;
131   enum Type {
132     HOST_UP,
133     HOST_DOWN,
134     HOST_ADD,
135     HOST_REMOVE,
136     HOST_MAYBE_UP,
137     HOST_READY,
138     TOKEN_MAP_UPDATE
139   };
140 
ClusterEventdatastax::internal::core::ClusterEvent141   ClusterEvent(Type type, const Host::Ptr& host)
142       : type(type)
143       , host(host) {}
144 
ClusterEventdatastax::internal::core::ClusterEvent145   ClusterEvent(const TokenMap::Ptr& token_map)
146       : type(TOKEN_MAP_UPDATE)
147       , token_map(token_map) {}
148 
149   static void process_event(const ClusterEvent& event, ClusterListener* listener);
150   static void process_events(const Vec& events, ClusterListener* listener);
151 
152   Type type;
153   Host::Ptr host;
154   TokenMap::Ptr token_map;
155 };
156 
157 /**
158  * Cluster settings.
159  */
160 struct ClusterSettings {
161   /**
162    * Constructor. Initialize with default settings.
163    */
164   ClusterSettings();
165 
166   /**
167    * Constructor. Initialize with a config object.
168    *
169    * @param config The config object.
170    */
171   ClusterSettings(const Config& config);
172 
173   /**
174    * The settings for the underlying control connection.
175    */
176   ControlConnectionSettings control_connection_settings;
177 
178   /**
179    * The load balancing policy to use for reconnecting the control
180    * connection.
181    */
182   LoadBalancingPolicy::Ptr load_balancing_policy;
183 
184   /**
185    * Load balancing policies for all profiles.
186    */
187   LoadBalancingPolicy::Vec load_balancing_policies;
188 
189   /**
190    * The port to use for the contact points. This setting is spread to
191    * the other hosts using the contact point hosts.
192    */
193   int port;
194 
195   /**
196    * Reconnection policy to use when attempting to reconnect the control connection.
197    */
198   ReconnectionPolicy::Ptr reconnection_policy;
199 
200   /**
201    * If true then cached prepared statements are prepared when a host is brought
202    * up or is added.
203    */
204   bool prepare_on_up_or_add_host;
205 
206   /**
207    * Max number of requests to be written out to the socket per write system call.
208    */
209   unsigned max_prepares_per_flush;
210 
211   /**
212    * If true then events are disabled on startup. Events can be explicitly
213    * started by calling `Cluster::start_events()`.
214    */
215   bool disable_events_on_startup;
216 
217   /**
218    * A factory for creating cluster metadata resolvers. A cluster metadata resolver is used to
219    * determine contact points and retrieve other metadata required to connect the
220    * cluster.
221    */
222   ClusterMetadataResolverFactory::Ptr cluster_metadata_resolver_factory;
223 };
224 
225 /**
226  * A cluster connection. This wraps and maintains a control connection to a
227  * cluster. If a host in the cluster fails then it re-establishes a new control
228  * connection to a different host. A cluster will never close without an
229  * explicit call to close because it repeatedly tries to re-establish its
230  * connection even if no hosts are available.
231  */
232 class Cluster
233     : public RefCounted<Cluster>
234     , public ControlConnectionListener {
235 public:
236   typedef SharedRefPtr<Cluster> Ptr;
237 
238   /**
239    * Constructor. Don't use directly.
240    *
241    * @param connection The current control connection.
242    * @param listener A listener to handle cluster events.
243    * @param event_loop The event loop.
244    * @param connected_host The currently connected host.
245    * @param hosts Available hosts for the cluster (based on load balancing
246    * policies).
247    * @param schema Current schema metadata.
248    * @param load_balancing_policy The default load balancing policy to use for
249    * determining the next control connection host.
250    * @param load_balancing_policies
251    * @param local_dc The local datacenter determined by the metadata service for initializing the
252    * load balancing policies.
253    * @param supported_options Supported options discovered during control connection.
254    * @param settings The control connection settings to use for reconnecting the
255    * control connection.
256    */
257   Cluster(const ControlConnection::Ptr& connection, ClusterListener* listener,
258           EventLoop* event_loop, const Host::Ptr& connected_host, const HostMap& hosts,
259           const ControlConnectionSchema& schema,
260           const LoadBalancingPolicy::Ptr& load_balancing_policy,
261           const LoadBalancingPolicy::Vec& load_balancing_policies, const String& local_dc,
262           const StringMultimap& supported_options, const ClusterSettings& settings);
263 
264   /**
265    * Set the listener that will handle events for the cluster
266    * (*NOT* thread-safe).
267    *
268    * @param listener The cluster listener.
269    */
270   void set_listener(ClusterListener* listener = NULL);
271 
272   /**
273    * Close the current connection and stop the re-connection process (thread-safe).
274    */
275   void close();
276 
277   /**
278    * Notify that a node has been determined to be available via an external
279    * source (thread-safe).
280    *
281    * @param address The address of the host that is now available.
282    */
283   void notify_host_up(const Address& address);
284 
285   /**
286    * Notify that a node has been determined to be down via an external source.
287    * DOWN events from the control connection are ignored so it is up to other
288    * sources to determine a host is unavailable (thread-safe).
289    *
290    * @param address That address of the host that is now unavailable.
291    */
292   void notify_host_down(const Address& address);
293 
294   /**
295    * Start host and token map events. Events that occurred during startup will be
296    * replayed (thread-safe).
297    */
298   void start_events();
299 
300   /**
301    * Start the client monitor events (thread-safe).
302    *
303    * @param client_id Client ID associated with the session.
304    * @param session_id Session ID associated with the session.
305    * @param config The config object.
306    */
307   void start_monitor_reporting(const String& client_id, const String& session_id,
308                                const Config& config);
309 
310   /**
311    * Get the latest snapshot of the schema metadata (thread-safe).
312    *
313    * @return A schema metadata snapshot.
314    */
315   Metadata::SchemaSnapshot schema_snapshot();
316 
317   /**
318    * Look up a host by address (thread-safe).
319    *
320    * @param address The address of the host.
321    * @return The host object for the specified address or a null object pointer
322    * if the host doesn't exist.
323    */
324   Host::Ptr find_host(const Address& address) const;
325 
326   /**
327    * Get a prepared metadata entry for a prepared ID (thread-safe).
328    *
329    * @param id A prepared ID
330    * @return The prepare metadata object for the specified ID or a null object
331    * pointer if the entry doesn't exist.
332    */
333   PreparedMetadata::Entry::Ptr prepared(const String& id) const;
334 
335   /**
336    * Set the prepared metadata for a given prepared ID (thread-safe).
337    *
338    * @param id A prepared ID.
339    * @param entry A prepared metadata entry.
340    */
341   void prepared(const String& id, const PreparedMetadata::Entry::Ptr& entry);
342 
343   /**
344    * Get available hosts (determined by host distance). This filters out ignored
345    * hosts (*NOT* thread-safe).
346    *
347    * @return A mapping of available hosts.
348    */
349   HostMap available_hosts() const;
350 
351 public:
protocol_version() const352   ProtocolVersion protocol_version() const { return connection_->protocol_version(); }
connected_host() const353   const Host::Ptr& connected_host() const { return connected_host_; }
token_map() const354   const TokenMap::Ptr& token_map() const { return token_map_; }
local_dc() const355   const String& local_dc() const { return local_dc_; }
dse_server_version() const356   const VersionNumber& dse_server_version() const { return connection_->dse_server_version(); }
supported_options() const357   const StringMultimap& supported_options() const { return supported_options_; }
358 
359 private:
360   friend class ClusterRunClose;
361   friend class ClusterNotifyUp;
362   friend class ClusterNotifyDown;
363   friend class ClusterStartEvents;
364   friend class ClusterStartClientMonitor;
365 
366 private:
367   void update_hosts(const HostMap& hosts);
368   void update_schema(const ControlConnectionSchema& schema);
369   void update_token_map(const HostMap& hosts, const String& partitioner,
370                         const ControlConnectionSchema& schema);
371 
372   bool is_host_ignored(const Host::Ptr& host) const;
373 
374   void schedule_reconnect();
375 
376   void on_schedule_reconnect(Timer* timer);
377   void handle_schedule_reconnect();
378 
379   void on_reconnect(ControlConnector* connector);
380 
381 private:
382   void internal_close();
383   void handle_close();
384 
385   void internal_notify_host_up(const Address& address);
386   void notify_host_up_after_prepare(const Host::Ptr& host);
387 
388   void internal_notify_host_down(const Address& address);
389 
390   void internal_start_events();
391   void internal_start_monitor_reporting(const String& client_id, const String& session_id,
392                                         const Config& config);
393 
394   void on_monitor_reporting(Timer* timer);
395 
396   void notify_host_add(const Host::Ptr& host);
397   void notify_host_add_after_prepare(const Host::Ptr& host);
398 
399   void notify_host_remove(const Address& address);
400 
401 private:
402   void notify_or_record(const ClusterEvent& event);
403 
404 private:
405   bool prepare_host(const Host::Ptr& host, const PrepareHostHandler::Callback& callback);
406 
407   void on_prepare_host_add(const PrepareHostHandler* handler);
408   void on_prepare_host_up(const PrepareHostHandler* handler);
409 
410 private:
411   // Control connection listener methods
412 
413   virtual void on_update_schema(SchemaType type, const ResultResponse::Ptr& result,
414                                 const String& keyspace_name, const String& target_name);
415 
416   virtual void on_drop_schema(SchemaType type, const String& keyspace_name,
417                               const String& target_name);
418 
419   virtual void on_up(const Address& address);
420   virtual void on_down(const Address& address);
421 
422   virtual void on_add(const Host::Ptr& host);
423   virtual void on_remove(const Address& address);
424 
425   virtual void on_close(ControlConnection* connection);
426 
427 private:
428   ControlConnection::Ptr connection_;
429   ControlConnector::Ptr reconnector_;
430   ClusterListener* listener_;
431   EventLoop* const event_loop_;
432   const LoadBalancingPolicy::Ptr load_balancing_policy_;
433   LoadBalancingPolicy::Vec load_balancing_policies_;
434   const ClusterSettings settings_;
435   ScopedPtr<QueryPlan> query_plan_;
436   bool is_closing_;
437   Host::Ptr connected_host_;
438   LockedHostMap hosts_;
439   Metadata metadata_;
440   PreparedMetadata prepared_metadata_;
441   TokenMap::Ptr token_map_;
442   String local_dc_;
443   StringMultimap supported_options_;
444   Timer timer_;
445   bool is_recording_events_;
446   ClusterEvent::Vec recorded_events_;
447   ScopedPtr<MonitorReporting> monitor_reporting_;
448   Timer monitor_reporting_timer_;
449   ScopedPtr<ReconnectionSchedule> reconnection_schedule_;
450 };
451 
452 }}} // namespace datastax::internal::core
453 
454 #endif
455