1 /* 2 Copyright (c) DataStax, Inc. 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef DATASTAX_INTERNAL_CONNECTION_POOL_HPP 18 #define DATASTAX_INTERNAL_CONNECTION_POOL_HPP 19 20 #include "address.hpp" 21 #include "delayed_connector.hpp" 22 #include "dense_hash_map.hpp" 23 #include "pooled_connection.hpp" 24 #include "reconnection_policy.hpp" 25 26 #include <uv.h> 27 28 namespace datastax { namespace internal { namespace core { 29 30 class ConnectionPool; 31 class ConnectionPoolConnector; 32 class ConnectionPoolManager; 33 class EventLoop; 34 35 class ConnectionPoolStateListener { 36 public: ~ConnectionPoolStateListener()37 virtual ~ConnectionPoolStateListener() {} 38 39 /** 40 * A callback that's called when a host is up. 41 * 42 * @param address The address of the host. 43 */ 44 virtual void on_pool_up(const Address& address) = 0; 45 46 /** 47 * A callback that's called when a host is down. 48 * 49 * @param address The address of the host. 50 */ 51 virtual void on_pool_down(const Address& address) = 0; 52 53 /** 54 * A callback that's called when a host has a critical error 55 * during reconnection. 56 * 57 * The following are critical errors: 58 * * Invalid keyspace 59 * * Invalid protocol version 60 * * Authentication failure 61 * * SSL failure 62 * 63 * @param address The address of the host. 64 * @param code The code of the critical error. 65 * @param message The message of the critical error. 66 */ 67 virtual void on_pool_critical_error(const Address& address, Connector::ConnectionError code, 68 const String& message) = 0; 69 }; 70 71 class ConnectionPoolListener : public ConnectionPoolStateListener { 72 public: on_requires_flush(ConnectionPool * pool)73 virtual void on_requires_flush(ConnectionPool* pool) {} 74 75 virtual void on_close(ConnectionPool* pool) = 0; 76 }; 77 78 /** 79 * The connection pool settings. 80 */ 81 struct ConnectionPoolSettings { 82 /** 83 * Constructor. Initialize with default settings. 84 */ 85 ConnectionPoolSettings(); 86 87 /** 88 * Constructor. Initialize the pool settings from a config object. 89 * 90 * @param config The config object. 91 */ 92 ConnectionPoolSettings(const Config& config); 93 94 ConnectionSettings connection_settings; 95 size_t num_connections_per_host; 96 ReconnectionPolicy::Ptr reconnection_policy; 97 }; 98 99 /** 100 * A pool of connections to the same host. 101 */ 102 class ConnectionPool : public RefCounted<ConnectionPool> { 103 public: 104 typedef SharedRefPtr<ConnectionPool> Ptr; 105 typedef DenseHashMap<DelayedConnector*, ReconnectionSchedule*> ReconnectionSchedules; 106 107 class Map : public DenseHashMap<Address, Ptr> { 108 public: Map()109 Map() { 110 set_empty_key(Address::EMPTY_KEY); 111 set_deleted_key(Address::DELETED_KEY); 112 } 113 }; 114 115 /** 116 * Constructor. Don't use directly. 117 * 118 * @param connections 119 * @param listener 120 * @param keyspace 121 * @param loop 122 * @param host 123 * @param protocol_version 124 * @param settings 125 * @param metrics 126 */ 127 ConnectionPool(const Connection::Vec& connections, ConnectionPoolListener* listener, 128 const String& keyspace, uv_loop_t* loop, const Host::Ptr& host, 129 ProtocolVersion protocol_version, const ConnectionPoolSettings& settings, 130 Metrics* metrics); 131 132 /** 133 * Find the least busy connection for the pool. The least busy connection has 134 * the lowest number of outstanding requests and is not closed. 135 * 136 * @return The least busy connection or null if no connection is available. 137 */ 138 PooledConnection::Ptr find_least_busy() const; 139 140 /** 141 * Determine if the pool has any valid connections. 142 * 143 * @return Returns true if the pool has valid connections. 144 */ 145 bool has_connections() const; 146 147 /** 148 * Trigger immediate connection of any delayed (reconnecting) connections. 149 */ 150 void attempt_immediate_connect(); 151 152 /** 153 * Flush connections with pending writes. 154 */ 155 void flush(); 156 157 /** 158 * Close the pool. 159 */ 160 void close(); 161 162 /** 163 * Set the listener that will handle events for the pool. 164 * 165 * @param listener The pool listener. 166 */ 167 void set_listener(ConnectionPoolListener* listener = NULL); 168 169 public: loop() const170 const uv_loop_t* loop() const { return loop_; } address() const171 const Address& address() const { return host_->address(); } protocol_version() const172 ProtocolVersion protocol_version() const { return protocol_version_; } keyspace() const173 const String& keyspace() const { return keyspace_; } 174 175 void set_keyspace(const String& keyspace); 176 177 public: 178 class Protected { 179 friend class PooledConnection; 180 friend class ConnectionPoolConnector; Protected()181 Protected() {} Protected(Protected const &)182 Protected(Protected const&) {} 183 }; 184 185 /** 186 * Remove the connection and schedule a reconnection. 187 * 188 * @param connection A connection that closed. 189 * @param A key to restrict access to the method. 190 */ 191 void close_connection(PooledConnection* connection, Protected); 192 193 /** 194 * Add a connection to be flushed. 195 * 196 * @param connection A connection with pending writes. 197 */ 198 void requires_flush(PooledConnection* connection, Protected); 199 200 private: 201 enum CloseState { 202 CLOSE_STATE_OPEN, 203 CLOSE_STATE_CLOSING, 204 CLOSE_STATE_WAITING_FOR_CONNECTIONS, 205 CLOSE_STATE_CLOSED 206 }; 207 208 enum NotifyState { NOTIFY_STATE_NEW, NOTIFY_STATE_UP, NOTIFY_STATE_DOWN, NOTIFY_STATE_CRITICAL }; 209 210 private: 211 friend class NotifyDownOnRemovePoolOp; 212 213 private: 214 void notify_up_or_down(); 215 void notify_critical_error(Connector::ConnectionError code, const String& message); 216 void add_connection(const PooledConnection::Ptr& connection); 217 void schedule_reconnect(ReconnectionSchedule* schedule = NULL); 218 void internal_close(); 219 void maybe_closed(); 220 221 void on_reconnect(DelayedConnector* connector); 222 223 private: 224 ConnectionPoolListener* listener_; 225 String keyspace_; 226 uv_loop_t* const loop_; 227 const Host::Ptr host_; 228 const ProtocolVersion protocol_version_; 229 const ConnectionPoolSettings settings_; 230 Metrics* const metrics_; 231 ReconnectionSchedules reconnection_schedules_; 232 233 CloseState close_state_; 234 NotifyState notify_state_; 235 PooledConnection::Vec connections_; 236 DelayedConnector::Vec pending_connections_; 237 DenseHashSet<PooledConnection*> to_flush_; 238 }; 239 240 }}} // namespace datastax::internal::core 241 242 #endif 243