1 /* 2 Copyright (c) DataStax, Inc. 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef DATASTAX_INTERNAL_CONNECTION_POOL_MANAGER_HPP 18 #define DATASTAX_INTERNAL_CONNECTION_POOL_MANAGER_HPP 19 20 #include "address.hpp" 21 #include "atomic.hpp" 22 #include "connection_pool.hpp" 23 #include "connection_pool_connector.hpp" 24 #include "histogram_wrapper.hpp" 25 #include "ref_counted.hpp" 26 #include "string.hpp" 27 28 #include <uv.h> 29 30 namespace datastax { namespace internal { namespace core { 31 32 class EventLoop; 33 34 /** 35 * A listener that handles connection pool events. 36 */ 37 class ConnectionPoolManagerListener : public ConnectionPoolStateListener { 38 public: ~ConnectionPoolManagerListener()39 virtual ~ConnectionPoolManagerListener() {} 40 41 /** 42 * A callback that's called when one of the manager's connections requires a 43 * flush. It's only called once on the first write to the connection. 44 */ on_requires_flush()45 virtual void on_requires_flush() {} 46 47 /** 48 * A callback that's called when a manager is closed. 49 * 50 * @param manager The manager object that's closing. 51 */ 52 virtual void on_close(ConnectionPoolManager* manager) = 0; 53 }; 54 55 /** 56 * A manager for one or more connection pools to different hosts. 57 */ 58 class ConnectionPoolManager 59 : public RefCounted<ConnectionPoolManager> 60 , public ConnectionPoolListener { 61 public: 62 typedef SharedRefPtr<ConnectionPoolManager> Ptr; 63 64 /** 65 * Constructor. Don't use directly. 66 * 67 * @param pools 68 * @param loop Event loop to utilize for handling requests. 69 * @param protocol_version The protocol version to use for connections. 70 * @param keyspace The current keyspace to use for connections. 71 * @param listener A listener that handles manager events. 72 * @param metrics An object for recording metrics. 73 * @param settings Settings for the manager and its connections. 74 */ 75 ConnectionPoolManager(const ConnectionPool::Map& pools, uv_loop_t* loop, 76 ProtocolVersion protocol_version, const String& keyspace, 77 ConnectionPoolManagerListener* listener, Metrics* metrics, 78 const ConnectionPoolSettings& settings); 79 80 /** 81 * Find the least busy connection for a given host. 82 * 83 * @param address The address of the host to find a least busy connection. 84 * @return The least busy connection for a host or null if no connections are 85 * available. 86 */ 87 PooledConnection::Ptr find_least_busy(const Address& address) const; 88 89 /** 90 * Determine if a pool has any valid connections. 91 * 92 * @param address An address to check for valid connections. 93 * @return Returns true if the pool has valid connections. 94 */ 95 bool has_connections(const Address& address) const; 96 97 /** 98 * Flush connection pools with pending writes. 99 */ 100 void flush(); 101 102 /** 103 * Get addresses for all available hosts. 104 * 105 * @return A vector of addresses. 106 */ 107 AddressVec available() const; 108 109 /** 110 * Add a connection pool for the given host. 111 * 112 * @param host The host to add. 113 */ 114 void add(const Host::Ptr& host); 115 116 /** 117 * Remove a connection pool for the given host. 118 * 119 * @param address The address of the host to remove. 120 */ 121 void remove(const Address& address); 122 123 /** 124 * Trigger immediate connection of any delayed (reconnecting) connections. 125 * 126 * @param address An address to trigger immediate connections. 127 */ 128 void attempt_immediate_connect(const Address& address); 129 130 /** 131 * Close all connection pools. 132 */ 133 void close(); 134 135 /** 136 * Set the listener that will handle events for the connection pool manager. 137 * 138 * @param listener The connection pool manager listener. 139 */ 140 void set_listener(ConnectionPoolManagerListener* listener = NULL); 141 142 public: loop() const143 uv_loop_t* loop() const { return loop_; } protocol_version() const144 ProtocolVersion protocol_version() const { return protocol_version_; } settings() const145 const ConnectionPoolSettings& settings() const { return settings_; } listener() const146 ConnectionPoolManagerListener* listener() const { return listener_; } keyspace() const147 const String& keyspace() const { return keyspace_; } 148 149 void set_keyspace(const String& keyspace); 150 metrics() const151 Metrics* metrics() const { return metrics_; } 152 153 #ifdef CASS_INTERNAL_DIAGNOSTICS flush_bytes()154 HistogramWrapper& flush_bytes() { return flush_bytes_; } 155 #endif 156 157 private: 158 // Connection pool listener methods 159 160 virtual void on_pool_up(const Address& address); 161 162 virtual void on_pool_down(const Address& address); 163 164 virtual void on_pool_critical_error(const Address& address, Connector::ConnectionError code, 165 const String& message); 166 167 virtual void on_requires_flush(ConnectionPool* pool); 168 169 virtual void on_close(ConnectionPool* pool); 170 171 private: 172 enum CloseState { 173 CLOSE_STATE_OPEN, 174 CLOSE_STATE_CLOSING, 175 CLOSE_STATE_WAITING_FOR_POOLS, 176 CLOSE_STATE_CLOSED 177 }; 178 179 private: 180 void add_pool(const ConnectionPool::Ptr& pool); 181 void maybe_closed(); 182 183 private: 184 void on_connect(ConnectionPoolConnector* pool_connector); 185 186 private: 187 uv_loop_t* loop_; 188 189 const ProtocolVersion protocol_version_; 190 const ConnectionPoolSettings settings_; 191 ConnectionPoolManagerListener* listener_; 192 193 CloseState close_state_; 194 ConnectionPool::Map pools_; 195 ConnectionPoolConnector::Vec pending_pools_; 196 DenseHashSet<ConnectionPool*> to_flush_; 197 198 String keyspace_; 199 200 Metrics* const metrics_; 201 202 #ifdef CASS_INTERNAL_DIAGNOSTICS 203 HistogramWrapper flush_bytes_; 204 #endif 205 }; 206 207 }}} // namespace datastax::internal::core 208 209 #endif 210