1 /*
2   Copyright (c) DataStax, Inc.
3 
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7 
8   http://www.apache.org/licenses/LICENSE-2.0
9 
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15 */
16 
17 #ifndef DATASTAX_INTERNAL_CONNECTION_POOL_HPP
18 #define DATASTAX_INTERNAL_CONNECTION_POOL_HPP
19 
20 #include "address.hpp"
21 #include "delayed_connector.hpp"
22 #include "dense_hash_map.hpp"
23 #include "pooled_connection.hpp"
24 #include "reconnection_policy.hpp"
25 
26 #include <uv.h>
27 
28 namespace datastax { namespace internal { namespace core {
29 
30 class ConnectionPool;
31 class ConnectionPoolConnector;
32 class ConnectionPoolManager;
33 class EventLoop;
34 
35 class ConnectionPoolStateListener {
36 public:
~ConnectionPoolStateListener()37   virtual ~ConnectionPoolStateListener() {}
38 
39   /**
40    * A callback that's called when a host is up.
41    *
42    * @param address The address of the host.
43    */
44   virtual void on_pool_up(const Address& address) = 0;
45 
46   /**
47    * A callback that's called when a host is down.
48    *
49    * @param address The address of the host.
50    */
51   virtual void on_pool_down(const Address& address) = 0;
52 
53   /**
54    * A callback that's called when a host has a critical error
55    * during reconnection.
56    *
57    * The following are critical errors:
58    * * Invalid keyspace
59    * * Invalid protocol version
60    * * Authentication failure
61    * * SSL failure
62    *
63    * @param address The address of the host.
64    * @param code The code of the critical error.
65    * @param message The message of the critical error.
66    */
67   virtual void on_pool_critical_error(const Address& address, Connector::ConnectionError code,
68                                       const String& message) = 0;
69 };
70 
71 class ConnectionPoolListener : public ConnectionPoolStateListener {
72 public:
on_requires_flush(ConnectionPool * pool)73   virtual void on_requires_flush(ConnectionPool* pool) {}
74 
75   virtual void on_close(ConnectionPool* pool) = 0;
76 };
77 
78 /**
79  * The connection pool settings.
80  */
81 struct ConnectionPoolSettings {
82   /**
83    * Constructor. Initialize with default settings.
84    */
85   ConnectionPoolSettings();
86 
87   /**
88    * Constructor. Initialize the pool settings from a config object.
89    *
90    * @param config The config object.
91    */
92   ConnectionPoolSettings(const Config& config);
93 
94   ConnectionSettings connection_settings;
95   size_t num_connections_per_host;
96   ReconnectionPolicy::Ptr reconnection_policy;
97 };
98 
99 /**
100  * A pool of connections to the same host.
101  */
102 class ConnectionPool : public RefCounted<ConnectionPool> {
103 public:
104   typedef SharedRefPtr<ConnectionPool> Ptr;
105   typedef DenseHashMap<DelayedConnector*, ReconnectionSchedule*> ReconnectionSchedules;
106 
107   class Map : public DenseHashMap<Address, Ptr> {
108   public:
Map()109     Map() {
110       set_empty_key(Address::EMPTY_KEY);
111       set_deleted_key(Address::DELETED_KEY);
112     }
113   };
114 
115   /**
116    * Constructor. Don't use directly.
117    *
118    * @param connections
119    * @param listener
120    * @param keyspace
121    * @param loop
122    * @param host
123    * @param protocol_version
124    * @param settings
125    * @param metrics
126    */
127   ConnectionPool(const Connection::Vec& connections, ConnectionPoolListener* listener,
128                  const String& keyspace, uv_loop_t* loop, const Host::Ptr& host,
129                  ProtocolVersion protocol_version, const ConnectionPoolSettings& settings,
130                  Metrics* metrics);
131 
132   /**
133    * Find the least busy connection for the pool. The least busy connection has
134    * the lowest number of outstanding requests and is not closed.
135    *
136    * @return The least busy connection or null if no connection is available.
137    */
138   PooledConnection::Ptr find_least_busy() const;
139 
140   /**
141    * Determine if the pool has any valid connections.
142    *
143    * @return Returns true if the pool has valid connections.
144    */
145   bool has_connections() const;
146 
147   /**
148    * Trigger immediate connection of any delayed (reconnecting) connections.
149    */
150   void attempt_immediate_connect();
151 
152   /**
153    * Flush connections with pending writes.
154    */
155   void flush();
156 
157   /**
158    * Close the pool.
159    */
160   void close();
161 
162   /**
163    * Set the listener that will handle events for the pool.
164    *
165    * @param listener The pool listener.
166    */
167   void set_listener(ConnectionPoolListener* listener = NULL);
168 
169 public:
loop() const170   const uv_loop_t* loop() const { return loop_; }
address() const171   const Address& address() const { return host_->address(); }
protocol_version() const172   ProtocolVersion protocol_version() const { return protocol_version_; }
keyspace() const173   const String& keyspace() const { return keyspace_; }
174 
175   void set_keyspace(const String& keyspace);
176 
177 public:
178   class Protected {
179     friend class PooledConnection;
180     friend class ConnectionPoolConnector;
Protected()181     Protected() {}
Protected(Protected const &)182     Protected(Protected const&) {}
183   };
184 
185   /**
186    * Remove the connection and schedule a reconnection.
187    *
188    * @param connection A connection that closed.
189    * @param A key to restrict access to the method.
190    */
191   void close_connection(PooledConnection* connection, Protected);
192 
193   /**
194    * Add a connection to be flushed.
195    *
196    * @param connection A connection with pending writes.
197    */
198   void requires_flush(PooledConnection* connection, Protected);
199 
200 private:
201   enum CloseState {
202     CLOSE_STATE_OPEN,
203     CLOSE_STATE_CLOSING,
204     CLOSE_STATE_WAITING_FOR_CONNECTIONS,
205     CLOSE_STATE_CLOSED
206   };
207 
208   enum NotifyState { NOTIFY_STATE_NEW, NOTIFY_STATE_UP, NOTIFY_STATE_DOWN, NOTIFY_STATE_CRITICAL };
209 
210 private:
211   friend class NotifyDownOnRemovePoolOp;
212 
213 private:
214   void notify_up_or_down();
215   void notify_critical_error(Connector::ConnectionError code, const String& message);
216   void add_connection(const PooledConnection::Ptr& connection);
217   void schedule_reconnect(ReconnectionSchedule* schedule = NULL);
218   void internal_close();
219   void maybe_closed();
220 
221   void on_reconnect(DelayedConnector* connector);
222 
223 private:
224   ConnectionPoolListener* listener_;
225   String keyspace_;
226   uv_loop_t* const loop_;
227   const Host::Ptr host_;
228   const ProtocolVersion protocol_version_;
229   const ConnectionPoolSettings settings_;
230   Metrics* const metrics_;
231   ReconnectionSchedules reconnection_schedules_;
232 
233   CloseState close_state_;
234   NotifyState notify_state_;
235   PooledConnection::Vec connections_;
236   DelayedConnector::Vec pending_connections_;
237   DenseHashSet<PooledConnection*> to_flush_;
238 };
239 
240 }}} // namespace datastax::internal::core
241 
242 #endif
243