1 /*
2   Copyright (c) DataStax, Inc.
3 
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7 
8   http://www.apache.org/licenses/LICENSE-2.0
9 
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15 */
16 
17 #include "connection_pool.hpp"
18 
19 #include "config.hpp"
20 #include "connection_pool_manager.hpp"
21 #include "metrics.hpp"
22 #include "utils.hpp"
23 
24 #include <algorithm>
25 
26 using namespace datastax;
27 using namespace datastax::internal::core;
28 
least_busy_comp(const PooledConnection::Ptr & a,const PooledConnection::Ptr & b)29 static inline bool least_busy_comp(const PooledConnection::Ptr& a, const PooledConnection::Ptr& b) {
30   // Don't consider closed connections to be the least busy.
31   if (a->is_closing()) { // "a" is closed so it can't be the least busy.
32     return false;
33   } else if (b->is_closing()) { // "a" is not close, but "b" is closed so "a" is less busy.
34     return true;
35   }
36   // Both "a" and "b" are not closed so compare their inflight request counts.
37   return a->inflight_request_count() < b->inflight_request_count();
38 }
39 
ConnectionPoolSettings()40 ConnectionPoolSettings::ConnectionPoolSettings()
41     : num_connections_per_host(CASS_DEFAULT_NUM_CONNECTIONS_PER_HOST)
42     , reconnection_policy(new ExponentialReconnectionPolicy()) {}
43 
ConnectionPoolSettings(const Config & config)44 ConnectionPoolSettings::ConnectionPoolSettings(const Config& config)
45     : connection_settings(config)
46     , num_connections_per_host(config.core_connections_per_host())
47     , reconnection_policy(config.reconnection_policy()) {}
48 
49 class NopConnectionPoolListener : public ConnectionPoolListener {
50 public:
on_pool_up(const Address & address)51   virtual void on_pool_up(const Address& address) {}
52 
on_pool_down(const Address & address)53   virtual void on_pool_down(const Address& address) {}
54 
on_pool_critical_error(const Address & address,Connector::ConnectionError code,const String & message)55   virtual void on_pool_critical_error(const Address& address, Connector::ConnectionError code,
56                                       const String& message) {}
57 
on_close(ConnectionPool * pool)58   virtual void on_close(ConnectionPool* pool) {}
59 };
60 
61 NopConnectionPoolListener nop_connection_pool_listener__;
62 
ConnectionPool(const Connection::Vec & connections,ConnectionPoolListener * listener,const String & keyspace,uv_loop_t * loop,const Host::Ptr & host,ProtocolVersion protocol_version,const ConnectionPoolSettings & settings,Metrics * metrics)63 ConnectionPool::ConnectionPool(const Connection::Vec& connections, ConnectionPoolListener* listener,
64                                const String& keyspace, uv_loop_t* loop, const Host::Ptr& host,
65                                ProtocolVersion protocol_version,
66                                const ConnectionPoolSettings& settings, Metrics* metrics)
67     : listener_(listener ? listener : &nop_connection_pool_listener__)
68     , keyspace_(keyspace)
69     , loop_(loop)
70     , host_(host)
71     , protocol_version_(protocol_version)
72     , settings_(settings)
73     , metrics_(metrics)
74     , close_state_(CLOSE_STATE_OPEN)
75     , notify_state_(NOTIFY_STATE_NEW) {
76   inc_ref(); // Reference for the lifetime of the pooled connections
77   set_pointer_keys(reconnection_schedules_);
78   set_pointer_keys(to_flush_);
79 
80   for (Connection::Vec::const_iterator it = connections.begin(), end = connections.end(); it != end;
81        ++it) {
82     const Connection::Ptr& connection(*it);
83     if (!connection->is_closing()) {
84       add_connection(PooledConnection::Ptr(new PooledConnection(this, connection)));
85     }
86   }
87 
88   notify_up_or_down();
89 
90   // We had non-critical errors or some connections closed
91   assert(connections.size() <= settings_.num_connections_per_host);
92   size_t needed = settings_.num_connections_per_host - connections_.size();
93   for (size_t i = 0; i < needed; ++i) {
94     schedule_reconnect();
95   }
96 }
97 
find_least_busy() const98 PooledConnection::Ptr ConnectionPool::find_least_busy() const {
99   PooledConnection::Vec::const_iterator it =
100       std::min_element(connections_.begin(), connections_.end(), least_busy_comp);
101   if (it == connections_.end() || (*it)->is_closing()) {
102     return PooledConnection::Ptr();
103   }
104   return *it;
105 }
106 
has_connections() const107 bool ConnectionPool::has_connections() const { return !connections_.empty(); }
108 
flush()109 void ConnectionPool::flush() {
110   for (DenseHashSet<PooledConnection*>::const_iterator it = to_flush_.begin(),
111                                                        end = to_flush_.end();
112        it != end; ++it) {
113     (*it)->flush();
114   }
115   to_flush_.clear();
116 }
117 
close()118 void ConnectionPool::close() { internal_close(); }
119 
attempt_immediate_connect()120 void ConnectionPool::attempt_immediate_connect() {
121   for (DelayedConnector::Vec::iterator it = pending_connections_.begin(),
122                                        end = pending_connections_.end();
123        it != end; ++it) {
124     (*it)->attempt_immediate_connect();
125   }
126 }
127 
set_listener(ConnectionPoolListener * listener)128 void ConnectionPool::set_listener(ConnectionPoolListener* listener) {
129   listener_ = listener ? listener : &nop_connection_pool_listener__;
130 }
131 
set_keyspace(const String & keyspace)132 void ConnectionPool::set_keyspace(const String& keyspace) { keyspace_ = keyspace; }
133 
requires_flush(PooledConnection * connection,ConnectionPool::Protected)134 void ConnectionPool::requires_flush(PooledConnection* connection, ConnectionPool::Protected) {
135   if (to_flush_.empty()) {
136     listener_->on_requires_flush(this);
137   }
138   to_flush_.insert(connection);
139 }
140 
close_connection(PooledConnection * connection,Protected)141 void ConnectionPool::close_connection(PooledConnection* connection, Protected) {
142   if (metrics_) {
143     metrics_->total_connections.dec();
144   }
145   connections_.erase(std::remove(connections_.begin(), connections_.end(), connection),
146                      connections_.end());
147   to_flush_.erase(connection);
148 
149   if (close_state_ != CLOSE_STATE_OPEN) {
150     maybe_closed();
151     return;
152   }
153 
154   // When there are no more connections available then notify that the host
155   // is down.
156   notify_up_or_down();
157   schedule_reconnect();
158 }
159 
add_connection(const PooledConnection::Ptr & connection)160 void ConnectionPool::add_connection(const PooledConnection::Ptr& connection) {
161   if (metrics_) {
162     metrics_->total_connections.inc();
163   }
164   connections_.push_back(connection);
165 }
166 
notify_up_or_down()167 void ConnectionPool::notify_up_or_down() {
168   if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_UP) &&
169       connections_.empty()) {
170     notify_state_ = NOTIFY_STATE_DOWN;
171     listener_->on_pool_down(host_->address());
172   } else if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_DOWN) &&
173              !connections_.empty()) {
174     notify_state_ = NOTIFY_STATE_UP;
175     listener_->on_pool_up(host_->address());
176   }
177 }
178 
notify_critical_error(Connector::ConnectionError code,const String & message)179 void ConnectionPool::notify_critical_error(Connector::ConnectionError code, const String& message) {
180   if (notify_state_ != NOTIFY_STATE_CRITICAL) {
181     notify_state_ = NOTIFY_STATE_CRITICAL;
182     listener_->on_pool_critical_error(host_->address(), code, message);
183   }
184 }
185 
schedule_reconnect(ReconnectionSchedule * schedule)186 void ConnectionPool::schedule_reconnect(ReconnectionSchedule* schedule) {
187   DelayedConnector::Ptr connector(new DelayedConnector(
188       host_, protocol_version_, bind_callback(&ConnectionPool::on_reconnect, this)));
189 
190   if (!schedule) {
191     schedule = settings_.reconnection_policy->new_reconnection_schedule();
192   }
193   reconnection_schedules_[connector.get()] = schedule;
194 
195   uint64_t delay_ms = schedule->next_delay_ms();
196   LOG_INFO("Scheduling %s reconnect for host %s in %llums on connection pool (%p) ",
197            settings_.reconnection_policy->name(), host_->address().to_string().c_str(),
198            static_cast<unsigned long long>(delay_ms), static_cast<void*>(this));
199 
200   pending_connections_.push_back(connector);
201   connector->with_keyspace(keyspace())
202       ->with_metrics(metrics_)
203       ->with_settings(settings_.connection_settings)
204       ->delayed_connect(loop_, delay_ms);
205 }
206 
internal_close()207 void ConnectionPool::internal_close() {
208   if (close_state_ == CLOSE_STATE_OPEN) {
209     close_state_ = CLOSE_STATE_CLOSING;
210 
211     // Make copies of connection/connector data structures to prevent iterator
212     // invalidation.
213 
214     PooledConnection::Vec connections(connections_);
215     for (PooledConnection::Vec::iterator it = connections.begin(), end = connections.end();
216          it != end; ++it) {
217       (*it)->close();
218     }
219 
220     DelayedConnector::Vec pending_connections(pending_connections_);
221     for (DelayedConnector::Vec::iterator it = pending_connections.begin(),
222                                          end = pending_connections.end();
223          it != end; ++it) {
224       (*it)->cancel();
225     }
226 
227     close_state_ = CLOSE_STATE_WAITING_FOR_CONNECTIONS;
228     maybe_closed();
229   }
230 }
231 
maybe_closed()232 void ConnectionPool::maybe_closed() {
233   // Remove the pool once all current connections and pending connections
234   // are terminated.
235   if (close_state_ == CLOSE_STATE_WAITING_FOR_CONNECTIONS && connections_.empty() &&
236       pending_connections_.empty()) {
237     close_state_ = CLOSE_STATE_CLOSED;
238     // Only mark DOWN if it's UP otherwise we might get multiple DOWN events
239     // when connecting the pool.
240     if (notify_state_ == NOTIFY_STATE_UP) {
241       listener_->on_pool_down(host_->address());
242     }
243     listener_->on_close(this);
244     dec_ref();
245   }
246 }
247 
on_reconnect(DelayedConnector * connector)248 void ConnectionPool::on_reconnect(DelayedConnector* connector) {
249   pending_connections_.erase(
250       std::remove(pending_connections_.begin(), pending_connections_.end(), connector),
251       pending_connections_.end());
252 
253   ReconnectionSchedules::iterator it = reconnection_schedules_.find(connector);
254   assert(it != reconnection_schedules_.end() &&
255          "No reconnection schedule associated with connector");
256 
257   ScopedPtr<ReconnectionSchedule> schedule(it->second);
258   reconnection_schedules_.erase(it);
259 
260   if (close_state_ != CLOSE_STATE_OPEN) {
261     maybe_closed();
262     return;
263   }
264 
265   if (connector->is_ok()) {
266     add_connection(
267         PooledConnection::Ptr(new PooledConnection(this, connector->release_connection())));
268     notify_up_or_down();
269   } else if (!connector->is_canceled()) {
270     if (connector->is_critical_error()) {
271       LOG_ERROR("Closing established connection pool to host %s because of the following error: %s",
272                 address().to_string().c_str(), connector->error_message().c_str());
273       notify_critical_error(connector->error_code(), connector->error_message());
274       internal_close();
275     } else {
276       LOG_WARN(
277           "Connection pool was unable to reconnect to host %s because of the following error: %s",
278           address().to_string().c_str(), connector->error_message().c_str());
279       schedule_reconnect(schedule.release());
280     }
281   }
282 }
283