1 /* 2 Copyright (c) DataStax, Inc. 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef DATASTAX_INTERNAL_REQUEST_HANDLER_HPP 18 #define DATASTAX_INTERNAL_REQUEST_HANDLER_HPP 19 20 #include "constants.hpp" 21 #include "error_response.hpp" 22 #include "future.hpp" 23 #include "host.hpp" 24 #include "load_balancing.hpp" 25 #include "metadata.hpp" 26 #include "prepare_request.hpp" 27 #include "request.hpp" 28 #include "request_callback.hpp" 29 #include "response.hpp" 30 #include "result_response.hpp" 31 #include "retry_policy.hpp" 32 #include "scoped_ptr.hpp" 33 #include "small_vector.hpp" 34 #include "speculative_execution.hpp" 35 #include "string.hpp" 36 #include "timestamp_generator.hpp" 37 38 #include <uv.h> 39 40 namespace datastax { namespace internal { namespace core { 41 42 class Config; 43 class Connection; 44 class ConnectionPoolManager; 45 class Pool; 46 class ExecutionProfile; 47 class Timer; 48 class TokenMap; 49 50 struct RequestTry { RequestTrydatastax::internal::core::RequestTry51 RequestTry() 52 : error(CASS_OK) 53 , latency(0) {} 54 RequestTrydatastax::internal::core::RequestTry55 RequestTry(const Address& address, uint64_t latency) 56 : address(address) 57 , error(CASS_OK) 58 , latency(latency / (1000 * 1000)) {} // To milliseconds 59 RequestTrydatastax::internal::core::RequestTry60 RequestTry(const Address& address, CassError error) 61 : address(address) 62 , error(error) 63 , latency(0) {} 64 65 Address address; 66 CassError error; 67 uint64_t latency; 68 }; 69 70 typedef SmallVector<RequestTry, 2> RequestTryVec; 71 72 class ResponseFuture : public Future { 73 public: 74 typedef SharedRefPtr<ResponseFuture> Ptr; 75 ResponseFuture()76 ResponseFuture() 77 : Future(FUTURE_TYPE_RESPONSE) {} 78 ResponseFuture(const Metadata::SchemaSnapshot & schema_metadata)79 ResponseFuture(const Metadata::SchemaSnapshot& schema_metadata) 80 : Future(FUTURE_TYPE_RESPONSE) 81 , schema_metadata(new Metadata::SchemaSnapshot(schema_metadata)) {} 82 set_response(Address address,const Response::Ptr & response)83 bool set_response(Address address, const Response::Ptr& response) { 84 ScopedMutex lock(&mutex_); 85 if (!is_set()) { 86 address_ = address; 87 response_ = response; 88 internal_set(lock); 89 return true; 90 } 91 return false; 92 } 93 response()94 const Response::Ptr& response() { 95 ScopedMutex lock(&mutex_); 96 internal_wait(lock); 97 return response_; 98 } 99 set_error_with_address(Address address,CassError code,const String & message)100 bool set_error_with_address(Address address, CassError code, const String& message) { 101 ScopedMutex lock(&mutex_); 102 if (!is_set()) { 103 address_ = address; 104 internal_set_error(code, message, lock); 105 return true; 106 } 107 return false; 108 } 109 set_error_with_response(Address address,const Response::Ptr & response,CassError code,const String & message)110 bool set_error_with_response(Address address, const Response::Ptr& response, CassError code, 111 const String& message) { 112 ScopedMutex lock(&mutex_); 113 if (!is_set()) { 114 address_ = address; 115 response_ = response; 116 internal_set_error(code, message, lock); 117 return true; 118 } 119 return false; 120 } 121 address()122 const Address& address() { 123 ScopedMutex lock(&mutex_); 124 internal_wait(lock); 125 return address_; 126 } 127 128 // Currently, used for testing only, but it could be exposed in the future. attempted_addresses()129 AddressVec attempted_addresses() { 130 ScopedMutex lock(&mutex_); 131 internal_wait(lock); 132 return attempted_addresses_; 133 } 134 135 PrepareRequest::ConstPtr prepare_request; 136 ScopedPtr<Metadata::SchemaSnapshot> schema_metadata; 137 138 private: 139 friend class RequestHandler; 140 add_attempted_address(const Address & address)141 void add_attempted_address(const Address& address) { 142 ScopedMutex lock(&mutex_); 143 attempted_addresses_.push_back(address); 144 } 145 146 private: 147 Address address_; 148 Response::Ptr response_; 149 AddressVec attempted_addresses_; 150 }; 151 152 class RequestExecution; 153 class RequestListener; 154 155 class RequestHandler : public RefCounted<RequestHandler> { 156 friend class Memory; 157 158 public: 159 typedef SharedRefPtr<RequestHandler> Ptr; 160 161 RequestHandler(const Request::ConstPtr& request, const ResponseFuture::Ptr& future, 162 Metrics* metrics = NULL); 163 ~RequestHandler(); 164 165 void set_prepared_metadata(const PreparedMetadata::Entry::Ptr& entry); 166 167 void init(const ExecutionProfile& profile, ConnectionPoolManager* manager, 168 const TokenMap* token_map, TimestampGenerator* timestamp_generator, 169 RequestListener* listener); 170 171 void execute(); 172 wrapper() const173 const RequestWrapper& wrapper() const { return wrapper_; } request() const174 const Request* request() const { return wrapper_.request().get(); } consistency() const175 CassConsistency consistency() const { return wrapper_.consistency(); } 176 177 public: 178 class Protected { 179 friend class RequestExecution; Protected()180 Protected() {} Protected(Protected const &)181 Protected(Protected const&) {} 182 }; 183 184 void retry(RequestExecution* request_execution, Protected); 185 186 Host::Ptr next_host(Protected); 187 int64_t next_execution(const Host::Ptr& current_host, Protected); 188 189 void start_request(uv_loop_t* loop, Protected); 190 191 void add_attempted_address(const Address& address, Protected); 192 193 void notify_result_metadata_changed(const String& prepared_id, const String& query, 194 const String& keyspace, const String& result_metadata_id, 195 const ResultResponse::ConstPtr& result_response, Protected); 196 197 void notify_keyspace_changed(const String& keyspace, const Host::Ptr& current_host, 198 const Response::Ptr& response); 199 200 bool wait_for_tracing_data(const Host::Ptr& current_host, const Response::Ptr& response); 201 bool wait_for_schema_agreement(const Host::Ptr& current_host, const Response::Ptr& response); 202 203 bool prepare_all(const Host::Ptr& current_host, const Response::Ptr& response); 204 205 void set_response(const Host::Ptr& host, const Response::Ptr& response); 206 void set_error(CassError code, const String& message); 207 void set_error(const Host::Ptr& host, CassError code, const String& message); 208 void set_error_with_error_response(const Host::Ptr& host, const Response::Ptr& error, 209 CassError code, const String& message); 210 211 void stop_timer(); 212 213 private: 214 void on_timeout(Timer* timer); 215 216 private: 217 void stop_request(); 218 void internal_retry(RequestExecution* request_execution); 219 220 private: 221 RequestWrapper wrapper_; 222 SharedRefPtr<ResponseFuture> future_; 223 224 bool is_done_; 225 int running_executions_; 226 227 ScopedPtr<QueryPlan> query_plan_; 228 ScopedPtr<SpeculativeExecutionPlan> execution_plan_; 229 Timer timer_; 230 231 const uint64_t start_time_ns_; 232 RequestListener* listener_; 233 ConnectionPoolManager* manager_; 234 235 Metrics* const metrics_; 236 237 RequestTryVec request_tries_; 238 }; 239 240 class KeyspaceChangedResponse { 241 public: KeyspaceChangedResponse(const RequestHandler::Ptr & request_handler,const Host::Ptr & current_host,const Response::Ptr & response)242 KeyspaceChangedResponse(const RequestHandler::Ptr& request_handler, const Host::Ptr& current_host, 243 const Response::Ptr& response) 244 : request_handler_(request_handler) 245 , current_host_(current_host) 246 , response_(response) {} 247 set_response()248 void set_response() { request_handler_->set_response(current_host_, response_); } 249 250 private: 251 RequestHandler::Ptr request_handler_; 252 Host::Ptr current_host_; 253 Response::Ptr response_; 254 }; 255 256 class PreparedMetadataListener { 257 public: ~PreparedMetadataListener()258 virtual ~PreparedMetadataListener() {} 259 260 virtual void on_prepared_metadata_changed(const String& id, 261 const PreparedMetadata::Entry::Ptr& entry) = 0; 262 }; 263 264 class RequestListener : public PreparedMetadataListener { 265 public: ~RequestListener()266 virtual ~RequestListener() {} 267 268 /** 269 * A callback called when the keyspace has changed. 270 * 271 * @param keyspace The new keyspace. 272 * @param response The response for the keyspace change. `set_response()` 273 * must be called when the callback is done processing the keyspace change. 274 */ 275 virtual void on_keyspace_changed(const String& keyspace, KeyspaceChangedResponse response) = 0; 276 277 virtual bool on_wait_for_tracing_data(const RequestHandler::Ptr& request_handler, 278 const Host::Ptr& current_host, 279 const Response::Ptr& response) = 0; 280 281 virtual bool on_wait_for_schema_agreement(const RequestHandler::Ptr& request_handler, 282 const Host::Ptr& current_host, 283 const Response::Ptr& response) = 0; 284 285 virtual bool on_prepare_all(const RequestHandler::Ptr& request_handler, 286 const Host::Ptr& current_host, const Response::Ptr& response) = 0; 287 288 virtual void on_done() = 0; 289 }; 290 291 class RequestExecution : public RequestCallback { 292 public: 293 typedef SharedRefPtr<RequestExecution> Ptr; 294 295 RequestExecution(RequestHandler* request_handler); 296 current_host() const297 const Host::Ptr& current_host() const { return current_host_; } next_host()298 void next_host() { current_host_ = request_handler_->next_host(RequestHandler::Protected()); } 299 300 void notify_result_metadata_changed(const Request* request, ResultResponse* result_response); 301 void notify_prepared_id_mismatch(const String& expected_id, const String& received_id); 302 303 virtual void on_retry_current_host(); 304 virtual void on_retry_next_host(); 305 306 private: 307 void on_execute_next(Timer* timer); 308 309 void retry_current_host(); 310 void retry_next_host(); 311 312 virtual void on_write(Connection* connection); 313 314 virtual void on_set(ResponseMessage* response); 315 virtual void on_error(CassError code, const String& message); 316 317 void on_result_response(Connection* connection, ResponseMessage* response); 318 void on_error_response(Connection* connection, ResponseMessage* response); 319 void on_error_unprepared(Connection* connection, ErrorResponse* error); 320 321 private: 322 void set_response(const Response::Ptr& response); 323 void set_error(CassError code, const String& message); 324 void set_error_with_error_response(const Response::Ptr& error, CassError code, 325 const String& message); 326 327 private: 328 RequestHandler::Ptr request_handler_; 329 Host::Ptr current_host_; 330 Connection* connection_; 331 Timer schedule_timer_; 332 int num_retries_; 333 const uint64_t start_time_ns_; 334 }; 335 336 }}} // namespace datastax::internal::core 337 338 #endif 339