1 /*
2   Copyright (c) DataStax, Inc.
3 
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7 
8   http://www.apache.org/licenses/LICENSE-2.0
9 
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15 */
16 
17 #ifndef DATASTAX_INTERNAL_REQUEST_PROCESSOR_INITIALIZER_HPP
18 #define DATASTAX_INTERNAL_REQUEST_PROCESSOR_INITIALIZER_HPP
19 
20 #include "atomic.hpp"
21 #include "callback.hpp"
22 #include "connection_pool_manager_initializer.hpp"
23 #include "ref_counted.hpp"
24 #include "request_processor.hpp"
25 #include "timestamp_generator.hpp"
26 #include "vector.hpp"
27 
28 #include <uv.h>
29 
30 namespace datastax { namespace internal { namespace core {
31 
32 class Config;
33 class EventLoop;
34 class RequestProcessorManager;
35 
/**
 * A request processor initializer. It contains all the logic responsible for
 * connecting and initializing a request processor object.
 */
40 class RequestProcessorInitializer
41     : public RefCounted<RequestProcessorInitializer>
42     , public ConnectionPoolManagerListener {
43 public:
44   typedef SharedRefPtr<RequestProcessorInitializer> Ptr;
45   typedef Vector<Ptr> Vec;
46   typedef internal::Callback<void, RequestProcessorInitializer*> Callback;
47 
48   enum RequestProcessorError {
49     REQUEST_PROCESSOR_OK,
50     REQUEST_PROCESSOR_ERROR_KEYSPACE,
51     REQUEST_PROCESSOR_ERROR_NO_HOSTS_AVAILABLE,
52     REQUEST_PROCESSOR_ERROR_UNABLE_TO_INIT
53   };
54 
55   /**
56    * Constructor.
57    *
58    * @param connected_host The currently connected control connection host.
59    * @param protocol_version The highest negotiated protocol for the cluster.
60    * @param hosts A mapping of available hosts in the cluster.
61    * @param token_map A token map.
62    * @param local_dc The local datacenter for initializing the load balancing policies.
63    * @param callback A callback that is called when the processor is initialized
64    * or if an error occurred.
65    */
66   RequestProcessorInitializer(const Host::Ptr& connected_host, ProtocolVersion protocol_version,
67                               const HostMap& hosts, const TokenMap::Ptr& token_map,
68                               const String& local_dc, const Callback& callback);
69   ~RequestProcessorInitializer();
70 
71   /**
72    * Initialize the request processor.
73    *
74    * @param event_loop The event loop to run the request processor on.
75    */
76   void initialize(EventLoop* event_loop);
77 
78   /**
79    * Set the settings for use by the processor.
80    *
81    * @param settings A settings object.
82    * @return The initializer to chain calls.
83    */
84   RequestProcessorInitializer* with_settings(const RequestProcessorSettings& setttings);
85 
86   /**
87    * Set the listener to handle events for the processor.
88    *
89    * @param settings A processor listener.
90    * @return The initializer to chain calls.
91    */
92   RequestProcessorInitializer* with_listener(RequestProcessorListener* listener);
93 
94   /**
95    * Set the keyspace to connect with.
96    *
97    * @param keyspace The initial keyspace to connect with.
98    * @return The initializer to chain calls.
99    */
100   RequestProcessorInitializer* with_keyspace(const String& keyspace);
101 
102   /**
103    * Set the metrics object for recording metrics.
104    *
105    * @param metrics A shared metrics object.
106    * @return The initializer to chain calls.
107    */
108   RequestProcessorInitializer* with_metrics(Metrics* metrics);
109 
110   /**
111    * Set the RNG for use randomizing hosts in load balancing policies.
112    *
113    * @param random A random number generator object.
114    * @return The initializer to chain calls.
115    */
116   RequestProcessorInitializer* with_random(Random* random);
117 
118   /**
119    * Release the processor from the initializer. If not released in the callback
120    * the processor will automatically be closed.
121    *
122    * @return The processor object for this initializer. This returns a null object
123    * if the processor is not initialized or an error occurred.
124    */
125   RequestProcessor::Ptr release_processor();
126 
127 public:
error_code() const128   RequestProcessorError error_code() const { return error_code_; }
error_message() const129   const String& error_message() const { return error_message_; }
130 
is_ok() const131   bool is_ok() const { return error_code_ == REQUEST_PROCESSOR_OK; }
is_keyspace_error() const132   bool is_keyspace_error() const { return error_code_ == REQUEST_PROCESSOR_ERROR_KEYSPACE; }
133 
134 private:
135   friend class RunInitializeProcessor;
136 
137 private:
138   // Connection pool manager listener methods
139 
140   virtual void on_pool_up(const Address& address);
141   virtual void on_pool_down(const Address& address);
142   virtual void on_pool_critical_error(const Address& address, Connector::ConnectionError code,
143                                       const String& message);
144   virtual void on_close(ConnectionPoolManager* manager);
145 
146 private:
147   void internal_initialize();
148 
149 private:
150   void on_initialize(ConnectionPoolManagerInitializer* initializer);
151 
152 private:
153   uv_mutex_t mutex_;
154 
155   ConnectionPoolManagerInitializer::Ptr connection_pool_manager_initializer_;
156   RequestProcessor::Ptr processor_;
157 
158   EventLoop* event_loop_;
159   RequestProcessorSettings settings_;
160   RequestProcessorListener* listener_;
161   String keyspace_;
162   Metrics* metrics_;
163   Random* random_;
164 
165   const Host::Ptr connected_host_;
166   const ProtocolVersion protocol_version_;
167   HostMap hosts_;
168   const TokenMap::Ptr token_map_;
169   String local_dc_;
170 
171   RequestProcessorError error_code_;
172   String error_message_;
173 
174   Callback callback_;
175 
176   Atomic<size_t> remaining_;
177 };
178 
179 }}} // namespace datastax::internal::core
180 
181 #endif
182