1 /*
2   Copyright (c) DataStax, Inc.
3 
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7 
8   http://www.apache.org/licenses/LICENSE-2.0
9 
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15 */
16 
17 #ifndef DATASTAX_INTERNAL_REQUEST_HANDLER_HPP
18 #define DATASTAX_INTERNAL_REQUEST_HANDLER_HPP
19 
20 #include "constants.hpp"
21 #include "error_response.hpp"
22 #include "future.hpp"
23 #include "host.hpp"
24 #include "load_balancing.hpp"
25 #include "metadata.hpp"
26 #include "prepare_request.hpp"
27 #include "request.hpp"
28 #include "request_callback.hpp"
29 #include "response.hpp"
30 #include "result_response.hpp"
31 #include "retry_policy.hpp"
32 #include "scoped_ptr.hpp"
33 #include "small_vector.hpp"
34 #include "speculative_execution.hpp"
35 #include "string.hpp"
36 #include "timestamp_generator.hpp"
37 
38 #include <uv.h>
39 
40 namespace datastax { namespace internal { namespace core {
41 
42 class Config;
43 class Connection;
44 class ConnectionPoolManager;
45 class Pool;
46 class ExecutionProfile;
47 class Timer;
48 class TokenMap;
49 
50 struct RequestTry {
RequestTrydatastax::internal::core::RequestTry51   RequestTry()
52       : error(CASS_OK)
53       , latency(0) {}
54 
RequestTrydatastax::internal::core::RequestTry55   RequestTry(const Address& address, uint64_t latency)
56       : address(address)
57       , error(CASS_OK)
58       , latency(latency / (1000 * 1000)) {} // To milliseconds
59 
RequestTrydatastax::internal::core::RequestTry60   RequestTry(const Address& address, CassError error)
61       : address(address)
62       , error(error)
63       , latency(0) {}
64 
65   Address address;
66   CassError error;
67   uint64_t latency;
68 };
69 
70 typedef SmallVector<RequestTry, 2> RequestTryVec;
71 
72 class ResponseFuture : public Future {
73 public:
74   typedef SharedRefPtr<ResponseFuture> Ptr;
75 
ResponseFuture()76   ResponseFuture()
77       : Future(FUTURE_TYPE_RESPONSE) {}
78 
ResponseFuture(const Metadata::SchemaSnapshot & schema_metadata)79   ResponseFuture(const Metadata::SchemaSnapshot& schema_metadata)
80       : Future(FUTURE_TYPE_RESPONSE)
81       , schema_metadata(new Metadata::SchemaSnapshot(schema_metadata)) {}
82 
set_response(Address address,const Response::Ptr & response)83   bool set_response(Address address, const Response::Ptr& response) {
84     ScopedMutex lock(&mutex_);
85     if (!is_set()) {
86       address_ = address;
87       response_ = response;
88       internal_set(lock);
89       return true;
90     }
91     return false;
92   }
93 
response()94   const Response::Ptr& response() {
95     ScopedMutex lock(&mutex_);
96     internal_wait(lock);
97     return response_;
98   }
99 
set_error_with_address(Address address,CassError code,const String & message)100   bool set_error_with_address(Address address, CassError code, const String& message) {
101     ScopedMutex lock(&mutex_);
102     if (!is_set()) {
103       address_ = address;
104       internal_set_error(code, message, lock);
105       return true;
106     }
107     return false;
108   }
109 
set_error_with_response(Address address,const Response::Ptr & response,CassError code,const String & message)110   bool set_error_with_response(Address address, const Response::Ptr& response, CassError code,
111                                const String& message) {
112     ScopedMutex lock(&mutex_);
113     if (!is_set()) {
114       address_ = address;
115       response_ = response;
116       internal_set_error(code, message, lock);
117       return true;
118     }
119     return false;
120   }
121 
address()122   const Address& address() {
123     ScopedMutex lock(&mutex_);
124     internal_wait(lock);
125     return address_;
126   }
127 
128   // Currently, used for testing only, but it could be exposed in the future.
attempted_addresses()129   AddressVec attempted_addresses() {
130     ScopedMutex lock(&mutex_);
131     internal_wait(lock);
132     return attempted_addresses_;
133   }
134 
135   PrepareRequest::ConstPtr prepare_request;
136   ScopedPtr<Metadata::SchemaSnapshot> schema_metadata;
137 
138 private:
139   friend class RequestHandler;
140 
add_attempted_address(const Address & address)141   void add_attempted_address(const Address& address) {
142     ScopedMutex lock(&mutex_);
143     attempted_addresses_.push_back(address);
144   }
145 
146 private:
147   Address address_;
148   Response::Ptr response_;
149   AddressVec attempted_addresses_;
150 };
151 
152 class RequestExecution;
153 class RequestListener;
154 
155 class RequestHandler : public RefCounted<RequestHandler> {
156   friend class Memory;
157 
158 public:
159   typedef SharedRefPtr<RequestHandler> Ptr;
160 
161   RequestHandler(const Request::ConstPtr& request, const ResponseFuture::Ptr& future,
162                  Metrics* metrics = NULL);
163   ~RequestHandler();
164 
165   void set_prepared_metadata(const PreparedMetadata::Entry::Ptr& entry);
166 
167   void init(const ExecutionProfile& profile, ConnectionPoolManager* manager,
168             const TokenMap* token_map, TimestampGenerator* timestamp_generator,
169             RequestListener* listener);
170 
171   void execute();
172 
wrapper() const173   const RequestWrapper& wrapper() const { return wrapper_; }
request() const174   const Request* request() const { return wrapper_.request().get(); }
consistency() const175   CassConsistency consistency() const { return wrapper_.consistency(); }
176 
177 public:
178   class Protected {
179     friend class RequestExecution;
Protected()180     Protected() {}
Protected(Protected const &)181     Protected(Protected const&) {}
182   };
183 
184   void retry(RequestExecution* request_execution, Protected);
185 
186   Host::Ptr next_host(Protected);
187   int64_t next_execution(const Host::Ptr& current_host, Protected);
188 
189   void start_request(uv_loop_t* loop, Protected);
190 
191   void add_attempted_address(const Address& address, Protected);
192 
193   void notify_result_metadata_changed(const String& prepared_id, const String& query,
194                                       const String& keyspace, const String& result_metadata_id,
195                                       const ResultResponse::ConstPtr& result_response, Protected);
196 
197   void notify_keyspace_changed(const String& keyspace, const Host::Ptr& current_host,
198                                const Response::Ptr& response);
199 
200   bool wait_for_tracing_data(const Host::Ptr& current_host, const Response::Ptr& response);
201   bool wait_for_schema_agreement(const Host::Ptr& current_host, const Response::Ptr& response);
202 
203   bool prepare_all(const Host::Ptr& current_host, const Response::Ptr& response);
204 
205   void set_response(const Host::Ptr& host, const Response::Ptr& response);
206   void set_error(CassError code, const String& message);
207   void set_error(const Host::Ptr& host, CassError code, const String& message);
208   void set_error_with_error_response(const Host::Ptr& host, const Response::Ptr& error,
209                                      CassError code, const String& message);
210 
211   void stop_timer();
212 
213 private:
214   void on_timeout(Timer* timer);
215 
216 private:
217   void stop_request();
218   void internal_retry(RequestExecution* request_execution);
219 
220 private:
221   RequestWrapper wrapper_;
222   SharedRefPtr<ResponseFuture> future_;
223 
224   bool is_done_;
225   int running_executions_;
226 
227   ScopedPtr<QueryPlan> query_plan_;
228   ScopedPtr<SpeculativeExecutionPlan> execution_plan_;
229   Timer timer_;
230 
231   const uint64_t start_time_ns_;
232   RequestListener* listener_;
233   ConnectionPoolManager* manager_;
234 
235   Metrics* const metrics_;
236 
237   RequestTryVec request_tries_;
238 };
239 
240 class KeyspaceChangedResponse {
241 public:
KeyspaceChangedResponse(const RequestHandler::Ptr & request_handler,const Host::Ptr & current_host,const Response::Ptr & response)242   KeyspaceChangedResponse(const RequestHandler::Ptr& request_handler, const Host::Ptr& current_host,
243                           const Response::Ptr& response)
244       : request_handler_(request_handler)
245       , current_host_(current_host)
246       , response_(response) {}
247 
set_response()248   void set_response() { request_handler_->set_response(current_host_, response_); }
249 
250 private:
251   RequestHandler::Ptr request_handler_;
252   Host::Ptr current_host_;
253   Response::Ptr response_;
254 };
255 
256 class PreparedMetadataListener {
257 public:
~PreparedMetadataListener()258   virtual ~PreparedMetadataListener() {}
259 
260   virtual void on_prepared_metadata_changed(const String& id,
261                                             const PreparedMetadata::Entry::Ptr& entry) = 0;
262 };
263 
264 class RequestListener : public PreparedMetadataListener {
265 public:
~RequestListener()266   virtual ~RequestListener() {}
267 
268   /**
269    * A callback called when the keyspace has changed.
270    *
271    * @param keyspace The new keyspace.
272    * @param response The response for the keyspace change. `set_response()`
273    * must be called when the callback is done processing the keyspace change.
274    */
275   virtual void on_keyspace_changed(const String& keyspace, KeyspaceChangedResponse response) = 0;
276 
277   virtual bool on_wait_for_tracing_data(const RequestHandler::Ptr& request_handler,
278                                         const Host::Ptr& current_host,
279                                         const Response::Ptr& response) = 0;
280 
281   virtual bool on_wait_for_schema_agreement(const RequestHandler::Ptr& request_handler,
282                                             const Host::Ptr& current_host,
283                                             const Response::Ptr& response) = 0;
284 
285   virtual bool on_prepare_all(const RequestHandler::Ptr& request_handler,
286                               const Host::Ptr& current_host, const Response::Ptr& response) = 0;
287 
288   virtual void on_done() = 0;
289 };
290 
291 class RequestExecution : public RequestCallback {
292 public:
293   typedef SharedRefPtr<RequestExecution> Ptr;
294 
295   RequestExecution(RequestHandler* request_handler);
296 
current_host() const297   const Host::Ptr& current_host() const { return current_host_; }
next_host()298   void next_host() { current_host_ = request_handler_->next_host(RequestHandler::Protected()); }
299 
300   void notify_result_metadata_changed(const Request* request, ResultResponse* result_response);
301   void notify_prepared_id_mismatch(const String& expected_id, const String& received_id);
302 
303   virtual void on_retry_current_host();
304   virtual void on_retry_next_host();
305 
306 private:
307   void on_execute_next(Timer* timer);
308 
309   void retry_current_host();
310   void retry_next_host();
311 
312   virtual void on_write(Connection* connection);
313 
314   virtual void on_set(ResponseMessage* response);
315   virtual void on_error(CassError code, const String& message);
316 
317   void on_result_response(Connection* connection, ResponseMessage* response);
318   void on_error_response(Connection* connection, ResponseMessage* response);
319   void on_error_unprepared(Connection* connection, ErrorResponse* error);
320 
321 private:
322   void set_response(const Response::Ptr& response);
323   void set_error(CassError code, const String& message);
324   void set_error_with_error_response(const Response::Ptr& error, CassError code,
325                                      const String& message);
326 
327 private:
328   RequestHandler::Ptr request_handler_;
329   Host::Ptr current_host_;
330   Connection* connection_;
331   Timer schedule_timer_;
332   int num_retries_;
333   const uint64_t start_time_ns_;
334 };
335 
336 }}} // namespace datastax::internal::core
337 
338 #endif
339