1 /*
2   Copyright (c) DataStax, Inc.
3 
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7 
8   http://www.apache.org/licenses/LICENSE-2.0
9 
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15 */
16 
17 #include "session.hpp"
18 
19 #include "batch_request.hpp"
20 #include "cluster_config.hpp"
21 #include "constants.hpp"
22 #include "execute_request.hpp"
23 #include "external.hpp"
24 #include "logger.hpp"
25 #include "metrics.hpp"
26 #include "monitor_reporting.hpp"
27 #include "prepare_all_handler.hpp"
28 #include "prepare_request.hpp"
29 #include "request_processor_initializer.hpp"
30 #include "scoped_lock.hpp"
31 #include "statement.hpp"
32 
33 using namespace datastax;
34 using namespace datastax::internal::core;
35 
36 extern "C" {
37 
cass_session_new()38 CassSession* cass_session_new() {
39   Session* session = new Session();
40   return CassSession::to(session);
41 }
42 
cass_session_free(CassSession * session)43 void cass_session_free(CassSession* session) {
44   // This attempts to close the session because the joining will
45   // hang indefinitely otherwise. This causes minimal delay
46   // if the session is already closed.
47   session->close()->wait();
48 
49   delete session->from();
50 }
51 
cass_session_connect(CassSession * session,const CassCluster * cluster)52 CassFuture* cass_session_connect(CassSession* session, const CassCluster* cluster) {
53   return cass_session_connect_keyspace(session, cluster, "");
54 }
55 
cass_session_connect_keyspace(CassSession * session,const CassCluster * cluster,const char * keyspace)56 CassFuture* cass_session_connect_keyspace(CassSession* session, const CassCluster* cluster,
57                                           const char* keyspace) {
58   return cass_session_connect_keyspace_n(session, cluster, keyspace, SAFE_STRLEN(keyspace));
59 }
60 
cass_session_connect_keyspace_n(CassSession * session,const CassCluster * cluster,const char * keyspace,size_t keyspace_length)61 CassFuture* cass_session_connect_keyspace_n(CassSession* session, const CassCluster* cluster,
62                                             const char* keyspace, size_t keyspace_length) {
63   Future::Ptr future(session->connect(cluster->config(), String(keyspace, keyspace_length)));
64   future->inc_ref();
65   return CassFuture::to(future.get());
66 }
67 
cass_session_close(CassSession * session)68 CassFuture* cass_session_close(CassSession* session) {
69   Future::Ptr future(session->close());
70   future->inc_ref();
71   return CassFuture::to(future.get());
72 }
73 
cass_session_prepare(CassSession * session,const char * query)74 CassFuture* cass_session_prepare(CassSession* session, const char* query) {
75   return cass_session_prepare_n(session, query, SAFE_STRLEN(query));
76 }
77 
cass_session_prepare_n(CassSession * session,const char * query,size_t query_length)78 CassFuture* cass_session_prepare_n(CassSession* session, const char* query, size_t query_length) {
79   Future::Ptr future(session->prepare(query, query_length));
80   future->inc_ref();
81   return CassFuture::to(future.get());
82 }
83 
cass_session_prepare_from_existing(CassSession * session,CassStatement * statement)84 CassFuture* cass_session_prepare_from_existing(CassSession* session, CassStatement* statement) {
85   Future::Ptr future(session->prepare(statement));
86   future->inc_ref();
87   return CassFuture::to(future.get());
88 }
89 
cass_session_execute(CassSession * session,const CassStatement * statement)90 CassFuture* cass_session_execute(CassSession* session, const CassStatement* statement) {
91   Future::Ptr future(session->execute(Request::ConstPtr(statement->from())));
92   future->inc_ref();
93   return CassFuture::to(future.get());
94 }
95 
cass_session_execute_batch(CassSession * session,const CassBatch * batch)96 CassFuture* cass_session_execute_batch(CassSession* session, const CassBatch* batch) {
97   Future::Ptr future(session->execute(Request::ConstPtr(batch->from())));
98   future->inc_ref();
99   return CassFuture::to(future.get());
100 }
101 
cass_session_get_schema_meta(const CassSession * session)102 const CassSchemaMeta* cass_session_get_schema_meta(const CassSession* session) {
103   return CassSchemaMeta::to(new Metadata::SchemaSnapshot(session->cluster()->schema_snapshot()));
104 }
105 
cass_session_get_metrics(const CassSession * session,CassMetrics * metrics)106 void cass_session_get_metrics(const CassSession* session, CassMetrics* metrics) {
107   const Metrics* internal_metrics = session->metrics();
108 
109   if (internal_metrics == NULL) {
110     LOG_WARN("Attempted to get metrics before connecting session object");
111     memset(metrics, 0, sizeof(CassMetrics));
112     return;
113   }
114 
115   Metrics::Histogram::Snapshot requests_snapshot;
116   internal_metrics->request_latencies.get_snapshot(&requests_snapshot);
117 
118   metrics->requests.min = requests_snapshot.min;
119   metrics->requests.max = requests_snapshot.max;
120   metrics->requests.mean = requests_snapshot.mean;
121   metrics->requests.stddev = requests_snapshot.stddev;
122   metrics->requests.median = requests_snapshot.median;
123   metrics->requests.percentile_75th = requests_snapshot.percentile_75th;
124   metrics->requests.percentile_95th = requests_snapshot.percentile_95th;
125   metrics->requests.percentile_98th = requests_snapshot.percentile_98th;
126   metrics->requests.percentile_99th = requests_snapshot.percentile_99th;
127   metrics->requests.percentile_999th = requests_snapshot.percentile_999th;
128   metrics->requests.one_minute_rate = internal_metrics->request_rates.one_minute_rate();
129   metrics->requests.five_minute_rate = internal_metrics->request_rates.five_minute_rate();
130   metrics->requests.fifteen_minute_rate = internal_metrics->request_rates.fifteen_minute_rate();
131   metrics->requests.mean_rate = internal_metrics->request_rates.mean_rate();
132 
133   metrics->stats.total_connections = internal_metrics->total_connections.sum();
134   metrics->stats.available_connections = metrics->stats.total_connections; // Deprecated
135   metrics->stats.exceeded_write_bytes_water_mark = 0;                      // Deprecated
136   metrics->stats.exceeded_pending_requests_water_mark = 0;                 // Deprecated
137 
138   metrics->errors.connection_timeouts = internal_metrics->connection_timeouts.sum();
139   metrics->errors.pending_request_timeouts = 0; // Deprecated
140   metrics->errors.request_timeouts = internal_metrics->request_timeouts.sum();
141 }
142 
cass_session_get_speculative_execution_metrics(const CassSession * session,CassSpeculativeExecutionMetrics * metrics)143 void cass_session_get_speculative_execution_metrics(const CassSession* session,
144                                                     CassSpeculativeExecutionMetrics* metrics) {
145   const Metrics* internal_metrics = session->metrics();
146 
147   if (internal_metrics == NULL) {
148     LOG_WARN("Attempted to get speculative execution metrics before connecting session object");
149     memset(metrics, 0, sizeof(CassSpeculativeExecutionMetrics));
150     return;
151   }
152 
153   Metrics::Histogram::Snapshot speculative_snapshot;
154   internal_metrics->speculative_request_latencies.get_snapshot(&speculative_snapshot);
155 
156   metrics->min = speculative_snapshot.min;
157   metrics->max = speculative_snapshot.max;
158   metrics->mean = speculative_snapshot.mean;
159   metrics->stddev = speculative_snapshot.stddev;
160   metrics->median = speculative_snapshot.median;
161   metrics->percentile_75th = speculative_snapshot.percentile_75th;
162   metrics->percentile_95th = speculative_snapshot.percentile_95th;
163   metrics->percentile_98th = speculative_snapshot.percentile_98th;
164   metrics->percentile_99th = speculative_snapshot.percentile_99th;
165   metrics->percentile_999th = speculative_snapshot.percentile_999th;
166   metrics->count = internal_metrics->request_rates.speculative_request_count();
167   metrics->percentage = internal_metrics->request_rates.speculative_request_percent();
168 }
169 
cass_session_get_client_id(CassSession * session)170 CassUuid cass_session_get_client_id(CassSession* session) { return session->client_id(); }
171 
172 } // extern "C"
173 
least_busy_comp(const RequestProcessor::Ptr & a,const RequestProcessor::Ptr & b)174 static inline bool least_busy_comp(const RequestProcessor::Ptr& a, const RequestProcessor::Ptr& b) {
175   return a->request_count() < b->request_count();
176 }
177 
178 namespace datastax { namespace internal { namespace core {
179 
180 /**
181  * An initialize helper class for `Session`. This keeps the initialization
182  * logic and data out of the core class itself.
183  */
184 class SessionInitializer : public RefCounted<SessionInitializer> {
185 public:
186   typedef SharedRefPtr<SessionInitializer> Ptr;
187 
SessionInitializer(Session * session)188   SessionInitializer(Session* session)
189       : session_(session)
190       , remaining_(0)
191       , error_code_(CASS_OK) {
192     uv_mutex_init(&mutex_);
193   }
194 
SessionInitializer()195   SessionInitializer() { uv_mutex_destroy(&mutex_); }
196 
initialize(const Host::Ptr & connected_host,ProtocolVersion protocol_version,const HostMap & hosts,const TokenMap::Ptr & token_map,const String & local_dc)197   void initialize(const Host::Ptr& connected_host, ProtocolVersion protocol_version,
198                   const HostMap& hosts, const TokenMap::Ptr& token_map, const String& local_dc) {
199     inc_ref();
200 
201     const size_t thread_count_io = remaining_ = session_->config().thread_count_io();
202     for (size_t i = 0; i < thread_count_io; ++i) {
203       RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer(
204           connected_host, protocol_version, hosts, token_map, local_dc,
205           bind_callback(&SessionInitializer::on_initialize, this)));
206 
207       RequestProcessorSettings settings(session_->config());
208       settings.connection_pool_settings.connection_settings.client_id =
209           to_string(session_->client_id());
210 
211       initializer->with_settings(RequestProcessorSettings(settings))
212           ->with_listener(session_)
213           ->with_keyspace(session_->connect_keyspace())
214           ->with_metrics(session_->metrics())
215           ->with_random(session_->random())
216           ->initialize(session_->event_loop_group_->get(i));
217     }
218   }
219 
220 private:
on_initialize(RequestProcessorInitializer * initializer)221   void on_initialize(RequestProcessorInitializer* initializer) {
222     // A lock is required because request processors are initialized on
223     // different threads .
224     ScopedMutex l(&mutex_);
225 
226     if (initializer->is_ok()) {
227       request_processors_.push_back(initializer->release_processor());
228     } else {
229       switch (initializer->error_code()) {
230         case RequestProcessorInitializer::REQUEST_PROCESSOR_ERROR_KEYSPACE:
231           error_code_ = CASS_ERROR_LIB_UNABLE_TO_SET_KEYSPACE;
232           break;
233         case RequestProcessorInitializer::REQUEST_PROCESSOR_ERROR_NO_HOSTS_AVAILABLE:
234           error_code_ = CASS_ERROR_LIB_NO_HOSTS_AVAILABLE;
235           break;
236         case RequestProcessorInitializer::REQUEST_PROCESSOR_ERROR_UNABLE_TO_INIT:
237           error_code_ = CASS_ERROR_LIB_UNABLE_TO_INIT;
238           break;
239         default:
240           error_code_ = CASS_ERROR_LIB_INTERNAL_ERROR;
241           break;
242       }
243       error_message_ = initializer->error_message();
244     }
245 
246     if (remaining_ > 0 && --remaining_ == 0) {
247       { // This requires locking because cluster events can happen during
248         // initialization.
249         ScopedMutex l(&session_->mutex_);
250         session_->request_processor_count_ = request_processors_.size();
251         session_->request_processors_ = request_processors_;
252       }
253       if (error_code_ != CASS_OK) {
254         session_->notify_connect_failed(error_code_, error_message_);
255       } else {
256         session_->notify_connected();
257         session_->cluster()->start_monitor_reporting(to_string(session_->client_id()),
258                                                      to_string(session_->session_id()),
259                                                      session_->config());
260       }
261       l.unlock(); // Unlock before destroying the object
262       dec_ref();
263     }
264   }
265 
266 private:
267   uv_mutex_t mutex_;
268   Session* session_;
269   size_t remaining_;
270   CassError error_code_;
271   String error_message_;
272   RequestProcessor::Vec request_processors_;
273 };
274 
275 }}} // namespace datastax::internal::core
276 
Session()277 Session::Session()
278     : request_processor_count_(0)
279     , is_closing_(false) {
280   uv_mutex_init(&mutex_);
281 }
282 
~Session()283 Session::~Session() {
284   join();
285   uv_mutex_destroy(&mutex_);
286 }
287 
prepare(const char * statement,size_t length)288 Future::Ptr Session::prepare(const char* statement, size_t length) {
289   PrepareRequest::Ptr prepare(new PrepareRequest(String(statement, length)));
290 
291   ResponseFuture::Ptr future(new ResponseFuture(cluster()->schema_snapshot()));
292   future->prepare_request = PrepareRequest::ConstPtr(prepare);
293 
294   execute(RequestHandler::Ptr(new RequestHandler(prepare, future, metrics())));
295 
296   return future;
297 }
298 
prepare(const Statement * statement)299 Future::Ptr Session::prepare(const Statement* statement) {
300   String query;
301 
302   if (statement->opcode() == CQL_OPCODE_QUERY) { // Simple statement
303     query = statement->query();
304   } else { // Bound statement
305     query = static_cast<const ExecuteRequest*>(statement)->prepared()->query();
306   }
307 
308   PrepareRequest::Ptr prepare(new PrepareRequest(query));
309 
310   // Inherit the settings of the existing statement. These will in turn be
311   // inherited by bound statements.
312   prepare->set_settings(statement->settings());
313 
314   ResponseFuture::Ptr future(new ResponseFuture(cluster()->schema_snapshot()));
315   future->prepare_request = PrepareRequest::ConstPtr(prepare);
316 
317   execute(RequestHandler::Ptr(new RequestHandler(prepare, future, metrics())));
318 
319   return future;
320 }
321 
execute(const Request::ConstPtr & request)322 Future::Ptr Session::execute(const Request::ConstPtr& request) {
323   ResponseFuture::Ptr future(new ResponseFuture());
324 
325   RequestHandler::Ptr request_handler(new RequestHandler(request, future, metrics()));
326 
327   if (request_handler->request()->opcode() == CQL_OPCODE_EXECUTE) {
328     const ExecuteRequest* execute = static_cast<const ExecuteRequest*>(request_handler->request());
329     request_handler->set_prepared_metadata(cluster()->prepared(execute->prepared()->id()));
330   }
331 
332   execute(request_handler);
333 
334   return future;
335 }
336 
execute(const RequestHandler::Ptr & request_handler)337 void Session::execute(const RequestHandler::Ptr& request_handler) {
338   if (state() != SESSION_STATE_CONNECTED) {
339     request_handler->set_error(CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, "Session is not connected");
340     return;
341   }
342 
343   // This intentionally doesn't lock the request processors. The processors will
344   // be populated before the connect future returns and calling execute during
345   // the connection process is undefined behavior. Locking would cause unnecessary
346   // overhead for something that's constant once the session is connected.
347   const RequestProcessor::Ptr& request_processor =
348       *std::min_element(request_processors_.begin(), request_processors_.end(), least_busy_comp);
349   request_processor->process_request(request_handler);
350 }
351 
join()352 void Session::join() {
353   if (event_loop_group_) {
354     event_loop_group_->close_handles();
355     event_loop_group_->join();
356     event_loop_group_.reset();
357   }
358 }
359 
on_connect(const Host::Ptr & connected_host,ProtocolVersion protocol_version,const HostMap & hosts,const TokenMap::Ptr & token_map,const String & local_dc)360 void Session::on_connect(const Host::Ptr& connected_host, ProtocolVersion protocol_version,
361                          const HostMap& hosts, const TokenMap::Ptr& token_map,
362                          const String& local_dc) {
363   int rc = 0;
364 
365   if (hosts.empty()) {
366     notify_connect_failed(CASS_ERROR_LIB_NO_HOSTS_AVAILABLE,
367                           "No hosts provided or no hosts resolved");
368     return;
369   }
370 
371   join();
372   event_loop_group_.reset(new RoundRobinEventLoopGroup(config().thread_count_io()));
373   rc = event_loop_group_->init("Request Processor");
374   if (rc != 0) {
375     notify_connect_failed(CASS_ERROR_LIB_UNABLE_TO_INIT, "Unable to initialize event loop group");
376     return;
377   }
378 
379   rc = event_loop_group_->run();
380   if (rc != 0) {
381     notify_connect_failed(CASS_ERROR_LIB_UNABLE_TO_INIT, "Unable to run event loop group");
382     return;
383   }
384 
385   for (HostMap::const_iterator it = hosts.begin(), end = hosts.end(); it != end; ++it) {
386     const Host::Ptr& host = it->second;
387     config().host_listener()->on_host_added(host);
388     config().host_listener()->on_host_up(
389         host); // If host is down it will be marked down later in the connection process
390   }
391 
392   request_processors_.clear();
393   request_processor_count_ = 0;
394   is_closing_ = false;
395   SessionInitializer::Ptr initializer(new SessionInitializer(this));
396   initializer->initialize(connected_host, protocol_version, hosts, token_map, local_dc);
397 }
398 
on_close()399 void Session::on_close() {
400   // If there are request processors still connected those need to be closed
401   // first before sending the close notification.
402   ScopedMutex l(&mutex_);
403   is_closing_ = true;
404   if (request_processor_count_ > 0) {
405     for (RequestProcessor::Vec::const_iterator it = request_processors_.begin(),
406                                                end = request_processors_.end();
407          it != end; ++it) {
408       (*it)->close();
409     }
410   } else {
411     notify_closed();
412   }
413 }
414 
on_host_up(const Host::Ptr & host)415 void Session::on_host_up(const Host::Ptr& host) {
416   // Ignore up events from the control connection; however external host
417   // listeners should still be notified. The connection pools will reconnect
418   // themselves when the host becomes available.
419   config().host_listener()->on_host_up(host);
420 }
421 
on_host_down(const Host::Ptr & host)422 void Session::on_host_down(const Host::Ptr& host) {
423   // Ignore down events from the control connection; however external host
424   // listeners should still be notified. The connection pools can determine if a
425   // host is down themselves. The control connection host can become partitioned
426   // from the rest of the cluster and in that scenario a down event from the
427   // down event from the control connection would be invalid.
428   ScopedMutex l(&mutex_);
429   if (!is_closing_) { // Refrain from host down events while session is closing
430     l.unlock();
431     config().host_listener()->on_host_down(host);
432   }
433 }
434 
on_host_added(const Host::Ptr & host)435 void Session::on_host_added(const Host::Ptr& host) {
436   { // Lock for request processor
437     ScopedMutex l(&mutex_);
438     for (RequestProcessor::Vec::const_iterator it = request_processors_.begin(),
439                                                end = request_processors_.end();
440          it != end; ++it) {
441       (*it)->notify_host_added(host);
442     }
443   }
444   config().host_listener()->on_host_added(host);
445 }
446 
on_host_removed(const Host::Ptr & host)447 void Session::on_host_removed(const Host::Ptr& host) {
448   { // Lock for request processor
449     ScopedMutex l(&mutex_);
450     for (RequestProcessor::Vec::const_iterator it = request_processors_.begin(),
451                                                end = request_processors_.end();
452          it != end; ++it) {
453       (*it)->notify_host_removed(host);
454     }
455   }
456   config().host_listener()->on_host_removed(host);
457 }
458 
on_token_map_updated(const TokenMap::Ptr & token_map)459 void Session::on_token_map_updated(const TokenMap::Ptr& token_map) {
460   ScopedMutex l(&mutex_);
461   for (RequestProcessor::Vec::const_iterator it = request_processors_.begin(),
462                                              end = request_processors_.end();
463        it != end; ++it) {
464     (*it)->notify_token_map_updated(token_map);
465   }
466 }
467 
on_host_maybe_up(const Host::Ptr & host)468 void Session::on_host_maybe_up(const Host::Ptr& host) {
469   ScopedMutex l(&mutex_);
470   for (RequestProcessor::Vec::const_iterator it = request_processors_.begin(),
471                                              end = request_processors_.end();
472        it != end; ++it) {
473     (*it)->notify_host_maybe_up(host->address());
474   }
475 }
476 
on_host_ready(const Host::Ptr & host)477 void Session::on_host_ready(const Host::Ptr& host) {
478   ScopedMutex l(&mutex_);
479   for (RequestProcessor::Vec::const_iterator it = request_processors_.begin(),
480                                              end = request_processors_.end();
481        it != end; ++it) {
482     (*it)->notify_host_ready(host);
483   }
484 }
485 
on_pool_up(const Address & address)486 void Session::on_pool_up(const Address& address) { cluster()->notify_host_up(address); }
487 
on_pool_down(const Address & address)488 void Session::on_pool_down(const Address& address) { cluster()->notify_host_down(address); }
489 
on_pool_critical_error(const Address & address,Connector::ConnectionError code,const String & message)490 void Session::on_pool_critical_error(const Address& address, Connector::ConnectionError code,
491                                      const String& message) {
492   cluster()->notify_host_down(address);
493 }
494 
on_keyspace_changed(const String & keyspace,const KeyspaceChangedHandler::Ptr & handler)495 void Session::on_keyspace_changed(const String& keyspace,
496                                   const KeyspaceChangedHandler::Ptr& handler) {
497   ScopedMutex l(&mutex_);
498   for (RequestProcessor::Vec::const_iterator it = request_processors_.begin(),
499                                              end = request_processors_.end();
500        it != end; ++it) {
501     (*it)->set_keyspace(keyspace, handler);
502   }
503 }
504 
on_prepared_metadata_changed(const String & id,const PreparedMetadata::Entry::Ptr & entry)505 void Session::on_prepared_metadata_changed(const String& id,
506                                            const PreparedMetadata::Entry::Ptr& entry) {
507   cluster()->prepared(id, entry);
508 }
509 
on_close(RequestProcessor * processor)510 void Session::on_close(RequestProcessor* processor) {
511   // Requires a lock because the close callback is called from several
512   // different request processor threads.
513   ScopedMutex l(&mutex_);
514   if (request_processor_count_ > 0 && --request_processor_count_ == 0) {
515     notify_closed();
516   }
517 }
518