/*
  Copyright (c) DataStax, Inc.

  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
*/
16 
17 #ifndef DATASTAX_INTERNAL_CONFIG_HPP
18 #define DATASTAX_INTERNAL_CONFIG_HPP
19 
20 #include "auth.hpp"
21 #include "cassandra.h"
22 #include "cloud_secure_connection_config.hpp"
23 #include "cluster_metadata_resolver.hpp"
24 #include "constants.hpp"
25 #include "execution_profile.hpp"
26 #include "protocol.hpp"
27 #include "reconnection_policy.hpp"
28 #include "speculative_execution.hpp"
29 #include "ssl.hpp"
30 #include "string.hpp"
31 #include "timestamp_generator.hpp"
32 
33 #include <climits>
34 
35 namespace datastax { namespace internal { namespace core {
36 
37 void stderr_log_callback(const CassLogMessage* message, void* data);
38 
39 class Config {
40 public:
Config()41   Config()
42       : port_(CASS_DEFAULT_PORT)
43       , protocol_version_(ProtocolVersion::highest_supported())
44       , use_beta_protocol_version_(CASS_DEFAULT_USE_BETA_PROTOCOL_VERSION)
45       , thread_count_io_(CASS_DEFAULT_THREAD_COUNT_IO)
46       , queue_size_io_(CASS_DEFAULT_QUEUE_SIZE_IO)
47       , core_connections_per_host_(CASS_DEFAULT_NUM_CONNECTIONS_PER_HOST)
48       , reconnection_policy_(new ExponentialReconnectionPolicy())
49       , connect_timeout_ms_(CASS_DEFAULT_CONNECT_TIMEOUT_MS)
50       , resolve_timeout_ms_(CASS_DEFAULT_RESOLVE_TIMEOUT_MS)
51       , max_schema_wait_time_ms_(CASS_DEFAULT_MAX_SCHEMA_WAIT_TIME_MS)
52       , max_tracing_wait_time_ms_(CASS_DEFAULT_MAX_TRACING_DATA_WAIT_TIME_MS)
53       , retry_tracing_wait_time_ms_(CASS_DEFAULT_RETRY_TRACING_DATA_WAIT_TIME_MS)
54       , tracing_consistency_(CASS_DEFAULT_TRACING_CONSISTENCY)
55       , coalesce_delay_us_(CASS_DEFAULT_COALESCE_DELAY)
56       , new_request_ratio_(CASS_DEFAULT_NEW_REQUEST_RATIO)
57       , log_level_(CASS_DEFAULT_LOG_LEVEL)
58       , log_callback_(stderr_log_callback)
59       , log_data_(NULL)
60       , auth_provider_(new AuthProvider())
61       , tcp_nodelay_enable_(CASS_DEFAULT_TCP_NO_DELAY_ENABLED)
62       , tcp_keepalive_enable_(CASS_DEFAULT_TCP_KEEPALIVE_ENABLED)
63       , tcp_keepalive_delay_secs_(CASS_DEFAULT_TCP_KEEPALIVE_DELAY_SECS)
64       , connection_idle_timeout_secs_(CASS_DEFAULT_IDLE_TIMEOUT_SECS)
65       , connection_heartbeat_interval_secs_(CASS_DEFAULT_HEARTBEAT_INTERVAL_SECS)
66       , timestamp_gen_(new MonotonicTimestampGenerator())
67       , use_schema_(CASS_DEFAULT_USE_SCHEMA)
68       , use_hostname_resolution_(CASS_DEFAULT_HOSTNAME_RESOLUTION_ENABLED)
69       , use_randomized_contact_points_(CASS_DEFAULT_USE_RANDOMIZED_CONTACT_POINTS)
70       , max_reusable_write_objects_(CASS_DEFAULT_MAX_REUSABLE_WRITE_OBJECTS)
71       , prepare_on_all_hosts_(CASS_DEFAULT_PREPARE_ON_ALL_HOSTS)
72       , prepare_on_up_or_add_host_(CASS_DEFAULT_PREPARE_ON_UP_OR_ADD_HOST)
73       , no_compact_(CASS_DEFAULT_NO_COMPACT)
74       , is_client_id_set_(false)
75       , host_listener_(new DefaultHostListener())
76       , monitor_reporting_interval_secs_(CASS_DEFAULT_CLIENT_MONITOR_EVENTS_INTERVAL_SECS)
77       , cluster_metadata_resolver_factory_(new DefaultClusterMetadataResolverFactory()) {
78     profiles_.set_empty_key(String());
79 
80     // Assign the defaults to the cluster profile
81     default_profile_.set_serial_consistency(CASS_DEFAULT_SERIAL_CONSISTENCY);
82     default_profile_.set_request_timeout(CASS_DEFAULT_REQUEST_TIMEOUT_MS);
83     default_profile_.set_load_balancing_policy(new DCAwarePolicy());
84     default_profile_.set_retry_policy(new DefaultRetryPolicy());
85     default_profile_.set_speculative_execution_policy(new NoSpeculativeExecutionPolicy());
86   }
87 
new_instance() const88   Config new_instance() const {
89     Config config = *this;
90     config.default_profile_.build_load_balancing_policy();
91     config.init_profiles(); // Initializes the profiles from default (if needed)
92     config.set_speculative_execution_policy(
93         default_profile_.speculative_execution_policy()->new_instance());
94 
95     return config;
96   }
97 
consistency()98   CassConsistency consistency() { return default_profile_.consistency(); }
set_consistency(CassConsistency consistency)99   void set_consistency(CassConsistency consistency) {
100     default_profile_.set_consistency(consistency);
101   }
102 
serial_consistency()103   CassConsistency serial_consistency() { return default_profile_.serial_consistency(); }
set_serial_consistency(CassConsistency serial_consistency)104   void set_serial_consistency(CassConsistency serial_consistency) {
105     default_profile_.set_serial_consistency(serial_consistency);
106   }
107 
thread_count_io() const108   unsigned thread_count_io() const { return thread_count_io_; }
109 
set_thread_count_io(unsigned num_threads)110   void set_thread_count_io(unsigned num_threads) { thread_count_io_ = num_threads; }
111 
queue_size_io() const112   unsigned queue_size_io() const { return queue_size_io_; }
113 
set_queue_size_io(unsigned queue_size)114   void set_queue_size_io(unsigned queue_size) { queue_size_io_ = queue_size; }
115 
core_connections_per_host() const116   unsigned core_connections_per_host() const { return core_connections_per_host_; }
117 
set_core_connections_per_host(unsigned num_connections)118   void set_core_connections_per_host(unsigned num_connections) {
119     core_connections_per_host_ = num_connections;
120   }
121 
reconnection_policy() const122   ReconnectionPolicy::Ptr reconnection_policy() const { return reconnection_policy_; }
123 
set_constant_reconnect(uint64_t wait_time_ms)124   void set_constant_reconnect(uint64_t wait_time_ms) {
125     reconnection_policy_.reset(new ConstantReconnectionPolicy(wait_time_ms));
126   }
127 
set_exponential_reconnect(uint64_t base_delay_ms,uint64_t max_delay_ms)128   void set_exponential_reconnect(uint64_t base_delay_ms, uint64_t max_delay_ms) {
129     reconnection_policy_.reset(new ExponentialReconnectionPolicy(base_delay_ms, max_delay_ms));
130   }
131 
connect_timeout_ms() const132   unsigned connect_timeout_ms() const { return connect_timeout_ms_; }
133 
set_connect_timeout(unsigned timeout_ms)134   void set_connect_timeout(unsigned timeout_ms) { connect_timeout_ms_ = timeout_ms; }
135 
max_schema_wait_time_ms() const136   unsigned max_schema_wait_time_ms() const { return max_schema_wait_time_ms_; }
137 
set_max_schema_wait_time_ms(unsigned time_ms)138   void set_max_schema_wait_time_ms(unsigned time_ms) { max_schema_wait_time_ms_ = time_ms; }
139 
max_tracing_wait_time_ms() const140   unsigned max_tracing_wait_time_ms() const { return max_tracing_wait_time_ms_; }
141 
set_max_tracing_wait_time_ms(unsigned time_ms)142   void set_max_tracing_wait_time_ms(unsigned time_ms) { max_tracing_wait_time_ms_ = time_ms; }
143 
retry_tracing_wait_time_ms() const144   unsigned retry_tracing_wait_time_ms() const { return retry_tracing_wait_time_ms_; }
145 
set_retry_tracing_wait_time_ms(unsigned time_ms)146   void set_retry_tracing_wait_time_ms(unsigned time_ms) { retry_tracing_wait_time_ms_ = time_ms; }
147 
tracing_consistency() const148   CassConsistency tracing_consistency() const { return tracing_consistency_; }
149 
set_tracing_consistency(CassConsistency consistency)150   void set_tracing_consistency(CassConsistency consistency) { tracing_consistency_ = consistency; }
151 
coalesce_delay_us() const152   uint64_t coalesce_delay_us() const { return coalesce_delay_us_; }
153 
set_coalesce_delay_us(uint64_t delay_us)154   void set_coalesce_delay_us(uint64_t delay_us) { coalesce_delay_us_ = delay_us; }
155 
new_request_ratio() const156   int new_request_ratio() const { return new_request_ratio_; }
157 
set_new_request_ratio(int ratio)158   void set_new_request_ratio(int ratio) { new_request_ratio_ = ratio; }
159 
request_timeout()160   unsigned request_timeout() { return default_profile_.request_timeout_ms(); }
set_request_timeout(unsigned timeout_ms)161   void set_request_timeout(unsigned timeout_ms) {
162     default_profile_.set_request_timeout(timeout_ms);
163   }
164 
resolve_timeout_ms() const165   unsigned resolve_timeout_ms() const { return resolve_timeout_ms_; }
166 
set_resolve_timeout(unsigned timeout_ms)167   void set_resolve_timeout(unsigned timeout_ms) { resolve_timeout_ms_ = timeout_ms; }
168 
contact_points() const169   const AddressVec& contact_points() const { return contact_points_; }
170 
contact_points()171   AddressVec& contact_points() { return contact_points_; }
172 
port() const173   int port() const { return port_; }
174 
set_port(int port)175   void set_port(int port) { port_ = port; }
176 
protocol_version() const177   ProtocolVersion protocol_version() const { return protocol_version_; }
178 
set_protocol_version(ProtocolVersion version)179   void set_protocol_version(ProtocolVersion version) { protocol_version_ = version; }
180 
use_beta_protocol_version() const181   bool use_beta_protocol_version() const { return use_beta_protocol_version_; }
182 
set_use_beta_protocol_version(bool enable)183   void set_use_beta_protocol_version(bool enable) { use_beta_protocol_version_ = enable; }
184 
log_level() const185   CassLogLevel log_level() const { return log_level_; }
186 
set_log_level(CassLogLevel log_level)187   void set_log_level(CassLogLevel log_level) { log_level_ = log_level; }
188 
log_data() const189   void* log_data() const { return log_data_; }
190 
log_callback() const191   CassLogCallback log_callback() const { return log_callback_; }
192 
set_log_callback(CassLogCallback callback,void * data)193   void set_log_callback(CassLogCallback callback, void* data) {
194     log_callback_ = callback;
195     log_data_ = data;
196   }
197 
auth_provider() const198   const AuthProvider::Ptr& auth_provider() const { return auth_provider_; }
199 
set_auth_provider(const AuthProvider::Ptr & auth_provider)200   void set_auth_provider(const AuthProvider::Ptr& auth_provider) {
201     auth_provider_ = (!auth_provider ? AuthProvider::Ptr(new AuthProvider()) : auth_provider);
202   }
203 
set_credentials(const String & username,const String & password)204   void set_credentials(const String& username, const String& password) {
205     auth_provider_.reset(new PlainTextAuthProvider(username, password));
206   }
207 
load_balancing_policy() const208   const LoadBalancingPolicy::Ptr& load_balancing_policy() const {
209     return default_profile().load_balancing_policy();
210   }
211 
load_balancing_policies() const212   LoadBalancingPolicy::Vec load_balancing_policies() const {
213     LoadBalancingPolicy::Vec policies;
214     for (ExecutionProfile::Map::const_iterator it = profiles_.begin(), end = profiles_.end();
215          it != end; ++it) {
216       if (it->second.load_balancing_policy()) {
217         policies.push_back(it->second.load_balancing_policy());
218       }
219     }
220     return policies;
221   }
222 
set_load_balancing_policy(LoadBalancingPolicy * lbp)223   void set_load_balancing_policy(LoadBalancingPolicy* lbp) {
224     default_profile_.set_load_balancing_policy(lbp);
225   }
226 
set_speculative_execution_policy(SpeculativeExecutionPolicy * sep)227   void set_speculative_execution_policy(SpeculativeExecutionPolicy* sep) {
228     default_profile_.set_speculative_execution_policy(sep);
229   }
230 
ssl_context() const231   const SslContext::Ptr& ssl_context() const { return ssl_context_; }
232 
set_ssl_context(SslContext * ssl_context)233   void set_ssl_context(SslContext* ssl_context) { ssl_context_.reset(ssl_context); }
set_ssl_context(const SslContext::Ptr & ssl_context)234   void set_ssl_context(const SslContext::Ptr& ssl_context) { ssl_context_ = ssl_context; }
235 
token_aware_routing() const236   bool token_aware_routing() const { return default_profile().token_aware_routing(); }
237 
set_token_aware_routing(bool is_token_aware)238   void set_token_aware_routing(bool is_token_aware) {
239     default_profile_.set_token_aware_routing(is_token_aware);
240   }
241 
set_token_aware_routing_shuffle_replicas(bool shuffle_replicas)242   void set_token_aware_routing_shuffle_replicas(bool shuffle_replicas) {
243     default_profile_.set_token_aware_routing_shuffle_replicas(shuffle_replicas);
244   }
245 
set_latency_aware_routing(bool is_latency_aware)246   void set_latency_aware_routing(bool is_latency_aware) {
247     default_profile_.set_latency_aware_routing(is_latency_aware);
248   }
249 
set_latency_aware_routing_settings(const LatencyAwarePolicy::Settings & settings)250   void set_latency_aware_routing_settings(const LatencyAwarePolicy::Settings& settings) {
251     default_profile_.set_latency_aware_routing_settings(settings);
252   }
253 
tcp_nodelay_enable() const254   bool tcp_nodelay_enable() const { return tcp_nodelay_enable_; }
255 
set_tcp_nodelay(bool enable)256   void set_tcp_nodelay(bool enable) { tcp_nodelay_enable_ = enable; }
257 
tcp_keepalive_enable() const258   bool tcp_keepalive_enable() const { return tcp_keepalive_enable_; }
tcp_keepalive_delay_secs() const259   unsigned tcp_keepalive_delay_secs() const { return tcp_keepalive_delay_secs_; }
260 
set_tcp_keepalive(bool enable,unsigned delay_secs)261   void set_tcp_keepalive(bool enable, unsigned delay_secs) {
262     tcp_keepalive_enable_ = enable;
263     tcp_keepalive_delay_secs_ = delay_secs;
264   }
265 
connection_idle_timeout_secs() const266   unsigned connection_idle_timeout_secs() const { return connection_idle_timeout_secs_; }
267 
set_connection_idle_timeout_secs(unsigned timeout_secs)268   void set_connection_idle_timeout_secs(unsigned timeout_secs) {
269     connection_idle_timeout_secs_ = timeout_secs;
270   }
271 
connection_heartbeat_interval_secs() const272   unsigned connection_heartbeat_interval_secs() const {
273     return connection_heartbeat_interval_secs_;
274   }
275 
set_connection_heartbeat_interval_secs(unsigned interval_secs)276   void set_connection_heartbeat_interval_secs(unsigned interval_secs) {
277     connection_heartbeat_interval_secs_ = interval_secs;
278   }
279 
timestamp_gen() const280   TimestampGenerator* timestamp_gen() const { return timestamp_gen_.get(); }
281 
set_timestamp_gen(TimestampGenerator * timestamp_gen)282   void set_timestamp_gen(TimestampGenerator* timestamp_gen) {
283     if (timestamp_gen == NULL) return;
284     timestamp_gen_.reset(timestamp_gen);
285   }
286 
set_retry_policy(RetryPolicy * retry_policy)287   void set_retry_policy(RetryPolicy* retry_policy) {
288     default_profile_.set_retry_policy(retry_policy);
289   }
290 
use_schema() const291   bool use_schema() const { return use_schema_; }
set_use_schema(bool enable)292   void set_use_schema(bool enable) { use_schema_ = enable; }
293 
use_hostname_resolution() const294   bool use_hostname_resolution() const { return use_hostname_resolution_; }
set_use_hostname_resolution(bool enable)295   void set_use_hostname_resolution(bool enable) { use_hostname_resolution_ = enable; }
296 
use_randomized_contact_points() const297   bool use_randomized_contact_points() const { return use_randomized_contact_points_; }
set_use_randomized_contact_points(bool enable)298   void set_use_randomized_contact_points(bool enable) { use_randomized_contact_points_ = enable; }
299 
max_reusable_write_objects() const300   unsigned max_reusable_write_objects() const { return max_reusable_write_objects_; }
set_max_reusable_write_objects(unsigned max_reusable_write_objects)301   void set_max_reusable_write_objects(unsigned max_reusable_write_objects) {
302     max_reusable_write_objects_ = max_reusable_write_objects;
303   }
304 
default_profile() const305   const ExecutionProfile& default_profile() const { return default_profile_; }
306 
default_profile()307   ExecutionProfile& default_profile() { return default_profile_; }
308 
profiles() const309   const ExecutionProfile::Map& profiles() const { return profiles_; }
310 
set_execution_profile(const String & name,const ExecutionProfile * profile)311   void set_execution_profile(const String& name, const ExecutionProfile* profile) {
312     ExecutionProfile copy = *profile;
313     copy.build_load_balancing_policy();
314     profiles_[name] = copy;
315   }
316 
prepare_on_all_hosts() const317   bool prepare_on_all_hosts() const { return prepare_on_all_hosts_; }
318 
set_prepare_on_all_hosts(bool enabled)319   void set_prepare_on_all_hosts(bool enabled) { prepare_on_all_hosts_ = enabled; }
320 
prepare_on_up_or_add_host() const321   bool prepare_on_up_or_add_host() const { return prepare_on_up_or_add_host_; }
322 
set_prepare_on_up_or_add_host(bool enabled)323   void set_prepare_on_up_or_add_host(bool enabled) { prepare_on_up_or_add_host_ = enabled; }
324 
local_address() const325   const Address& local_address() const { return local_address_; }
326 
set_local_address(const Address & address)327   void set_local_address(const Address& address) { local_address_ = address; }
328 
no_compact() const329   bool no_compact() const { return no_compact_; }
330 
set_no_compact(bool enabled)331   void set_no_compact(bool enabled) { no_compact_ = enabled; }
332 
application_name() const333   const String& application_name() const { return application_name_; }
334 
set_application_name(const String & application_name)335   void set_application_name(const String& application_name) {
336     application_name_ = application_name;
337   }
338 
application_version() const339   const String& application_version() const { return application_version_; }
340 
set_application_version(const String & application_version)341   void set_application_version(const String& application_version) {
342     application_version_ = application_version;
343   }
344 
client_id() const345   CassUuid client_id() const { return client_id_; }
is_client_id_set() const346   bool is_client_id_set() const { return is_client_id_set_; }
347 
set_client_id(CassUuid client_id)348   void set_client_id(CassUuid client_id) {
349     client_id_ = client_id;
350     is_client_id_set_ = true;
351   }
352 
host_listener() const353   const DefaultHostListener::Ptr& host_listener() const { return host_listener_; }
354 
set_host_listener(const DefaultHostListener::Ptr & listener)355   void set_host_listener(const DefaultHostListener::Ptr& listener) {
356     if (listener) {
357       host_listener_ = listener;
358     } else {
359       host_listener_.reset(new DefaultHostListener());
360     }
361   }
362 
monitor_reporting_interval_secs() const363   unsigned monitor_reporting_interval_secs() const { return monitor_reporting_interval_secs_; }
set_monitor_reporting_interval_secs(unsigned interval_secs)364   void set_monitor_reporting_interval_secs(unsigned interval_secs) {
365     monitor_reporting_interval_secs_ = interval_secs;
366   };
367 
cloud_secure_connection_config() const368   const CloudSecureConnectionConfig& cloud_secure_connection_config() const {
369     return cloud_secure_connection_config_;
370   }
set_cloud_secure_connection_bundle(const String & path)371   bool set_cloud_secure_connection_bundle(const String& path) {
372     return cloud_secure_connection_config_.load(path, this);
373   }
374 
cluster_metadata_resolver_factory() const375   const ClusterMetadataResolverFactory::Ptr& cluster_metadata_resolver_factory() const {
376     return cluster_metadata_resolver_factory_;
377   }
378 
set_cluster_metadata_resolver_factory(const ClusterMetadataResolverFactory::Ptr & factory)379   void set_cluster_metadata_resolver_factory(const ClusterMetadataResolverFactory::Ptr& factory) {
380     cluster_metadata_resolver_factory_ = factory;
381   }
382 
set_default_consistency(CassConsistency consistency)383   void set_default_consistency(CassConsistency consistency) {
384     if (default_profile_.consistency() == CASS_CONSISTENCY_UNKNOWN) {
385       default_profile_.set_consistency(consistency);
386     }
387 
388     for (ExecutionProfile::Map::iterator it = profiles_.begin(); it != profiles_.end(); ++it) {
389       if (it->second.consistency() == CASS_CONSISTENCY_UNKNOWN) {
390         it->second.set_consistency(consistency);
391       }
392     }
393   }
394 
395 private:
396   void init_profiles();
397 
398 private:
399   int port_;
400   ProtocolVersion protocol_version_;
401   bool use_beta_protocol_version_;
402   AddressVec contact_points_;
403   unsigned thread_count_io_;
404   unsigned queue_size_io_;
405   unsigned core_connections_per_host_;
406   SharedRefPtr<ReconnectionPolicy> reconnection_policy_;
407   unsigned connect_timeout_ms_;
408   unsigned resolve_timeout_ms_;
409   unsigned max_schema_wait_time_ms_;
410   unsigned max_tracing_wait_time_ms_;
411   unsigned retry_tracing_wait_time_ms_;
412   CassConsistency tracing_consistency_;
413   uint64_t coalesce_delay_us_;
414   int new_request_ratio_;
415   CassLogLevel log_level_;
416   CassLogCallback log_callback_;
417   void* log_data_;
418   AuthProvider::Ptr auth_provider_;
419   SslContext::Ptr ssl_context_;
420   bool tcp_nodelay_enable_;
421   bool tcp_keepalive_enable_;
422   unsigned tcp_keepalive_delay_secs_;
423   unsigned connection_idle_timeout_secs_;
424   unsigned connection_heartbeat_interval_secs_;
425   SharedRefPtr<TimestampGenerator> timestamp_gen_;
426   bool use_schema_;
427   bool use_hostname_resolution_;
428   bool use_randomized_contact_points_;
429   unsigned max_reusable_write_objects_;
430   ExecutionProfile default_profile_;
431   ExecutionProfile::Map profiles_;
432   bool prepare_on_all_hosts_;
433   bool prepare_on_up_or_add_host_;
434   Address local_address_;
435   bool no_compact_;
436   String application_name_;
437   String application_version_;
438   bool is_client_id_set_;
439   CassUuid client_id_;
440   DefaultHostListener::Ptr host_listener_;
441   unsigned monitor_reporting_interval_secs_;
442   CloudSecureConnectionConfig cloud_secure_connection_config_;
443   ClusterMetadataResolverFactory::Ptr cluster_metadata_resolver_factory_;
444 };
445 
446 }}} // namespace datastax::internal::core
447 
448 #endif
449