1 /*
2 Copyright (c) DataStax, Inc.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #include "connection_pool.hpp"
18
19 #include "config.hpp"
20 #include "connection_pool_manager.hpp"
21 #include "metrics.hpp"
22 #include "utils.hpp"
23
24 #include <algorithm>
25
26 using namespace datastax;
27 using namespace datastax::internal::core;
28
least_busy_comp(const PooledConnection::Ptr & a,const PooledConnection::Ptr & b)29 static inline bool least_busy_comp(const PooledConnection::Ptr& a, const PooledConnection::Ptr& b) {
30 // Don't consider closed connections to be the least busy.
31 if (a->is_closing()) { // "a" is closed so it can't be the least busy.
32 return false;
33 } else if (b->is_closing()) { // "a" is not close, but "b" is closed so "a" is less busy.
34 return true;
35 }
36 // Both "a" and "b" are not closed so compare their inflight request counts.
37 return a->inflight_request_count() < b->inflight_request_count();
38 }
39
// Default settings: CASS_DEFAULT_NUM_CONNECTIONS_PER_HOST connections per host
// and a freshly allocated ExponentialReconnectionPolicy. "connection_settings"
// is not listed here, so it is default-initialized.
ConnectionPoolSettings::ConnectionPoolSettings()
    : num_connections_per_host(CASS_DEFAULT_NUM_CONNECTIONS_PER_HOST)
    , reconnection_policy(new ExponentialReconnectionPolicy()) {}
43
// Builds the pool settings from a driver configuration: the connection
// settings are constructed from "config", the per-host connection count comes
// from config.core_connections_per_host() and the reconnection policy from
// config.reconnection_policy().
ConnectionPoolSettings::ConnectionPoolSettings(const Config& config)
    : connection_settings(config)
    , num_connections_per_host(config.core_connections_per_host())
    , reconnection_policy(config.reconnection_policy()) {}
48
49 class NopConnectionPoolListener : public ConnectionPoolListener {
50 public:
on_pool_up(const Address & address)51 virtual void on_pool_up(const Address& address) {}
52
on_pool_down(const Address & address)53 virtual void on_pool_down(const Address& address) {}
54
on_pool_critical_error(const Address & address,Connector::ConnectionError code,const String & message)55 virtual void on_pool_critical_error(const Address& address, Connector::ConnectionError code,
56 const String& message) {}
57
on_close(ConnectionPool * pool)58 virtual void on_close(ConnectionPool* pool) {}
59 };
60
// Shared do-nothing listener instance. ConnectionPool points its listener at
// this object whenever it is given a null listener (see the constructor and
// set_listener()).
NopConnectionPoolListener nop_connection_pool_listener__;
62
ConnectionPool(const Connection::Vec & connections,ConnectionPoolListener * listener,const String & keyspace,uv_loop_t * loop,const Host::Ptr & host,ProtocolVersion protocol_version,const ConnectionPoolSettings & settings,Metrics * metrics)63 ConnectionPool::ConnectionPool(const Connection::Vec& connections, ConnectionPoolListener* listener,
64 const String& keyspace, uv_loop_t* loop, const Host::Ptr& host,
65 ProtocolVersion protocol_version,
66 const ConnectionPoolSettings& settings, Metrics* metrics)
67 : listener_(listener ? listener : &nop_connection_pool_listener__)
68 , keyspace_(keyspace)
69 , loop_(loop)
70 , host_(host)
71 , protocol_version_(protocol_version)
72 , settings_(settings)
73 , metrics_(metrics)
74 , close_state_(CLOSE_STATE_OPEN)
75 , notify_state_(NOTIFY_STATE_NEW) {
76 inc_ref(); // Reference for the lifetime of the pooled connections
77 set_pointer_keys(reconnection_schedules_);
78 set_pointer_keys(to_flush_);
79
80 for (Connection::Vec::const_iterator it = connections.begin(), end = connections.end(); it != end;
81 ++it) {
82 const Connection::Ptr& connection(*it);
83 if (!connection->is_closing()) {
84 add_connection(PooledConnection::Ptr(new PooledConnection(this, connection)));
85 }
86 }
87
88 notify_up_or_down();
89
90 // We had non-critical errors or some connections closed
91 assert(connections.size() <= settings_.num_connections_per_host);
92 size_t needed = settings_.num_connections_per_host - connections_.size();
93 for (size_t i = 0; i < needed; ++i) {
94 schedule_reconnect();
95 }
96 }
97
find_least_busy() const98 PooledConnection::Ptr ConnectionPool::find_least_busy() const {
99 PooledConnection::Vec::const_iterator it =
100 std::min_element(connections_.begin(), connections_.end(), least_busy_comp);
101 if (it == connections_.end() || (*it)->is_closing()) {
102 return PooledConnection::Ptr();
103 }
104 return *it;
105 }
106
has_connections() const107 bool ConnectionPool::has_connections() const { return !connections_.empty(); }
108
flush()109 void ConnectionPool::flush() {
110 for (DenseHashSet<PooledConnection*>::const_iterator it = to_flush_.begin(),
111 end = to_flush_.end();
112 it != end; ++it) {
113 (*it)->flush();
114 }
115 to_flush_.clear();
116 }
117
close()118 void ConnectionPool::close() { internal_close(); }
119
attempt_immediate_connect()120 void ConnectionPool::attempt_immediate_connect() {
121 for (DelayedConnector::Vec::iterator it = pending_connections_.begin(),
122 end = pending_connections_.end();
123 it != end; ++it) {
124 (*it)->attempt_immediate_connect();
125 }
126 }
127
set_listener(ConnectionPoolListener * listener)128 void ConnectionPool::set_listener(ConnectionPoolListener* listener) {
129 listener_ = listener ? listener : &nop_connection_pool_listener__;
130 }
131
set_keyspace(const String & keyspace)132 void ConnectionPool::set_keyspace(const String& keyspace) { keyspace_ = keyspace; }
133
requires_flush(PooledConnection * connection,ConnectionPool::Protected)134 void ConnectionPool::requires_flush(PooledConnection* connection, ConnectionPool::Protected) {
135 if (to_flush_.empty()) {
136 listener_->on_requires_flush(this);
137 }
138 to_flush_.insert(connection);
139 }
140
close_connection(PooledConnection * connection,Protected)141 void ConnectionPool::close_connection(PooledConnection* connection, Protected) {
142 if (metrics_) {
143 metrics_->total_connections.dec();
144 }
145 connections_.erase(std::remove(connections_.begin(), connections_.end(), connection),
146 connections_.end());
147 to_flush_.erase(connection);
148
149 if (close_state_ != CLOSE_STATE_OPEN) {
150 maybe_closed();
151 return;
152 }
153
154 // When there are no more connections available then notify that the host
155 // is down.
156 notify_up_or_down();
157 schedule_reconnect();
158 }
159
add_connection(const PooledConnection::Ptr & connection)160 void ConnectionPool::add_connection(const PooledConnection::Ptr& connection) {
161 if (metrics_) {
162 metrics_->total_connections.inc();
163 }
164 connections_.push_back(connection);
165 }
166
notify_up_or_down()167 void ConnectionPool::notify_up_or_down() {
168 if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_UP) &&
169 connections_.empty()) {
170 notify_state_ = NOTIFY_STATE_DOWN;
171 listener_->on_pool_down(host_->address());
172 } else if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_DOWN) &&
173 !connections_.empty()) {
174 notify_state_ = NOTIFY_STATE_UP;
175 listener_->on_pool_up(host_->address());
176 }
177 }
178
notify_critical_error(Connector::ConnectionError code,const String & message)179 void ConnectionPool::notify_critical_error(Connector::ConnectionError code, const String& message) {
180 if (notify_state_ != NOTIFY_STATE_CRITICAL) {
181 notify_state_ = NOTIFY_STATE_CRITICAL;
182 listener_->on_pool_critical_error(host_->address(), code, message);
183 }
184 }
185
schedule_reconnect(ReconnectionSchedule * schedule)186 void ConnectionPool::schedule_reconnect(ReconnectionSchedule* schedule) {
187 DelayedConnector::Ptr connector(new DelayedConnector(
188 host_, protocol_version_, bind_callback(&ConnectionPool::on_reconnect, this)));
189
190 if (!schedule) {
191 schedule = settings_.reconnection_policy->new_reconnection_schedule();
192 }
193 reconnection_schedules_[connector.get()] = schedule;
194
195 uint64_t delay_ms = schedule->next_delay_ms();
196 LOG_INFO("Scheduling %s reconnect for host %s in %llums on connection pool (%p) ",
197 settings_.reconnection_policy->name(), host_->address().to_string().c_str(),
198 static_cast<unsigned long long>(delay_ms), static_cast<void*>(this));
199
200 pending_connections_.push_back(connector);
201 connector->with_keyspace(keyspace())
202 ->with_metrics(metrics_)
203 ->with_settings(settings_.connection_settings)
204 ->delayed_connect(loop_, delay_ms);
205 }
206
internal_close()207 void ConnectionPool::internal_close() {
208 if (close_state_ == CLOSE_STATE_OPEN) {
209 close_state_ = CLOSE_STATE_CLOSING;
210
211 // Make copies of connection/connector data structures to prevent iterator
212 // invalidation.
213
214 PooledConnection::Vec connections(connections_);
215 for (PooledConnection::Vec::iterator it = connections.begin(), end = connections.end();
216 it != end; ++it) {
217 (*it)->close();
218 }
219
220 DelayedConnector::Vec pending_connections(pending_connections_);
221 for (DelayedConnector::Vec::iterator it = pending_connections.begin(),
222 end = pending_connections.end();
223 it != end; ++it) {
224 (*it)->cancel();
225 }
226
227 close_state_ = CLOSE_STATE_WAITING_FOR_CONNECTIONS;
228 maybe_closed();
229 }
230 }
231
maybe_closed()232 void ConnectionPool::maybe_closed() {
233 // Remove the pool once all current connections and pending connections
234 // are terminated.
235 if (close_state_ == CLOSE_STATE_WAITING_FOR_CONNECTIONS && connections_.empty() &&
236 pending_connections_.empty()) {
237 close_state_ = CLOSE_STATE_CLOSED;
238 // Only mark DOWN if it's UP otherwise we might get multiple DOWN events
239 // when connecting the pool.
240 if (notify_state_ == NOTIFY_STATE_UP) {
241 listener_->on_pool_down(host_->address());
242 }
243 listener_->on_close(this);
244 dec_ref();
245 }
246 }
247
on_reconnect(DelayedConnector * connector)248 void ConnectionPool::on_reconnect(DelayedConnector* connector) {
249 pending_connections_.erase(
250 std::remove(pending_connections_.begin(), pending_connections_.end(), connector),
251 pending_connections_.end());
252
253 ReconnectionSchedules::iterator it = reconnection_schedules_.find(connector);
254 assert(it != reconnection_schedules_.end() &&
255 "No reconnection schedule associated with connector");
256
257 ScopedPtr<ReconnectionSchedule> schedule(it->second);
258 reconnection_schedules_.erase(it);
259
260 if (close_state_ != CLOSE_STATE_OPEN) {
261 maybe_closed();
262 return;
263 }
264
265 if (connector->is_ok()) {
266 add_connection(
267 PooledConnection::Ptr(new PooledConnection(this, connector->release_connection())));
268 notify_up_or_down();
269 } else if (!connector->is_canceled()) {
270 if (connector->is_critical_error()) {
271 LOG_ERROR("Closing established connection pool to host %s because of the following error: %s",
272 address().to_string().c_str(), connector->error_message().c_str());
273 notify_critical_error(connector->error_code(), connector->error_message());
274 internal_close();
275 } else {
276 LOG_WARN(
277 "Connection pool was unable to reconnect to host %s because of the following error: %s",
278 address().to_string().c_str(), connector->error_message().c_str());
279 schedule_reconnect(schedule.release());
280 }
281 }
282 }
283