1 /*
2 Copyright (c) DataStax, Inc.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #include "session.hpp"
18
19 #include "batch_request.hpp"
20 #include "cluster_config.hpp"
21 #include "constants.hpp"
22 #include "execute_request.hpp"
23 #include "external.hpp"
24 #include "logger.hpp"
25 #include "metrics.hpp"
26 #include "monitor_reporting.hpp"
27 #include "prepare_all_handler.hpp"
28 #include "prepare_request.hpp"
29 #include "request_processor_initializer.hpp"
30 #include "scoped_lock.hpp"
31 #include "statement.hpp"
32
33 using namespace datastax;
34 using namespace datastax::internal::core;
35
36 extern "C" {
37
cass_session_new()38 CassSession* cass_session_new() {
39 Session* session = new Session();
40 return CassSession::to(session);
41 }
42
// Frees a session, blocking until it has fully shut down.
void cass_session_free(CassSession* session) {
  // This attempts to close the session because the joining will
  // hang indefinitely otherwise. This causes minimal delay
  // if the session is already closed.
  session->close()->wait();

  delete session->from();
}
51
cass_session_connect(CassSession * session,const CassCluster * cluster)52 CassFuture* cass_session_connect(CassSession* session, const CassCluster* cluster) {
53 return cass_session_connect_keyspace(session, cluster, "");
54 }
55
cass_session_connect_keyspace(CassSession * session,const CassCluster * cluster,const char * keyspace)56 CassFuture* cass_session_connect_keyspace(CassSession* session, const CassCluster* cluster,
57 const char* keyspace) {
58 return cass_session_connect_keyspace_n(session, cluster, keyspace, SAFE_STRLEN(keyspace));
59 }
60
cass_session_connect_keyspace_n(CassSession * session,const CassCluster * cluster,const char * keyspace,size_t keyspace_length)61 CassFuture* cass_session_connect_keyspace_n(CassSession* session, const CassCluster* cluster,
62 const char* keyspace, size_t keyspace_length) {
63 Future::Ptr future(session->connect(cluster->config(), String(keyspace, keyspace_length)));
64 future->inc_ref();
65 return CassFuture::to(future.get());
66 }
67
cass_session_close(CassSession * session)68 CassFuture* cass_session_close(CassSession* session) {
69 Future::Ptr future(session->close());
70 future->inc_ref();
71 return CassFuture::to(future.get());
72 }
73
cass_session_prepare(CassSession * session,const char * query)74 CassFuture* cass_session_prepare(CassSession* session, const char* query) {
75 return cass_session_prepare_n(session, query, SAFE_STRLEN(query));
76 }
77
cass_session_prepare_n(CassSession * session,const char * query,size_t query_length)78 CassFuture* cass_session_prepare_n(CassSession* session, const char* query, size_t query_length) {
79 Future::Ptr future(session->prepare(query, query_length));
80 future->inc_ref();
81 return CassFuture::to(future.get());
82 }
83
cass_session_prepare_from_existing(CassSession * session,CassStatement * statement)84 CassFuture* cass_session_prepare_from_existing(CassSession* session, CassStatement* statement) {
85 Future::Ptr future(session->prepare(statement));
86 future->inc_ref();
87 return CassFuture::to(future.get());
88 }
89
cass_session_execute(CassSession * session,const CassStatement * statement)90 CassFuture* cass_session_execute(CassSession* session, const CassStatement* statement) {
91 Future::Ptr future(session->execute(Request::ConstPtr(statement->from())));
92 future->inc_ref();
93 return CassFuture::to(future.get());
94 }
95
cass_session_execute_batch(CassSession * session,const CassBatch * batch)96 CassFuture* cass_session_execute_batch(CassSession* session, const CassBatch* batch) {
97 Future::Ptr future(session->execute(Request::ConstPtr(batch->from())));
98 future->inc_ref();
99 return CassFuture::to(future.get());
100 }
101
// Returns a point-in-time copy of the cluster's schema metadata. The
// caller owns the snapshot and must free it with cass_schema_meta_free().
const CassSchemaMeta* cass_session_get_schema_meta(const CassSession* session) {
  return CassSchemaMeta::to(new Metadata::SchemaSnapshot(session->cluster()->schema_snapshot()));
}
105
// Copies a snapshot of the session's request metrics into `metrics`.
// If the session was never connected there are no internal metrics, so
// the output struct is zeroed instead of being left uninitialized.
void cass_session_get_metrics(const CassSession* session, CassMetrics* metrics) {
  const Metrics* internal_metrics = session->metrics();

  if (internal_metrics == NULL) {
    LOG_WARN("Attempted to get metrics before connecting session object");
    memset(metrics, 0, sizeof(CassMetrics));
    return;
  }

  // Take the latency histogram snapshot once so all percentile fields
  // come from a single consistent view.
  Metrics::Histogram::Snapshot requests_snapshot;
  internal_metrics->request_latencies.get_snapshot(&requests_snapshot);

  metrics->requests.min = requests_snapshot.min;
  metrics->requests.max = requests_snapshot.max;
  metrics->requests.mean = requests_snapshot.mean;
  metrics->requests.stddev = requests_snapshot.stddev;
  metrics->requests.median = requests_snapshot.median;
  metrics->requests.percentile_75th = requests_snapshot.percentile_75th;
  metrics->requests.percentile_95th = requests_snapshot.percentile_95th;
  metrics->requests.percentile_98th = requests_snapshot.percentile_98th;
  metrics->requests.percentile_99th = requests_snapshot.percentile_99th;
  metrics->requests.percentile_999th = requests_snapshot.percentile_999th;
  metrics->requests.one_minute_rate = internal_metrics->request_rates.one_minute_rate();
  metrics->requests.five_minute_rate = internal_metrics->request_rates.five_minute_rate();
  metrics->requests.fifteen_minute_rate = internal_metrics->request_rates.fifteen_minute_rate();
  metrics->requests.mean_rate = internal_metrics->request_rates.mean_rate();

  metrics->stats.total_connections = internal_metrics->total_connections.sum();
  // The fields below are kept for ABI compatibility but no longer carry
  // distinct values.
  metrics->stats.available_connections = metrics->stats.total_connections; // Deprecated
  metrics->stats.exceeded_write_bytes_water_mark = 0;                      // Deprecated
  metrics->stats.exceeded_pending_requests_water_mark = 0;                 // Deprecated

  metrics->errors.connection_timeouts = internal_metrics->connection_timeouts.sum();
  metrics->errors.pending_request_timeouts = 0; // Deprecated
  metrics->errors.request_timeouts = internal_metrics->request_timeouts.sum();
}
142
// Copies a snapshot of the session's speculative execution metrics into
// `metrics`. Zeroes the output if the session was never connected.
void cass_session_get_speculative_execution_metrics(const CassSession* session,
                                                    CassSpeculativeExecutionMetrics* metrics) {
  const Metrics* internal_metrics = session->metrics();

  if (internal_metrics == NULL) {
    LOG_WARN("Attempted to get speculative execution metrics before connecting session object");
    memset(metrics, 0, sizeof(CassSpeculativeExecutionMetrics));
    return;
  }

  // Single snapshot so all percentile fields are mutually consistent.
  Metrics::Histogram::Snapshot speculative_snapshot;
  internal_metrics->speculative_request_latencies.get_snapshot(&speculative_snapshot);

  metrics->min = speculative_snapshot.min;
  metrics->max = speculative_snapshot.max;
  metrics->mean = speculative_snapshot.mean;
  metrics->stddev = speculative_snapshot.stddev;
  metrics->median = speculative_snapshot.median;
  metrics->percentile_75th = speculative_snapshot.percentile_75th;
  metrics->percentile_95th = speculative_snapshot.percentile_95th;
  metrics->percentile_98th = speculative_snapshot.percentile_98th;
  metrics->percentile_99th = speculative_snapshot.percentile_99th;
  metrics->percentile_999th = speculative_snapshot.percentile_999th;
  metrics->count = internal_metrics->request_rates.speculative_request_count();
  metrics->percentage = internal_metrics->request_rates.speculative_request_percent();
}
169
cass_session_get_client_id(CassSession * session)170 CassUuid cass_session_get_client_id(CassSession* session) { return session->client_id(); }
171
172 } // extern "C"
173
least_busy_comp(const RequestProcessor::Ptr & a,const RequestProcessor::Ptr & b)174 static inline bool least_busy_comp(const RequestProcessor::Ptr& a, const RequestProcessor::Ptr& b) {
175 return a->request_count() < b->request_count();
176 }
177
178 namespace datastax { namespace internal { namespace core {
179
180 /**
181 * An initialize helper class for `Session`. This keeps the initialization
182 * logic and data out of the core class itself.
183 */
184 class SessionInitializer : public RefCounted<SessionInitializer> {
185 public:
186 typedef SharedRefPtr<SessionInitializer> Ptr;
187
SessionInitializer(Session * session)188 SessionInitializer(Session* session)
189 : session_(session)
190 , remaining_(0)
191 , error_code_(CASS_OK) {
192 uv_mutex_init(&mutex_);
193 }
194
SessionInitializer()195 SessionInitializer() { uv_mutex_destroy(&mutex_); }
196
initialize(const Host::Ptr & connected_host,ProtocolVersion protocol_version,const HostMap & hosts,const TokenMap::Ptr & token_map,const String & local_dc)197 void initialize(const Host::Ptr& connected_host, ProtocolVersion protocol_version,
198 const HostMap& hosts, const TokenMap::Ptr& token_map, const String& local_dc) {
199 inc_ref();
200
201 const size_t thread_count_io = remaining_ = session_->config().thread_count_io();
202 for (size_t i = 0; i < thread_count_io; ++i) {
203 RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer(
204 connected_host, protocol_version, hosts, token_map, local_dc,
205 bind_callback(&SessionInitializer::on_initialize, this)));
206
207 RequestProcessorSettings settings(session_->config());
208 settings.connection_pool_settings.connection_settings.client_id =
209 to_string(session_->client_id());
210
211 initializer->with_settings(RequestProcessorSettings(settings))
212 ->with_listener(session_)
213 ->with_keyspace(session_->connect_keyspace())
214 ->with_metrics(session_->metrics())
215 ->with_random(session_->random())
216 ->initialize(session_->event_loop_group_->get(i));
217 }
218 }
219
220 private:
on_initialize(RequestProcessorInitializer * initializer)221 void on_initialize(RequestProcessorInitializer* initializer) {
222 // A lock is required because request processors are initialized on
223 // different threads .
224 ScopedMutex l(&mutex_);
225
226 if (initializer->is_ok()) {
227 request_processors_.push_back(initializer->release_processor());
228 } else {
229 switch (initializer->error_code()) {
230 case RequestProcessorInitializer::REQUEST_PROCESSOR_ERROR_KEYSPACE:
231 error_code_ = CASS_ERROR_LIB_UNABLE_TO_SET_KEYSPACE;
232 break;
233 case RequestProcessorInitializer::REQUEST_PROCESSOR_ERROR_NO_HOSTS_AVAILABLE:
234 error_code_ = CASS_ERROR_LIB_NO_HOSTS_AVAILABLE;
235 break;
236 case RequestProcessorInitializer::REQUEST_PROCESSOR_ERROR_UNABLE_TO_INIT:
237 error_code_ = CASS_ERROR_LIB_UNABLE_TO_INIT;
238 break;
239 default:
240 error_code_ = CASS_ERROR_LIB_INTERNAL_ERROR;
241 break;
242 }
243 error_message_ = initializer->error_message();
244 }
245
246 if (remaining_ > 0 && --remaining_ == 0) {
247 { // This requires locking because cluster events can happen during
248 // initialization.
249 ScopedMutex l(&session_->mutex_);
250 session_->request_processor_count_ = request_processors_.size();
251 session_->request_processors_ = request_processors_;
252 }
253 if (error_code_ != CASS_OK) {
254 session_->notify_connect_failed(error_code_, error_message_);
255 } else {
256 session_->notify_connected();
257 session_->cluster()->start_monitor_reporting(to_string(session_->client_id()),
258 to_string(session_->session_id()),
259 session_->config());
260 }
261 l.unlock(); // Unlock before destroying the object
262 dec_ref();
263 }
264 }
265
266 private:
267 uv_mutex_t mutex_;
268 Session* session_;
269 size_t remaining_;
270 CassError error_code_;
271 String error_message_;
272 RequestProcessor::Vec request_processors_;
273 };
274
275 }}} // namespace datastax::internal::core
276
// Constructs an unconnected session; the mutex guards the request
// processor list and closing flag across event-loop threads.
Session::Session()
    : request_processor_count_(0)
    , is_closing_(false) {
  uv_mutex_init(&mutex_);
}
282
// Joins the I/O event loops (blocking) before tearing down the mutex so
// no thread can still be using it.
Session::~Session() {
  join();
  uv_mutex_destroy(&mutex_);
}
287
prepare(const char * statement,size_t length)288 Future::Ptr Session::prepare(const char* statement, size_t length) {
289 PrepareRequest::Ptr prepare(new PrepareRequest(String(statement, length)));
290
291 ResponseFuture::Ptr future(new ResponseFuture(cluster()->schema_snapshot()));
292 future->prepare_request = PrepareRequest::ConstPtr(prepare);
293
294 execute(RequestHandler::Ptr(new RequestHandler(prepare, future, metrics())));
295
296 return future;
297 }
298
prepare(const Statement * statement)299 Future::Ptr Session::prepare(const Statement* statement) {
300 String query;
301
302 if (statement->opcode() == CQL_OPCODE_QUERY) { // Simple statement
303 query = statement->query();
304 } else { // Bound statement
305 query = static_cast<const ExecuteRequest*>(statement)->prepared()->query();
306 }
307
308 PrepareRequest::Ptr prepare(new PrepareRequest(query));
309
310 // Inherit the settings of the existing statement. These will in turn be
311 // inherited by bound statements.
312 prepare->set_settings(statement->settings());
313
314 ResponseFuture::Ptr future(new ResponseFuture(cluster()->schema_snapshot()));
315 future->prepare_request = PrepareRequest::ConstPtr(prepare);
316
317 execute(RequestHandler::Ptr(new RequestHandler(prepare, future, metrics())));
318
319 return future;
320 }
321
execute(const Request::ConstPtr & request)322 Future::Ptr Session::execute(const Request::ConstPtr& request) {
323 ResponseFuture::Ptr future(new ResponseFuture());
324
325 RequestHandler::Ptr request_handler(new RequestHandler(request, future, metrics()));
326
327 if (request_handler->request()->opcode() == CQL_OPCODE_EXECUTE) {
328 const ExecuteRequest* execute = static_cast<const ExecuteRequest*>(request_handler->request());
329 request_handler->set_prepared_metadata(cluster()->prepared(execute->prepared()->id()));
330 }
331
332 execute(request_handler);
333
334 return future;
335 }
336
// Routes a request handler to the least-busy request processor, or fails
// it immediately if the session is not connected.
void Session::execute(const RequestHandler::Ptr& request_handler) {
  if (state() != SESSION_STATE_CONNECTED) {
    request_handler->set_error(CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, "Session is not connected");
    return;
  }

  // This intentionally doesn't lock the request processors. The processors will
  // be populated before the connect future returns and calling execute during
  // the connection process is undefined behavior. Locking would cause unnecessary
  // overhead for something that's constant once the session is connected.
  const RequestProcessor::Ptr& request_processor =
      *std::min_element(request_processors_.begin(), request_processors_.end(), least_busy_comp);
  request_processor->process_request(request_handler);
}
351
join()352 void Session::join() {
353 if (event_loop_group_) {
354 event_loop_group_->close_handles();
355 event_loop_group_->join();
356 event_loop_group_.reset();
357 }
358 }
359
// Callback invoked once the control connection is established. Creates
// the I/O event loops, notifies the host listener about the initial host
// set, and kicks off one request processor per I/O thread.
void Session::on_connect(const Host::Ptr& connected_host, ProtocolVersion protocol_version,
                         const HostMap& hosts, const TokenMap::Ptr& token_map,
                         const String& local_dc) {
  int rc = 0;

  if (hosts.empty()) {
    notify_connect_failed(CASS_ERROR_LIB_NO_HOSTS_AVAILABLE,
                          "No hosts provided or no hosts resolved");
    return;
  }

  // Tear down any event loops left over from a previous connect attempt
  // before creating a fresh group.
  join();
  event_loop_group_.reset(new RoundRobinEventLoopGroup(config().thread_count_io()));
  rc = event_loop_group_->init("Request Processor");
  if (rc != 0) {
    notify_connect_failed(CASS_ERROR_LIB_UNABLE_TO_INIT, "Unable to initialize event loop group");
    return;
  }

  rc = event_loop_group_->run();
  if (rc != 0) {
    notify_connect_failed(CASS_ERROR_LIB_UNABLE_TO_INIT, "Unable to run event loop group");
    return;
  }

  // Report the initial host set to any external host listener.
  for (HostMap::const_iterator it = hosts.begin(), end = hosts.end(); it != end; ++it) {
    const Host::Ptr& host = it->second;
    config().host_listener()->on_host_added(host);
    config().host_listener()->on_host_up(
        host); // If host is down it will be marked down later in the connection process
  }

  // Reset per-connect state; SessionInitializer repopulates the request
  // processors asynchronously and then signals the connect future.
  request_processors_.clear();
  request_processor_count_ = 0;
  is_closing_ = false;
  SessionInitializer::Ptr initializer(new SessionInitializer(this));
  initializer->initialize(connected_host, protocol_version, hosts, token_map, local_dc);
}
398
on_close()399 void Session::on_close() {
400 // If there are request processors still connected those need to be closed
401 // first before sending the close notification.
402 ScopedMutex l(&mutex_);
403 is_closing_ = true;
404 if (request_processor_count_ > 0) {
405 for (RequestProcessor::Vec::const_iterator it = request_processors_.begin(),
406 end = request_processors_.end();
407 it != end; ++it) {
408 (*it)->close();
409 }
410 } else {
411 notify_closed();
412 }
413 }
414
void Session::on_host_up(const Host::Ptr& host) {
  // Ignore up events from the control connection; however external host
  // listeners should still be notified. The connection pools will reconnect
  // themselves when the host becomes available.
  config().host_listener()->on_host_up(host);
}
421
void Session::on_host_down(const Host::Ptr& host) {
  // Ignore down events from the control connection; however external host
  // listeners should still be notified. The connection pools can determine if a
  // host is down themselves. The control connection host can become partitioned
  // from the rest of the cluster and in that scenario a down event from the
  // down event from the control connection would be invalid.
  ScopedMutex l(&mutex_);
  if (!is_closing_) { // Refrain from host down events while session is closing
    // Deliberately unlock before invoking the external listener so user
    // code never runs while holding the session mutex.
    l.unlock();
    config().host_listener()->on_host_down(host);
  }
}
434
on_host_added(const Host::Ptr & host)435 void Session::on_host_added(const Host::Ptr& host) {
436 { // Lock for request processor
437 ScopedMutex l(&mutex_);
438 for (RequestProcessor::Vec::const_iterator it = request_processors_.begin(),
439 end = request_processors_.end();
440 it != end; ++it) {
441 (*it)->notify_host_added(host);
442 }
443 }
444 config().host_listener()->on_host_added(host);
445 }
446
on_host_removed(const Host::Ptr & host)447 void Session::on_host_removed(const Host::Ptr& host) {
448 { // Lock for request processor
449 ScopedMutex l(&mutex_);
450 for (RequestProcessor::Vec::const_iterator it = request_processors_.begin(),
451 end = request_processors_.end();
452 it != end; ++it) {
453 (*it)->notify_host_removed(host);
454 }
455 }
456 config().host_listener()->on_host_removed(host);
457 }
458
on_token_map_updated(const TokenMap::Ptr & token_map)459 void Session::on_token_map_updated(const TokenMap::Ptr& token_map) {
460 ScopedMutex l(&mutex_);
461 for (RequestProcessor::Vec::const_iterator it = request_processors_.begin(),
462 end = request_processors_.end();
463 it != end; ++it) {
464 (*it)->notify_token_map_updated(token_map);
465 }
466 }
467
on_host_maybe_up(const Host::Ptr & host)468 void Session::on_host_maybe_up(const Host::Ptr& host) {
469 ScopedMutex l(&mutex_);
470 for (RequestProcessor::Vec::const_iterator it = request_processors_.begin(),
471 end = request_processors_.end();
472 it != end; ++it) {
473 (*it)->notify_host_maybe_up(host->address());
474 }
475 }
476
on_host_ready(const Host::Ptr & host)477 void Session::on_host_ready(const Host::Ptr& host) {
478 ScopedMutex l(&mutex_);
479 for (RequestProcessor::Vec::const_iterator it = request_processors_.begin(),
480 end = request_processors_.end();
481 it != end; ++it) {
482 (*it)->notify_host_ready(host);
483 }
484 }
485
on_pool_up(const Address & address)486 void Session::on_pool_up(const Address& address) { cluster()->notify_host_up(address); }
487
on_pool_down(const Address & address)488 void Session::on_pool_down(const Address& address) { cluster()->notify_host_down(address); }
489
// A pool hit an unrecoverable connection error; treated the same as the
// host going down (the specific code/message are not propagated here).
void Session::on_pool_critical_error(const Address& address, Connector::ConnectionError code,
                                     const String& message) {
  cluster()->notify_host_down(address);
}
494
on_keyspace_changed(const String & keyspace,const KeyspaceChangedHandler::Ptr & handler)495 void Session::on_keyspace_changed(const String& keyspace,
496 const KeyspaceChangedHandler::Ptr& handler) {
497 ScopedMutex l(&mutex_);
498 for (RequestProcessor::Vec::const_iterator it = request_processors_.begin(),
499 end = request_processors_.end();
500 it != end; ++it) {
501 (*it)->set_keyspace(keyspace, handler);
502 }
503 }
504
// Caches updated metadata for a prepared statement on the cluster so
// subsequent executes can reuse it.
void Session::on_prepared_metadata_changed(const String& id,
                                           const PreparedMetadata::Entry::Ptr& entry) {
  cluster()->prepared(id, entry);
}
509
on_close(RequestProcessor * processor)510 void Session::on_close(RequestProcessor* processor) {
511 // Requires a lock because the close callback is called from several
512 // different request processor threads.
513 ScopedMutex l(&mutex_);
514 if (request_processor_count_ > 0 && --request_processor_count_ == 0) {
515 notify_closed();
516 }
517 }
518