1 /* 2 Copyright (c) DataStax, Inc. 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef DATASTAX_INTERNAL_CONFIG_HPP 18 #define DATASTAX_INTERNAL_CONFIG_HPP 19 20 #include "auth.hpp" 21 #include "cassandra.h" 22 #include "cloud_secure_connection_config.hpp" 23 #include "cluster_metadata_resolver.hpp" 24 #include "constants.hpp" 25 #include "execution_profile.hpp" 26 #include "protocol.hpp" 27 #include "reconnection_policy.hpp" 28 #include "speculative_execution.hpp" 29 #include "ssl.hpp" 30 #include "string.hpp" 31 #include "timestamp_generator.hpp" 32 33 #include <climits> 34 35 namespace datastax { namespace internal { namespace core { 36 37 void stderr_log_callback(const CassLogMessage* message, void* data); 38 39 class Config { 40 public: Config()41 Config() 42 : port_(CASS_DEFAULT_PORT) 43 , protocol_version_(ProtocolVersion::highest_supported()) 44 , use_beta_protocol_version_(CASS_DEFAULT_USE_BETA_PROTOCOL_VERSION) 45 , thread_count_io_(CASS_DEFAULT_THREAD_COUNT_IO) 46 , queue_size_io_(CASS_DEFAULT_QUEUE_SIZE_IO) 47 , core_connections_per_host_(CASS_DEFAULT_NUM_CONNECTIONS_PER_HOST) 48 , reconnection_policy_(new ExponentialReconnectionPolicy()) 49 , connect_timeout_ms_(CASS_DEFAULT_CONNECT_TIMEOUT_MS) 50 , resolve_timeout_ms_(CASS_DEFAULT_RESOLVE_TIMEOUT_MS) 51 , max_schema_wait_time_ms_(CASS_DEFAULT_MAX_SCHEMA_WAIT_TIME_MS) 52 , max_tracing_wait_time_ms_(CASS_DEFAULT_MAX_TRACING_DATA_WAIT_TIME_MS) 53 , retry_tracing_wait_time_ms_(CASS_DEFAULT_RETRY_TRACING_DATA_WAIT_TIME_MS) 54 , tracing_consistency_(CASS_DEFAULT_TRACING_CONSISTENCY) 55 , coalesce_delay_us_(CASS_DEFAULT_COALESCE_DELAY) 56 , new_request_ratio_(CASS_DEFAULT_NEW_REQUEST_RATIO) 57 , log_level_(CASS_DEFAULT_LOG_LEVEL) 58 , log_callback_(stderr_log_callback) 59 , log_data_(NULL) 60 , auth_provider_(new AuthProvider()) 61 , tcp_nodelay_enable_(CASS_DEFAULT_TCP_NO_DELAY_ENABLED) 62 , tcp_keepalive_enable_(CASS_DEFAULT_TCP_KEEPALIVE_ENABLED) 63 , tcp_keepalive_delay_secs_(CASS_DEFAULT_TCP_KEEPALIVE_DELAY_SECS) 64 , connection_idle_timeout_secs_(CASS_DEFAULT_IDLE_TIMEOUT_SECS) 65 , connection_heartbeat_interval_secs_(CASS_DEFAULT_HEARTBEAT_INTERVAL_SECS) 66 , timestamp_gen_(new MonotonicTimestampGenerator()) 67 , use_schema_(CASS_DEFAULT_USE_SCHEMA) 68 , use_hostname_resolution_(CASS_DEFAULT_HOSTNAME_RESOLUTION_ENABLED) 69 , use_randomized_contact_points_(CASS_DEFAULT_USE_RANDOMIZED_CONTACT_POINTS) 70 , max_reusable_write_objects_(CASS_DEFAULT_MAX_REUSABLE_WRITE_OBJECTS) 71 , prepare_on_all_hosts_(CASS_DEFAULT_PREPARE_ON_ALL_HOSTS) 72 , prepare_on_up_or_add_host_(CASS_DEFAULT_PREPARE_ON_UP_OR_ADD_HOST) 73 , no_compact_(CASS_DEFAULT_NO_COMPACT) 74 , is_client_id_set_(false) 75 , host_listener_(new DefaultHostListener()) 76 , monitor_reporting_interval_secs_(CASS_DEFAULT_CLIENT_MONITOR_EVENTS_INTERVAL_SECS) 77 , cluster_metadata_resolver_factory_(new DefaultClusterMetadataResolverFactory()) { 78 profiles_.set_empty_key(String()); 79 80 // Assign the defaults to the cluster profile 81 default_profile_.set_serial_consistency(CASS_DEFAULT_SERIAL_CONSISTENCY); 82 default_profile_.set_request_timeout(CASS_DEFAULT_REQUEST_TIMEOUT_MS); 83 default_profile_.set_load_balancing_policy(new DCAwarePolicy()); 84 default_profile_.set_retry_policy(new DefaultRetryPolicy()); 85 default_profile_.set_speculative_execution_policy(new NoSpeculativeExecutionPolicy()); 86 } 87 new_instance() const88 Config new_instance() const { 89 Config config = *this; 90 config.default_profile_.build_load_balancing_policy(); 91 config.init_profiles(); // Initializes the profiles from default (if needed) 92 config.set_speculative_execution_policy( 93 default_profile_.speculative_execution_policy()->new_instance()); 94 95 return config; 96 } 97 consistency()98 CassConsistency consistency() { return default_profile_.consistency(); } set_consistency(CassConsistency consistency)99 void set_consistency(CassConsistency consistency) { 100 default_profile_.set_consistency(consistency); 101 } 102 serial_consistency()103 CassConsistency serial_consistency() { return default_profile_.serial_consistency(); } set_serial_consistency(CassConsistency serial_consistency)104 void set_serial_consistency(CassConsistency serial_consistency) { 105 default_profile_.set_serial_consistency(serial_consistency); 106 } 107 thread_count_io() const108 unsigned thread_count_io() const { return thread_count_io_; } 109 set_thread_count_io(unsigned num_threads)110 void set_thread_count_io(unsigned num_threads) { thread_count_io_ = num_threads; } 111 queue_size_io() const112 unsigned queue_size_io() const { return queue_size_io_; } 113 set_queue_size_io(unsigned queue_size)114 void set_queue_size_io(unsigned queue_size) { queue_size_io_ = queue_size; } 115 core_connections_per_host() const116 unsigned core_connections_per_host() const { return core_connections_per_host_; } 117 set_core_connections_per_host(unsigned num_connections)118 void set_core_connections_per_host(unsigned num_connections) { 119 core_connections_per_host_ = num_connections; 120 } 121 reconnection_policy() const122 ReconnectionPolicy::Ptr reconnection_policy() const { return reconnection_policy_; } 123 set_constant_reconnect(uint64_t wait_time_ms)124 void set_constant_reconnect(uint64_t wait_time_ms) { 125 reconnection_policy_.reset(new ConstantReconnectionPolicy(wait_time_ms)); 126 } 127 set_exponential_reconnect(uint64_t base_delay_ms,uint64_t max_delay_ms)128 void set_exponential_reconnect(uint64_t base_delay_ms, uint64_t max_delay_ms) { 129 reconnection_policy_.reset(new ExponentialReconnectionPolicy(base_delay_ms, max_delay_ms)); 130 } 131 connect_timeout_ms() const132 unsigned connect_timeout_ms() const { return connect_timeout_ms_; } 133 set_connect_timeout(unsigned timeout_ms)134 void set_connect_timeout(unsigned timeout_ms) { connect_timeout_ms_ = timeout_ms; } 135 max_schema_wait_time_ms() const136 unsigned max_schema_wait_time_ms() const { return max_schema_wait_time_ms_; } 137 set_max_schema_wait_time_ms(unsigned time_ms)138 void set_max_schema_wait_time_ms(unsigned time_ms) { max_schema_wait_time_ms_ = time_ms; } 139 max_tracing_wait_time_ms() const140 unsigned max_tracing_wait_time_ms() const { return max_tracing_wait_time_ms_; } 141 set_max_tracing_wait_time_ms(unsigned time_ms)142 void set_max_tracing_wait_time_ms(unsigned time_ms) { max_tracing_wait_time_ms_ = time_ms; } 143 retry_tracing_wait_time_ms() const144 unsigned retry_tracing_wait_time_ms() const { return retry_tracing_wait_time_ms_; } 145 set_retry_tracing_wait_time_ms(unsigned time_ms)146 void set_retry_tracing_wait_time_ms(unsigned time_ms) { retry_tracing_wait_time_ms_ = time_ms; } 147 tracing_consistency() const148 CassConsistency tracing_consistency() const { return tracing_consistency_; } 149 set_tracing_consistency(CassConsistency consistency)150 void set_tracing_consistency(CassConsistency consistency) { tracing_consistency_ = consistency; } 151 coalesce_delay_us() const152 uint64_t coalesce_delay_us() const { return coalesce_delay_us_; } 153 set_coalesce_delay_us(uint64_t delay_us)154 void set_coalesce_delay_us(uint64_t delay_us) { coalesce_delay_us_ = delay_us; } 155 new_request_ratio() const156 int new_request_ratio() const { return new_request_ratio_; } 157 set_new_request_ratio(int ratio)158 void set_new_request_ratio(int ratio) { new_request_ratio_ = ratio; } 159 request_timeout()160 unsigned request_timeout() { return default_profile_.request_timeout_ms(); } set_request_timeout(unsigned timeout_ms)161 void set_request_timeout(unsigned timeout_ms) { 162 default_profile_.set_request_timeout(timeout_ms); 163 } 164 resolve_timeout_ms() const165 unsigned resolve_timeout_ms() const { return resolve_timeout_ms_; } 166 set_resolve_timeout(unsigned timeout_ms)167 void set_resolve_timeout(unsigned timeout_ms) { resolve_timeout_ms_ = timeout_ms; } 168 contact_points() const169 const AddressVec& contact_points() const { return contact_points_; } 170 contact_points()171 AddressVec& contact_points() { return contact_points_; } 172 port() const173 int port() const { return port_; } 174 set_port(int port)175 void set_port(int port) { port_ = port; } 176 protocol_version() const177 ProtocolVersion protocol_version() const { return protocol_version_; } 178 set_protocol_version(ProtocolVersion version)179 void set_protocol_version(ProtocolVersion version) { protocol_version_ = version; } 180 use_beta_protocol_version() const181 bool use_beta_protocol_version() const { return use_beta_protocol_version_; } 182 set_use_beta_protocol_version(bool enable)183 void set_use_beta_protocol_version(bool enable) { use_beta_protocol_version_ = enable; } 184 log_level() const185 CassLogLevel log_level() const { return log_level_; } 186 set_log_level(CassLogLevel log_level)187 void set_log_level(CassLogLevel log_level) { log_level_ = log_level; } 188 log_data() const189 void* log_data() const { return log_data_; } 190 log_callback() const191 CassLogCallback log_callback() const { return log_callback_; } 192 set_log_callback(CassLogCallback callback,void * data)193 void set_log_callback(CassLogCallback callback, void* data) { 194 log_callback_ = callback; 195 log_data_ = data; 196 } 197 auth_provider() const198 const AuthProvider::Ptr& auth_provider() const { return auth_provider_; } 199 set_auth_provider(const AuthProvider::Ptr & auth_provider)200 void set_auth_provider(const AuthProvider::Ptr& auth_provider) { 201 auth_provider_ = (!auth_provider ? AuthProvider::Ptr(new AuthProvider()) : auth_provider); 202 } 203 set_credentials(const String & username,const String & password)204 void set_credentials(const String& username, const String& password) { 205 auth_provider_.reset(new PlainTextAuthProvider(username, password)); 206 } 207 load_balancing_policy() const208 const LoadBalancingPolicy::Ptr& load_balancing_policy() const { 209 return default_profile().load_balancing_policy(); 210 } 211 load_balancing_policies() const212 LoadBalancingPolicy::Vec load_balancing_policies() const { 213 LoadBalancingPolicy::Vec policies; 214 for (ExecutionProfile::Map::const_iterator it = profiles_.begin(), end = profiles_.end(); 215 it != end; ++it) { 216 if (it->second.load_balancing_policy()) { 217 policies.push_back(it->second.load_balancing_policy()); 218 } 219 } 220 return policies; 221 } 222 set_load_balancing_policy(LoadBalancingPolicy * lbp)223 void set_load_balancing_policy(LoadBalancingPolicy* lbp) { 224 default_profile_.set_load_balancing_policy(lbp); 225 } 226 set_speculative_execution_policy(SpeculativeExecutionPolicy * sep)227 void set_speculative_execution_policy(SpeculativeExecutionPolicy* sep) { 228 default_profile_.set_speculative_execution_policy(sep); 229 } 230 ssl_context() const231 const SslContext::Ptr& ssl_context() const { return ssl_context_; } 232 set_ssl_context(SslContext * ssl_context)233 void set_ssl_context(SslContext* ssl_context) { ssl_context_.reset(ssl_context); } set_ssl_context(const SslContext::Ptr & ssl_context)234 void set_ssl_context(const SslContext::Ptr& ssl_context) { ssl_context_ = ssl_context; } 235 token_aware_routing() const236 bool token_aware_routing() const { return default_profile().token_aware_routing(); } 237 set_token_aware_routing(bool is_token_aware)238 void set_token_aware_routing(bool is_token_aware) { 239 default_profile_.set_token_aware_routing(is_token_aware); 240 } 241 set_token_aware_routing_shuffle_replicas(bool shuffle_replicas)242 void set_token_aware_routing_shuffle_replicas(bool shuffle_replicas) { 243 default_profile_.set_token_aware_routing_shuffle_replicas(shuffle_replicas); 244 } 245 set_latency_aware_routing(bool is_latency_aware)246 void set_latency_aware_routing(bool is_latency_aware) { 247 default_profile_.set_latency_aware_routing(is_latency_aware); 248 } 249 set_latency_aware_routing_settings(const LatencyAwarePolicy::Settings & settings)250 void set_latency_aware_routing_settings(const LatencyAwarePolicy::Settings& settings) { 251 default_profile_.set_latency_aware_routing_settings(settings); 252 } 253 tcp_nodelay_enable() const254 bool tcp_nodelay_enable() const { return tcp_nodelay_enable_; } 255 set_tcp_nodelay(bool enable)256 void set_tcp_nodelay(bool enable) { tcp_nodelay_enable_ = enable; } 257 tcp_keepalive_enable() const258 bool tcp_keepalive_enable() const { return tcp_keepalive_enable_; } tcp_keepalive_delay_secs() const259 unsigned tcp_keepalive_delay_secs() const { return tcp_keepalive_delay_secs_; } 260 set_tcp_keepalive(bool enable,unsigned delay_secs)261 void set_tcp_keepalive(bool enable, unsigned delay_secs) { 262 tcp_keepalive_enable_ = enable; 263 tcp_keepalive_delay_secs_ = delay_secs; 264 } 265 connection_idle_timeout_secs() const266 unsigned connection_idle_timeout_secs() const { return connection_idle_timeout_secs_; } 267 set_connection_idle_timeout_secs(unsigned timeout_secs)268 void set_connection_idle_timeout_secs(unsigned timeout_secs) { 269 connection_idle_timeout_secs_ = timeout_secs; 270 } 271 connection_heartbeat_interval_secs() const272 unsigned connection_heartbeat_interval_secs() const { 273 return connection_heartbeat_interval_secs_; 274 } 275 set_connection_heartbeat_interval_secs(unsigned interval_secs)276 void set_connection_heartbeat_interval_secs(unsigned interval_secs) { 277 connection_heartbeat_interval_secs_ = interval_secs; 278 } 279 timestamp_gen() const280 TimestampGenerator* timestamp_gen() const { return timestamp_gen_.get(); } 281 set_timestamp_gen(TimestampGenerator * timestamp_gen)282 void set_timestamp_gen(TimestampGenerator* timestamp_gen) { 283 if (timestamp_gen == NULL) return; 284 timestamp_gen_.reset(timestamp_gen); 285 } 286 set_retry_policy(RetryPolicy * retry_policy)287 void set_retry_policy(RetryPolicy* retry_policy) { 288 default_profile_.set_retry_policy(retry_policy); 289 } 290 use_schema() const291 bool use_schema() const { return use_schema_; } set_use_schema(bool enable)292 void set_use_schema(bool enable) { use_schema_ = enable; } 293 use_hostname_resolution() const294 bool use_hostname_resolution() const { return use_hostname_resolution_; } set_use_hostname_resolution(bool enable)295 void set_use_hostname_resolution(bool enable) { use_hostname_resolution_ = enable; } 296 use_randomized_contact_points() const297 bool use_randomized_contact_points() const { return use_randomized_contact_points_; } set_use_randomized_contact_points(bool enable)298 void set_use_randomized_contact_points(bool enable) { use_randomized_contact_points_ = enable; } 299 max_reusable_write_objects() const300 unsigned max_reusable_write_objects() const { return max_reusable_write_objects_; } set_max_reusable_write_objects(unsigned max_reusable_write_objects)301 void set_max_reusable_write_objects(unsigned max_reusable_write_objects) { 302 max_reusable_write_objects_ = max_reusable_write_objects; 303 } 304 default_profile() const305 const ExecutionProfile& default_profile() const { return default_profile_; } 306 default_profile()307 ExecutionProfile& default_profile() { return default_profile_; } 308 profiles() const309 const ExecutionProfile::Map& profiles() const { return profiles_; } 310 set_execution_profile(const String & name,const ExecutionProfile * profile)311 void set_execution_profile(const String& name, const ExecutionProfile* profile) { 312 ExecutionProfile copy = *profile; 313 copy.build_load_balancing_policy(); 314 profiles_[name] = copy; 315 } 316 prepare_on_all_hosts() const317 bool prepare_on_all_hosts() const { return prepare_on_all_hosts_; } 318 set_prepare_on_all_hosts(bool enabled)319 void set_prepare_on_all_hosts(bool enabled) { prepare_on_all_hosts_ = enabled; } 320 prepare_on_up_or_add_host() const321 bool prepare_on_up_or_add_host() const { return prepare_on_up_or_add_host_; } 322 set_prepare_on_up_or_add_host(bool enabled)323 void set_prepare_on_up_or_add_host(bool enabled) { prepare_on_up_or_add_host_ = enabled; } 324 local_address() const325 const Address& local_address() const { return local_address_; } 326 set_local_address(const Address & address)327 void set_local_address(const Address& address) { local_address_ = address; } 328 no_compact() const329 bool no_compact() const { return no_compact_; } 330 set_no_compact(bool enabled)331 void set_no_compact(bool enabled) { no_compact_ = enabled; } 332 application_name() const333 const String& application_name() const { return application_name_; } 334 set_application_name(const String & application_name)335 void set_application_name(const String& application_name) { 336 application_name_ = application_name; 337 } 338 application_version() const339 const String& application_version() const { return application_version_; } 340 set_application_version(const String & application_version)341 void set_application_version(const String& application_version) { 342 application_version_ = application_version; 343 } 344 client_id() const345 CassUuid client_id() const { return client_id_; } is_client_id_set() const346 bool is_client_id_set() const { return is_client_id_set_; } 347 set_client_id(CassUuid client_id)348 void set_client_id(CassUuid client_id) { 349 client_id_ = client_id; 350 is_client_id_set_ = true; 351 } 352 host_listener() const353 const DefaultHostListener::Ptr& host_listener() const { return host_listener_; } 354 set_host_listener(const DefaultHostListener::Ptr & listener)355 void set_host_listener(const DefaultHostListener::Ptr& listener) { 356 if (listener) { 357 host_listener_ = listener; 358 } else { 359 host_listener_.reset(new DefaultHostListener()); 360 } 361 } 362 monitor_reporting_interval_secs() const363 unsigned monitor_reporting_interval_secs() const { return monitor_reporting_interval_secs_; } set_monitor_reporting_interval_secs(unsigned interval_secs)364 void set_monitor_reporting_interval_secs(unsigned interval_secs) { 365 monitor_reporting_interval_secs_ = interval_secs; 366 }; 367 cloud_secure_connection_config() const368 const CloudSecureConnectionConfig& cloud_secure_connection_config() const { 369 return cloud_secure_connection_config_; 370 } set_cloud_secure_connection_bundle(const String & path)371 bool set_cloud_secure_connection_bundle(const String& path) { 372 return cloud_secure_connection_config_.load(path, this); 373 } 374 cluster_metadata_resolver_factory() const375 const ClusterMetadataResolverFactory::Ptr& cluster_metadata_resolver_factory() const { 376 return cluster_metadata_resolver_factory_; 377 } 378 set_cluster_metadata_resolver_factory(const ClusterMetadataResolverFactory::Ptr & factory)379 void set_cluster_metadata_resolver_factory(const ClusterMetadataResolverFactory::Ptr& factory) { 380 cluster_metadata_resolver_factory_ = factory; 381 } 382 set_default_consistency(CassConsistency consistency)383 void set_default_consistency(CassConsistency consistency) { 384 if (default_profile_.consistency() == CASS_CONSISTENCY_UNKNOWN) { 385 default_profile_.set_consistency(consistency); 386 } 387 388 for (ExecutionProfile::Map::iterator it = profiles_.begin(); it != profiles_.end(); ++it) { 389 if (it->second.consistency() == CASS_CONSISTENCY_UNKNOWN) { 390 it->second.set_consistency(consistency); 391 } 392 } 393 } 394 395 private: 396 void init_profiles(); 397 398 private: 399 int port_; 400 ProtocolVersion protocol_version_; 401 bool use_beta_protocol_version_; 402 AddressVec contact_points_; 403 unsigned thread_count_io_; 404 unsigned queue_size_io_; 405 unsigned core_connections_per_host_; 406 SharedRefPtr<ReconnectionPolicy> reconnection_policy_; 407 unsigned connect_timeout_ms_; 408 unsigned resolve_timeout_ms_; 409 unsigned max_schema_wait_time_ms_; 410 unsigned max_tracing_wait_time_ms_; 411 unsigned retry_tracing_wait_time_ms_; 412 CassConsistency tracing_consistency_; 413 uint64_t coalesce_delay_us_; 414 int new_request_ratio_; 415 CassLogLevel log_level_; 416 CassLogCallback log_callback_; 417 void* log_data_; 418 AuthProvider::Ptr auth_provider_; 419 SslContext::Ptr ssl_context_; 420 bool tcp_nodelay_enable_; 421 bool tcp_keepalive_enable_; 422 unsigned tcp_keepalive_delay_secs_; 423 unsigned connection_idle_timeout_secs_; 424 unsigned connection_heartbeat_interval_secs_; 425 SharedRefPtr<TimestampGenerator> timestamp_gen_; 426 bool use_schema_; 427 bool use_hostname_resolution_; 428 bool use_randomized_contact_points_; 429 unsigned max_reusable_write_objects_; 430 ExecutionProfile default_profile_; 431 ExecutionProfile::Map profiles_; 432 bool prepare_on_all_hosts_; 433 bool prepare_on_up_or_add_host_; 434 Address local_address_; 435 bool no_compact_; 436 String application_name_; 437 String application_version_; 438 bool is_client_id_set_; 439 CassUuid client_id_; 440 DefaultHostListener::Ptr host_listener_; 441 unsigned monitor_reporting_interval_secs_; 442 CloudSecureConnectionConfig cloud_secure_connection_config_; 443 ClusterMetadataResolverFactory::Ptr cluster_metadata_resolver_factory_; 444 }; 445 446 }}} // namespace datastax::internal::core 447 448 #endif 449