1 /*
2   Copyright (c) DataStax, Inc.
3 
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7 
8   http://www.apache.org/licenses/LICENSE-2.0
9 
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15 */
16 
17 #ifndef DATASTAX_INTERNAL_CONNECTION_POOL_MANAGER_HPP
18 #define DATASTAX_INTERNAL_CONNECTION_POOL_MANAGER_HPP
19 
20 #include "address.hpp"
21 #include "atomic.hpp"
22 #include "connection_pool.hpp"
23 #include "connection_pool_connector.hpp"
24 #include "histogram_wrapper.hpp"
25 #include "ref_counted.hpp"
26 #include "string.hpp"
27 
28 #include <uv.h>
29 
30 namespace datastax { namespace internal { namespace core {
31 
32 class EventLoop;
33 
34 /**
35  * A listener that handles connection pool events.
36  */
37 class ConnectionPoolManagerListener : public ConnectionPoolStateListener {
38 public:
~ConnectionPoolManagerListener()39   virtual ~ConnectionPoolManagerListener() {}
40 
41   /**
42    * A callback that's called when one of the manager's connections requires a
43    * flush. It's only called once on the first write to the connection.
44    */
on_requires_flush()45   virtual void on_requires_flush() {}
46 
47   /**
48    * A callback that's called when a manager is closed.
49    *
50    * @param manager The manager object that's closing.
51    */
52   virtual void on_close(ConnectionPoolManager* manager) = 0;
53 };
54 
55 /**
56  * A manager for one or more connection pools to different hosts.
57  */
58 class ConnectionPoolManager
59     : public RefCounted<ConnectionPoolManager>
60     , public ConnectionPoolListener {
61 public:
62   typedef SharedRefPtr<ConnectionPoolManager> Ptr;
63 
64   /**
65    * Constructor. Don't use directly.
66    *
67    * @param pools
68    * @param loop Event loop to utilize for handling requests.
69    * @param protocol_version The protocol version to use for connections.
70    * @param keyspace The current keyspace to use for connections.
71    * @param listener A listener that handles manager events.
72    * @param metrics An object for recording metrics.
73    * @param settings Settings for the manager and its connections.
74    */
75   ConnectionPoolManager(const ConnectionPool::Map& pools, uv_loop_t* loop,
76                         ProtocolVersion protocol_version, const String& keyspace,
77                         ConnectionPoolManagerListener* listener, Metrics* metrics,
78                         const ConnectionPoolSettings& settings);
79 
80   /**
81    * Find the least busy connection for a given host.
82    *
83    * @param address The address of the host to find a least busy connection.
84    * @return The least busy connection for a host or null if no connections are
85    * available.
86    */
87   PooledConnection::Ptr find_least_busy(const Address& address) const;
88 
89   /**
90    * Determine if a pool has any valid connections.
91    *
92    * @param address An address to check for valid connections.
93    * @return Returns true if the pool has valid connections.
94    */
95   bool has_connections(const Address& address) const;
96 
97   /**
98    * Flush connection pools with pending writes.
99    */
100   void flush();
101 
102   /**
103    * Get addresses for all available hosts.
104    *
105    * @return A vector of addresses.
106    */
107   AddressVec available() const;
108 
109   /**
110    * Add a connection pool for the given host.
111    *
112    * @param host The host to add.
113    */
114   void add(const Host::Ptr& host);
115 
116   /**
117    * Remove a connection pool for the given host.
118    *
119    * @param address The address of the host to remove.
120    */
121   void remove(const Address& address);
122 
123   /**
124    * Trigger immediate connection of any delayed (reconnecting) connections.
125    *
126    * @param address An address to trigger immediate connections.
127    */
128   void attempt_immediate_connect(const Address& address);
129 
130   /**
131    * Close all connection pools.
132    */
133   void close();
134 
135   /**
136    * Set the listener that will handle events for the connection pool manager.
137    *
138    * @param listener The connection pool manager listener.
139    */
140   void set_listener(ConnectionPoolManagerListener* listener = NULL);
141 
142 public:
loop() const143   uv_loop_t* loop() const { return loop_; }
protocol_version() const144   ProtocolVersion protocol_version() const { return protocol_version_; }
settings() const145   const ConnectionPoolSettings& settings() const { return settings_; }
listener() const146   ConnectionPoolManagerListener* listener() const { return listener_; }
keyspace() const147   const String& keyspace() const { return keyspace_; }
148 
149   void set_keyspace(const String& keyspace);
150 
metrics() const151   Metrics* metrics() const { return metrics_; }
152 
153 #ifdef CASS_INTERNAL_DIAGNOSTICS
flush_bytes()154   HistogramWrapper& flush_bytes() { return flush_bytes_; }
155 #endif
156 
157 private:
158   // Connection pool listener methods
159 
160   virtual void on_pool_up(const Address& address);
161 
162   virtual void on_pool_down(const Address& address);
163 
164   virtual void on_pool_critical_error(const Address& address, Connector::ConnectionError code,
165                                       const String& message);
166 
167   virtual void on_requires_flush(ConnectionPool* pool);
168 
169   virtual void on_close(ConnectionPool* pool);
170 
171 private:
172   enum CloseState {
173     CLOSE_STATE_OPEN,
174     CLOSE_STATE_CLOSING,
175     CLOSE_STATE_WAITING_FOR_POOLS,
176     CLOSE_STATE_CLOSED
177   };
178 
179 private:
180   void add_pool(const ConnectionPool::Ptr& pool);
181   void maybe_closed();
182 
183 private:
184   void on_connect(ConnectionPoolConnector* pool_connector);
185 
186 private:
187   uv_loop_t* loop_;
188 
189   const ProtocolVersion protocol_version_;
190   const ConnectionPoolSettings settings_;
191   ConnectionPoolManagerListener* listener_;
192 
193   CloseState close_state_;
194   ConnectionPool::Map pools_;
195   ConnectionPoolConnector::Vec pending_pools_;
196   DenseHashSet<ConnectionPool*> to_flush_;
197 
198   String keyspace_;
199 
200   Metrics* const metrics_;
201 
202 #ifdef CASS_INTERNAL_DIAGNOSTICS
203   HistogramWrapper flush_bytes_;
204 #endif
205 };
206 
207 }}} // namespace datastax::internal::core
208 
209 #endif
210