1 /* 2 Copyright (c) DataStax, Inc. 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef DATASTAX_INTERNAL_REQUEST_PROCESSOR_INITIALIZER_HPP 18 #define DATASTAX_INTERNAL_REQUEST_PROCESSOR_INITIALIZER_HPP 19 20 #include "atomic.hpp" 21 #include "callback.hpp" 22 #include "connection_pool_manager_initializer.hpp" 23 #include "ref_counted.hpp" 24 #include "request_processor.hpp" 25 #include "timestamp_generator.hpp" 26 #include "vector.hpp" 27 28 #include <uv.h> 29 30 namespace datastax { namespace internal { namespace core { 31 32 class Config; 33 class EventLoop; 34 class RequestProcessorManager; 35 36 /** 37 * A request processor initializer. This contains all the logic responsible for 38 * connecting and initializing a request processor object. 39 */ 40 class RequestProcessorInitializer 41 : public RefCounted<RequestProcessorInitializer> 42 , public ConnectionPoolManagerListener { 43 public: 44 typedef SharedRefPtr<RequestProcessorInitializer> Ptr; 45 typedef Vector<Ptr> Vec; 46 typedef internal::Callback<void, RequestProcessorInitializer*> Callback; 47 48 enum RequestProcessorError { 49 REQUEST_PROCESSOR_OK, 50 REQUEST_PROCESSOR_ERROR_KEYSPACE, 51 REQUEST_PROCESSOR_ERROR_NO_HOSTS_AVAILABLE, 52 REQUEST_PROCESSOR_ERROR_UNABLE_TO_INIT 53 }; 54 55 /** 56 * Constructor. 57 * 58 * @param connected_host The currently connected control connection host. 59 * @param protocol_version The highest negotiated protocol for the cluster. 60 * @param hosts A mapping of available hosts in the cluster. 61 * @param token_map A token map. 62 * @param local_dc The local datacenter for initializing the load balancing policies. 63 * @param callback A callback that is called when the processor is initialized 64 * or if an error occurred. 65 */ 66 RequestProcessorInitializer(const Host::Ptr& connected_host, ProtocolVersion protocol_version, 67 const HostMap& hosts, const TokenMap::Ptr& token_map, 68 const String& local_dc, const Callback& callback); 69 ~RequestProcessorInitializer(); 70 71 /** 72 * Initialize the request processor. 73 * 74 * @param event_loop The event loop to run the request processor on. 75 */ 76 void initialize(EventLoop* event_loop); 77 78 /** 79 * Set the settings for use by the processor. 80 * 81 * @param settings A settings object. 82 * @return The initializer to chain calls. 83 */ 84 RequestProcessorInitializer* with_settings(const RequestProcessorSettings& setttings); 85 86 /** 87 * Set the listener to handle events for the processor. 88 * 89 * @param settings A processor listener. 90 * @return The initializer to chain calls. 91 */ 92 RequestProcessorInitializer* with_listener(RequestProcessorListener* listener); 93 94 /** 95 * Set the keyspace to connect with. 96 * 97 * @param keyspace The initial keyspace to connect with. 98 * @return The initializer to chain calls. 99 */ 100 RequestProcessorInitializer* with_keyspace(const String& keyspace); 101 102 /** 103 * Set the metrics object for recording metrics. 104 * 105 * @param metrics A shared metrics object. 106 * @return The initializer to chain calls. 107 */ 108 RequestProcessorInitializer* with_metrics(Metrics* metrics); 109 110 /** 111 * Set the RNG for use randomizing hosts in load balancing policies. 112 * 113 * @param random A random number generator object. 114 * @return The initializer to chain calls. 115 */ 116 RequestProcessorInitializer* with_random(Random* random); 117 118 /** 119 * Release the processor from the initializer. If not released in the callback 120 * the processor will automatically be closed. 121 * 122 * @return The processor object for this initializer. This returns a null object 123 * if the processor is not initialized or an error occurred. 124 */ 125 RequestProcessor::Ptr release_processor(); 126 127 public: error_code() const128 RequestProcessorError error_code() const { return error_code_; } error_message() const129 const String& error_message() const { return error_message_; } 130 is_ok() const131 bool is_ok() const { return error_code_ == REQUEST_PROCESSOR_OK; } is_keyspace_error() const132 bool is_keyspace_error() const { return error_code_ == REQUEST_PROCESSOR_ERROR_KEYSPACE; } 133 134 private: 135 friend class RunInitializeProcessor; 136 137 private: 138 // Connection pool manager listener methods 139 140 virtual void on_pool_up(const Address& address); 141 virtual void on_pool_down(const Address& address); 142 virtual void on_pool_critical_error(const Address& address, Connector::ConnectionError code, 143 const String& message); 144 virtual void on_close(ConnectionPoolManager* manager); 145 146 private: 147 void internal_initialize(); 148 149 private: 150 void on_initialize(ConnectionPoolManagerInitializer* initializer); 151 152 private: 153 uv_mutex_t mutex_; 154 155 ConnectionPoolManagerInitializer::Ptr connection_pool_manager_initializer_; 156 RequestProcessor::Ptr processor_; 157 158 EventLoop* event_loop_; 159 RequestProcessorSettings settings_; 160 RequestProcessorListener* listener_; 161 String keyspace_; 162 Metrics* metrics_; 163 Random* random_; 164 165 const Host::Ptr connected_host_; 166 const ProtocolVersion protocol_version_; 167 HostMap hosts_; 168 const TokenMap::Ptr token_map_; 169 String local_dc_; 170 171 RequestProcessorError error_code_; 172 String error_message_; 173 174 Callback callback_; 175 176 Atomic<size_t> remaining_; 177 }; 178 179 }}} // namespace datastax::internal::core 180 181 #endif 182