1 /*
2 Copyright (c) DataStax, Inc.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #include "client_insights.hpp"
18
19 #include "config.hpp"
20 #include "driver_info.hpp"
21 #include "get_time.hpp"
22 #include "logger.hpp"
23 #include "session.hpp"
24 #include "ssl.hpp"
25 #include "string.hpp"
26 #include "utils.hpp"
27
28 #include <cassert>
29 #include <uv.h>
30
31 #ifdef _WIN32
32 #include <windows.h>
33 #include <winsock2.h>
34 #else
35 #include <sys/utsname.h>
36 #include <unistd.h>
37 #endif
38 #define HOSTNAME_MAX_LENGTH 256
39
40 #define METADATA_STARTUP_NAME "driver.startup"
41 #define METADATA_STATUS_NAME "driver.status"
42 #define METADATA_INSIGHTS_MAPPING_ID "v1"
43 #define METADATA_LANGUAGE "C/C++"
44
45 #define CONFIG_ANTIPATTERN_MSG_MULTI_DC_HOSTS \
46 "Contact points contain hosts from " \
47 "multiple data centers but only one " \
48 "is going to be used"
49 #define CONFIG_ANTIPATTERN_MSG_REMOTE_HOSTS "Using remote hosts for failover"
50 #define CONFIG_ANTIPATTERN_MSG_DOWNGRADING \
51 "Downgrading consistency retry " \
52 "policy in use"
53 #define CONFIG_ANTIPATTERN_MSG_CERT_VALIDATION \
54 "Client-to-node encryption is " \
55 "enabled but server certificate " \
56 "validation is disabled"
57 #define CONFIG_ANTIPATTERN_MSG_PLAINTEXT_NO_SSL \
58 "Plain text authentication is " \
59 "enabled without client-to-node " \
60 "encryption" \
61 ""
62
63 namespace datastax { namespace internal { namespace core {
64
create_monitor_reporting(const String & client_id,const String & session_id,const Config & config)65 MonitorReporting* create_monitor_reporting(const String& client_id, const String& session_id,
66 const Config& config) {
67 // Ensure the client monitor events should be enabled
68 unsigned interval_secs = config.monitor_reporting_interval_secs();
69 if (interval_secs > 0) {
70 return new enterprise::ClientInsights(client_id, session_id, interval_secs);
71 }
72 return new NopMonitorReporting();
73 }
74
75 }}} // namespace datastax::internal::core
76
77 using namespace datastax::internal::core;
78 using namespace datastax::internal;
79
80 namespace datastax { namespace internal { namespace enterprise {
81
82 #ifdef _WIN32
83 #define ERROR_BUFFER_MAX_LENGTH 1024
get_last_error()84 String get_last_error() {
85 DWORD rc = GetLastError();
86
87 char buf[ERROR_BUFFER_MAX_LENGTH];
88 size_t size = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, rc,
89 MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
90 reinterpret_cast<LPSTR>(&buf[0]), ERROR_BUFFER_MAX_LENGTH, NULL);
91 String str(buf, size);
92 trim(str);
93 return str;
94 }
95 #endif
96
get_hostname()97 String get_hostname() {
98 #ifdef _WIN32
99 WSADATA data;
100 WORD version_required = MAKEWORD(2, 2);
101 if (WSAStartup(version_required, &data) != 0) {
102 LOG_WARN("Unable to determine hostname: Failed to initialize WinSock2");
103 return String();
104 }
105 #endif
106
107 char buf[HOSTNAME_MAX_LENGTH + 1];
108 size_t size = HOSTNAME_MAX_LENGTH + 1;
109 if (int rc = gethostname(buf, size) != 0) {
110 LOG_WARN("Unable to determine hostname: Error code %d", rc);
111 return "UNKNOWN";
112 }
113 return String(buf, size);
114 }
115
116 struct Os {
117 String name;
118 String version;
119 String arch;
120 };
get_os()121 Os get_os() {
122 Os os;
123 #ifdef _WIN32
124 os.name = "Microsoft Windows";
125
126 DWORD size = GetFileVersionInfoSize(TEXT("kernel32.dll"), NULL);
127 if (size) {
128 Vector<BYTE> version_info(size);
129 if (GetFileVersionInfo(TEXT("kernel32.dll"), 0, size, &version_info[0])) {
130 VS_FIXEDFILEINFO* file_info = NULL;
131 UINT file_info_length = 0;
132 if (VerQueryValue(&version_info[0], TEXT("\\"), reinterpret_cast<LPVOID*>(&file_info),
133 &file_info_length)) {
134 OStringStream oss;
135 oss << static_cast<int>(HIWORD(file_info->dwProductVersionMS)) << "."
136 << static_cast<int>(LOWORD(file_info->dwProductVersionMS)) << "."
137 << static_cast<int>(HIWORD(file_info->dwProductVersionLS));
138 os.version = oss.str();
139 } else {
140 LOG_DEBUG("Unable to retrieve Windows version: %s\n", get_last_error().c_str());
141 }
142 } else {
143 LOG_DEBUG("Unable to retrieve Windows version (GetFileVersionInfo): %s\n",
144 get_last_error().c_str());
145 }
146 } else {
147 LOG_DEBUG("Unable to retrieve Windows version (GetFileVersionInfoSize): %s\n",
148 get_last_error().c_str());
149 }
150
151 #ifdef _WIN64
152 os.arch = "x64";
153 #else
154 os.arch = "x86";
155 #endif
156 #else
157 struct utsname client_info;
158 uname(&client_info);
159 os.name = client_info.sysname;
160 os.version = client_info.release;
161 os.arch = client_info.machine;
162 #endif
163
164 return os;
165 }
166
167 struct Cpus {
168 int length;
169 String model;
170 };
get_cpus()171 Cpus get_cpus() {
172 Cpus cpus;
173
174 uv_cpu_info_t* cpus_infos;
175 int cpus_count;
176 int rc = uv_cpu_info(&cpus_infos, &cpus_count);
177 if (rc == 0) {
178 uv_cpu_info_t cpus_info = cpus_infos[0];
179 cpus.length = cpus_count;
180 cpus.model = cpus_info.model;
181 uv_free_cpu_info(cpus_infos, cpus_count);
182 } else {
183 LOG_DEBUG("Unable to determine CPUs information: %s\n", uv_strerror(rc));
184 }
185
186 return cpus;
187 }
188
189 class ClientInsightsRequestCallback : public SimpleRequestCallback {
190 public:
191 typedef SharedRefPtr<ClientInsightsRequestCallback> Ptr;
192
ClientInsightsRequestCallback(const String & json,const String & event_type)193 ClientInsightsRequestCallback(const String& json, const String& event_type)
194 : SimpleRequestCallback("CALL InsightsRpc.reportInsight('" + json + "')")
195 , event_type_(event_type) {}
196
on_internal_set(ResponseMessage * response)197 virtual void on_internal_set(ResponseMessage* response) {
198 if (response->opcode() != CQL_OPCODE_RESULT) {
199 LOG_DEBUG("Failed to send %s event message: Invalid response [%s]", event_type_.c_str(),
200 opcode_to_string(response->opcode()).c_str());
201 }
202 }
203
on_internal_error(CassError code,const String & message)204 virtual void on_internal_error(CassError code, const String& message) {
205 LOG_DEBUG("Failed to send %s event message: %s", event_type_.c_str(), message.c_str());
206 }
207
on_internal_timeout()208 virtual void on_internal_timeout() {
209 LOG_DEBUG("Failed to send %s event message: Timed out waiting for response",
210 event_type_.c_str());
211 }
212
213 private:
214 String event_type_;
215 };
216
metadata(ClientInsights::Writer & writer,const String & name)217 void metadata(ClientInsights::Writer& writer, const String& name) {
218 writer.Key("metadata");
219 writer.StartObject();
220
221 writer.Key("name");
222 writer.String(name.c_str());
223 writer.Key("insightMappingId");
224 writer.String(METADATA_INSIGHTS_MAPPING_ID);
225 writer.Key("insightType");
226 writer.String("EVENT"); // TODO: Make this an enumeration in the future
227 writer.Key("timestamp");
228 writer.Uint64(get_time_since_epoch_ms());
229 writer.Key("tags");
230 writer.StartObject();
231 writer.Key("language");
232 writer.String(METADATA_LANGUAGE);
233 writer.EndObject(); // tags
234
235 writer.EndObject();
236 }
237
238 class StartupMessageHandler : public RefCounted<StartupMessageHandler> {
239 public:
240 typedef SharedRefPtr<StartupMessageHandler> Ptr;
241
// Stores the inputs used to build the startup message: the connection the
// message is written to, the client/session identifiers, the configuration,
// the known hosts and the initialized load balancing policies. No message is
// built or sent here.
StartupMessageHandler(const Connection::Ptr& connection,
                      const String& client_id,
                      const String& session_id,
                      const Config& config,
                      const HostMap& hosts,
                      const LoadBalancingPolicy::Vec& initialized_policies)
    : connection_(connection)
    , client_id_(client_id)
    , session_id_(session_id)
    , config_(config)
    , hosts_(hosts)
    , initialized_policies_(initialized_policies) {}
251
// Builds the startup JSON document (metadata + data) and writes it to the
// connection. Doing this in the destructor means the message goes out once
// the last reference to the handler is released.
~StartupMessageHandler() {
  ClientInsights::StringBuffer json;
  ClientInsights::Writer writer(json);

  writer.StartObject();
  metadata(writer, METADATA_STARTUP_NAME);
  startup_message_data(writer);
  writer.EndObject();
  assert(writer.IsComplete() && "Startup JSON is incomplete");

  RequestCallback::Ptr callback(
      new ClientInsightsRequestCallback(json.GetString(), METADATA_STARTUP_NAME));
  connection_->write_and_flush(callback);
}
265
send_message()266 void send_message() { resolve_contact_points(); }
267
268 private:
269 // Startup message associated methods
startup_message_data(ClientInsights::Writer & writer)270 void startup_message_data(ClientInsights::Writer& writer) {
271 writer.Key("data");
272 writer.StartObject();
273
274 writer.Key("clientId");
275 writer.String(client_id_.c_str());
276 writer.Key("sessionId");
277 writer.String(session_id_.c_str());
278 bool is_application_name_generated = false;
279 writer.Key("applicationName");
280 if (!config_.application_name().empty()) {
281 writer.String(config_.application_name().c_str());
282 } else {
283 writer.String(driver_name());
284 is_application_name_generated = true;
285 }
286 writer.Key("applicationNameWasGenerated");
287 writer.Bool(is_application_name_generated);
288 if (!config_.application_version().empty()) {
289 writer.Key("applicationVersion");
290 writer.String(config_.application_version().c_str());
291 }
292 writer.Key("driverName");
293 writer.String(driver_name());
294 writer.Key("driverVersion");
295 writer.String(driver_version());
296 contact_points(writer);
297 data_centers(writer);
298 writer.Key("initialControlConnection");
299 writer.String(connection_->resolved_address().to_string(true).c_str());
300 writer.Key("protocolVersion");
301 writer.Int(connection_->protocol_version().value());
302 writer.Key("localAddress");
303 writer.String(get_local_address(connection_->handle()).c_str());
304 writer.Key("hostName");
305 writer.String(get_hostname().c_str());
306 execution_profiles(writer);
307 pool_size_by_host_distance(writer);
308 writer.Key("heartbeatInterval");
309 writer.Uint64(config_.connection_heartbeat_interval_secs() * 1000); // in milliseconds
310 writer.Key("compression");
311 writer.String("NONE"); // TODO: Update once compression is added
312 reconnection_policy(writer);
313 ssl(writer);
314 auth_provider(writer);
315 other_options(writer);
316 platform_info(writer);
317 config_anti_patterns(writer);
318 writer.Key("periodicStatusInterval");
319 writer.Uint(config_.monitor_reporting_interval_secs());
320
321 writer.EndObject();
322 }
323
contact_points(ClientInsights::Writer & writer)324 void contact_points(ClientInsights::Writer& writer) {
325 writer.Key("contactPoints");
326 writer.StartObject();
327
328 for (ResolvedHostMap::const_iterator map_it = contact_points_resolved_.begin(),
329 map_end = contact_points_resolved_.end();
330 map_it != map_end; ++map_it) {
331 writer.Key(map_it->first.c_str());
332 writer.StartArray();
333 for (AddressSet::const_iterator vec_it = map_it->second.begin(),
334 vec_end = map_it->second.end();
335 vec_it != vec_end; ++vec_it) {
336 writer.String(vec_it->to_string(true).c_str());
337 }
338 writer.EndArray();
339 }
340
341 writer.EndObject();
342 }
343
data_centers(ClientInsights::Writer & writer)344 void data_centers(ClientInsights::Writer& writer) {
345 writer.Key("dataCenters");
346 writer.StartArray();
347
348 Set<String> data_centers;
349 for (HostMap::const_iterator it = hosts_.begin(), end = hosts_.end(); it != end; ++it) {
350 const String& data_center = it->second->dc();
351 if (data_centers.insert(data_center).second) {
352 writer.String(data_center.c_str());
353 }
354 }
355
356 writer.EndArray();
357 }
358
execution_profiles(ClientInsights::Writer & writer)359 void execution_profiles(ClientInsights::Writer& writer) {
360 writer.Key("executionProfiles");
361 writer.StartObject();
362
363 const ExecutionProfile& default_profile = config_.default_profile();
364 const ExecutionProfile::Map& profiles = config_.profiles();
365 writer.Key("default");
366 execution_profile_as_json(writer, default_profile);
367 for (ExecutionProfile::Map::const_iterator it = profiles.begin(), end = profiles.end();
368 it != end; ++it) {
369 writer.Key(it->first.c_str());
370 execution_profile_as_json(writer, it->second, &default_profile);
371 }
372
373 writer.EndObject();
374 }
375
pool_size_by_host_distance(ClientInsights::Writer & writer)376 void pool_size_by_host_distance(ClientInsights::Writer& writer) {
377 writer.Key("poolSizeByHostDistance");
378 writer.StartObject();
379
380 writer.Key("local");
381 writer.Uint(config_.core_connections_per_host() * hosts_.size());
382 // NOTE: Remote does not pertain to DataStax C/C++ driver pool
383 writer.Key("remote");
384 writer.Uint(0);
385
386 writer.EndObject();
387 }
388
reconnection_policy(ClientInsights::Writer & writer)389 void reconnection_policy(ClientInsights::Writer& writer) {
390 writer.Key("reconnectionPolicy");
391 writer.StartObject();
392
393 ReconnectionPolicy::Ptr reconnection_policy = config_.reconnection_policy();
394 writer.Key("type");
395 if (reconnection_policy->type() == ReconnectionPolicy::CONSTANT) {
396 writer.String("ConstantReconnectionPolicy");
397 } else if (reconnection_policy->type() == ReconnectionPolicy::EXPONENTIAL) {
398 writer.String("ExponentialReconnectionPolicy");
399 } else {
400 assert(false && "Reconnection policy needs to be added");
401 writer.String("UnknownReconnectionPolicy");
402 }
403 writer.Key("options");
404 writer.StartObject();
405 if (reconnection_policy->type() == ReconnectionPolicy::CONSTANT) {
406 ConstantReconnectionPolicy::Ptr crp =
407 static_cast<ConstantReconnectionPolicy::Ptr>(reconnection_policy);
408 writer.Key("delayMs");
409 writer.Uint(crp->delay_ms());
410 } else if (reconnection_policy->type() == ReconnectionPolicy::EXPONENTIAL) {
411 ExponentialReconnectionPolicy::Ptr erp =
412 static_cast<ExponentialReconnectionPolicy::Ptr>(reconnection_policy);
413 writer.Key("baseDelayMs");
414 writer.Uint(erp->base_delay_ms());
415 writer.Key("maxDelayMs");
416 writer.Uint(erp->max_delay_ms());
417 }
418 writer.EndObject(); // options
419
420 writer.EndObject();
421 }
422
ssl(ClientInsights::Writer & writer)423 void ssl(ClientInsights::Writer& writer) {
424 writer.Key("ssl");
425 writer.StartObject();
426
427 const SslContext::Ptr& ssl_context = config_.ssl_context();
428 writer.Key("enabled");
429 if (ssl_context) {
430 writer.Bool(true);
431 } else {
432 writer.Bool(false);
433 }
434 writer.Key("certValidation");
435 if (ssl_context) {
436 writer.Bool(ssl_context->is_cert_validation_enabled());
437 } else {
438 writer.Bool(false);
439 }
440
441 writer.EndObject();
442 }
443
auth_provider(ClientInsights::Writer & writer)444 void auth_provider(ClientInsights::Writer& writer) {
445 const AuthProvider::Ptr& auth_provider = config_.auth_provider();
446 if (auth_provider) {
447 writer.Key("authProvider");
448 writer.StartObject();
449
450 writer.Key("type");
451 writer.String(auth_provider->name().c_str());
452
453 writer.EndObject();
454 }
455 }
456
other_options(ClientInsights::Writer & writer)457 void other_options(ClientInsights::Writer& writer) {
458 writer.Key("otherOptions");
459 writer.StartObject();
460
461 writer.Key("configuration");
462 writer.StartObject();
463 writer.Key("protocolVersion");
464 writer.Uint(config_.protocol_version().value());
465 writer.Key("useBetaProtocol");
466 writer.Bool(config_.use_beta_protocol_version());
467 writer.Key("threadCountIo");
468 writer.Uint(config_.thread_count_io());
469 writer.Key("queueSizeIo");
470 writer.Uint(config_.queue_size_io());
471 writer.Key("coreConnectionsPerHost");
472 writer.Uint(config_.core_connections_per_host());
473 writer.Key("connectTimeoutMs");
474 writer.Uint(config_.connect_timeout_ms());
475 writer.Key("resolveTimeoutMs");
476 writer.Uint(config_.resolve_timeout_ms());
477 writer.Key("maxSchemaWaitTimeMs");
478 writer.Uint(config_.max_schema_wait_time_ms());
479 writer.Key("maxTracingWaitTimeMs");
480 writer.Uint(config_.max_tracing_wait_time_ms());
481 writer.Key("tracingConsistency");
482 writer.String(cass_consistency_string(config_.tracing_consistency()));
483 writer.Key("coalesceDelayUs");
484 writer.Uint64(config_.coalesce_delay_us());
485 writer.Key("newRequestRatio");
486 writer.Uint(config_.new_request_ratio());
487 writer.Key("logLevel");
488 writer.String(cass_log_level_string(config_.log_level()));
489 writer.Key("tcpNodelayEnable");
490 writer.Bool(config_.tcp_nodelay_enable());
491 writer.Key("tcpKeepaliveEnable");
492 writer.Bool(config_.tcp_keepalive_enable());
493 writer.Key("tcpKeepaliveDelaySecs");
494 writer.Uint(config_.tcp_keepalive_delay_secs());
495 writer.Key("connectionIdleTimeoutSecs");
496 writer.Uint(config_.connection_idle_timeout_secs());
497 writer.Key("useSchema");
498 writer.Bool(config_.use_schema());
499 writer.Key("useHostnameResolution");
500 writer.Bool(config_.use_hostname_resolution());
501 writer.Key("useRandomizedContactPoints");
502 writer.Bool(config_.use_randomized_contact_points());
503 writer.Key("maxReusableWriteObjects");
504 writer.Uint(config_.max_reusable_write_objects());
505 writer.Key("prepareOnAllHosts");
506 writer.Bool(config_.prepare_on_all_hosts());
507 writer.Key("prepareOnUpOrAddHost");
508 writer.Bool(config_.prepare_on_up_or_add_host());
509 writer.Key("noCompact");
510 writer.Bool(config_.no_compact());
511 writer.Key("cloudSecureConnectBundleLoaded");
512 writer.Bool(config_.cloud_secure_connection_config().is_loaded());
513 writer.Key("clusterMetadataResolver");
514 writer.String(config_.cluster_metadata_resolver_factory()->name());
515 writer.EndObject(); // configuration
516
517 writer.EndObject(); // otherOptions
518 }
519
platform_info(ClientInsights::Writer & writer)520 void platform_info(ClientInsights::Writer& writer) {
521 writer.Key("platformInfo");
522 writer.StartObject();
523
524 writer.Key("os");
525 writer.StartObject();
526 Os os = get_os();
527 writer.Key("name");
528 writer.String(os.name.c_str());
529 writer.Key("version");
530 writer.String(os.version.c_str());
531 writer.Key("arch");
532 writer.String(os.arch.c_str());
533 writer.EndObject(); // os
534
535 writer.Key("cpus");
536 writer.StartObject();
537 Cpus cpus = get_cpus();
538 writer.Key("length");
539 writer.Int(cpus.length);
540 writer.Key("model");
541 writer.String(cpus.model.c_str());
542 writer.EndObject(); // cpus
543
544 writer.Key("runtime");
545 writer.StartObject();
546 #if defined(__clang__) || defined(__APPLE_CC__)
547 writer.Key("Clang/LLVM");
548 writer.String(STRINGIFY(__clang_major__) "." STRINGIFY(__clang_minor__) "." STRINGIFY(
549 __clang_patchlevel__));
550 #elif defined(__INTEL_COMPILER)
551 writer.Key("Intel ICC/ICPC");
552 writer.String(STRINGIFY(__INTEL_COMPILER));
553 #elif defined(__GNUC__) || defined(__GNUG__)
554 writer.Key("GNU GCC/G++");
555 writer.String(
556 STRINGIFY(__GNUC__) "." STRINGIFY(__GNUC_MINOR__) "." STRINGIFY(__GNUC_PATCHLEVEL__));
557 #elif defined(__HP_aCC)
558 writer.Key("Hewlett-Packard C/aC++");
559 writer.String(STRINGIFY(__HP_aCC));
560 #elif defined(__IBMCPP__)
561 writer.Key("IBM XL C/C++");
562 writer.String(STRINGIFY(__xlc__));
563 #elif defined(_MSC_VER)
564 writer.Key("Microsoft Visual Studio");
565 writer.String(STRINGIFY(_MSC_FULL_VER));
566 #elif defined(__PGI)
567 writer.Key("Portland Group PGCC/PGCPP");
568 writer.String(
569 STRINGIFY(__PGIC__) "." STRINGIFY(__PGIC_MINOR__) "." STRINGIFY(__PGIC_PATCHLEVEL__));
570 #elif defined(__SUNPRO_CC)
571 writer.Key("Oracle Solaris Studio");
572 writer.String(STRINGIFY(__SUNPRO_CC));
573 #else
574 writer.Key("Unknown");
575 writer.String("Unknown");
576 #endif
577 writer.Key("uv");
578 writer.String(STRINGIFY(UV_VERSION_MAJOR) "." STRINGIFY(UV_VERSION_MINOR) "." STRINGIFY(
579 UV_VERSION_PATCH));
580 writer.Key("openssl");
581 #ifdef OPENSSL_VERSION_TEXT
582 writer.String(OPENSSL_VERSION_TEXT);
583 #else
584 #ifdef LIBRESSL_VERSION_NUMBER
585 writer.String("LibreSSL " STRINGIFY(LIBRESSL_VERSION_NUMBER));
586 #else
587 writer.String("OpenSSL " STRINGIFY(OPENSSL_VERSION_NUMBER));
588 #endif
589 #endif
590 writer.EndObject(); // runtime
591
592 writer.EndObject(); // platformInfo
593 }
594
config_anti_patterns(ClientInsights::Writer & writer)595 void config_anti_patterns(ClientInsights::Writer& writer) {
596 StringPairVec config_anti_patterns = get_config_anti_patterns(
597 config_.default_profile(), config_.profiles(), initialized_policies_, hosts_,
598 config_.ssl_context(), config_.auth_provider());
599 if (!config_anti_patterns.empty()) {
600 writer.Key("configAntiPatterns");
601 writer.StartObject();
602
603 for (StringPairVec::const_iterator it = config_anti_patterns.begin(),
604 end = config_anti_patterns.end();
605 it != end; ++it) {
606 writer.Key(it->first.c_str());
607 writer.String(it->second.c_str());
608 }
609
610 writer.EndObject();
611 }
612 }
613
614 private:
615 // Startup message helper methods
resolve_contact_points()616 void resolve_contact_points() {
617 const AddressVec& contact_points = config_.contact_points();
618 const int port = config_.port();
619 MultiResolver::Ptr resolver;
620
621 for (AddressVec::const_iterator it = contact_points.begin(), end = contact_points.end();
622 it != end; ++it) {
623 const Address& contact_point = *it;
624 // Attempt to parse the contact point string. If it's an IP address
625 // then immediately add it to our resolved contact points, otherwise
626 // attempt to resolve the string as a hostname.
627 if (contact_point.is_resolved()) {
628 AddressSet addresses;
629 addresses.insert(contact_point);
630 contact_points_resolved_[contact_point.hostname_or_address()] = addresses;
631 } else {
632 if (!resolver) {
633 inc_ref();
634 resolver.reset(
635 new MultiResolver(bind_callback(&StartupMessageHandler::on_resolve, this)));
636 }
637 resolver->resolve(connection_->loop(), contact_point.hostname_or_address(), port,
638 config_.resolve_timeout_ms());
639 }
640 }
641
642 // NOTE: If no resolution is performed the startup message will be sent in
643 // the destructor
644 }
645
on_resolve(MultiResolver * resolver)646 void on_resolve(MultiResolver* resolver) {
647 const Resolver::Vec& resolvers = resolver->resolvers();
648 for (Resolver::Vec::const_iterator it = resolvers.begin(), end = resolvers.end(); it != end;
649 ++it) {
650 const Resolver::Ptr resolver(*it);
651 AddressSet addresses;
652 if (resolver->is_success()) {
653 if (!resolver->addresses().empty()) {
654 for (AddressVec::const_iterator it = resolver->addresses().begin(),
655 end = resolver->addresses().end();
656 it != end; ++it) {
657 addresses.insert(*it);
658 }
659 }
660 }
661 contact_points_resolved_[resolver->hostname()] = addresses; // Empty resolved addresses are OK
662 }
663
664 dec_ref(); // Send startup message in destructor
665 }
666
get_local_address(const uv_tcp_t * tcp) const667 String get_local_address(const uv_tcp_t* tcp) const {
668 Address::SocketStorage name;
669 int namelen = sizeof(name);
670 if (uv_tcp_getsockname(tcp, name.addr(), &namelen) == 0) {
671 Address address(name.addr());
672 if (address.is_valid_and_resolved()) {
673 return address.to_string();
674 }
675 }
676 return "unknown";
677 }
678
execution_profile_as_json(ClientInsights::Writer & writer,const ExecutionProfile & profile,const ExecutionProfile * default_profile=NULL)679 void execution_profile_as_json(ClientInsights::Writer& writer, const ExecutionProfile& profile,
680 const ExecutionProfile* default_profile = NULL) {
681 writer.StartObject();
682
683 if (!default_profile || (default_profile && profile.request_timeout_ms() !=
684 default_profile->request_timeout_ms())) {
685 writer.Key("requestTimeoutMs");
686 writer.Uint64(profile.request_timeout_ms());
687 }
688 if (!default_profile ||
689 (default_profile && profile.consistency() != default_profile->consistency())) {
690 writer.Key("consistency");
691 writer.String(cass_consistency_string(profile.consistency()));
692 }
693 if (!default_profile || (default_profile && profile.serial_consistency() !=
694 default_profile->serial_consistency())) {
695 writer.Key("serialConsistency");
696 writer.String(cass_consistency_string(profile.serial_consistency()));
697 }
698 if (!default_profile ||
699 (default_profile && profile.retry_policy() != default_profile->retry_policy())) {
700 const RetryPolicy::Ptr& retry_policy = profile.retry_policy();
701 if (retry_policy) {
702 writer.Key("retryPolicy");
703 if (retry_policy->type() == RetryPolicy::DEFAULT) {
704 writer.String("DefaultRetryPolicy");
705 } else if (retry_policy->type() == RetryPolicy::DOWNGRADING) {
706 writer.String("DowngradingConsistencyRetryPolicy");
707 } else if (retry_policy->type() == RetryPolicy::FALLTHROUGH) {
708 writer.String("FallthroughRetryPolicy");
709 } else if (retry_policy->type() == RetryPolicy::LOGGING) {
710 writer.String("LoggingRetryPolicy");
711 } else {
712 LOG_DEBUG("Invalid retry policy: %d", retry_policy->type());
713 writer.String("unknown");
714 }
715 }
716 }
717
718 if (profile.load_balancing_policy()) {
719 writer.Key("loadBalancing");
720 writer.StartObject();
721 writer.Key("type");
722 LoadBalancingPolicy* current_lbp = profile.load_balancing_policy().get();
723 do {
724 // NOTE: DCAware and RoundRobin are leaf policies (e.g. not chainable)
725 if (dynamic_cast<DCAwarePolicy*>(current_lbp)) {
726 writer.String("DCAwarePolicy");
727 break;
728 } else if (dynamic_cast<RoundRobinPolicy*>(current_lbp)) {
729 writer.String("RoundRobinPolicy");
730 break;
731 }
732
733 if (ChainedLoadBalancingPolicy* chained_lbp =
734 dynamic_cast<ChainedLoadBalancingPolicy*>(current_lbp)) {
735 current_lbp = chained_lbp->child_policy().get();
736 } else {
737 current_lbp = NULL;
738 }
739 } while (current_lbp);
740 writer.Key("options");
741 writer.StartObject();
742 if (DCAwarePolicy* dc_lbp = dynamic_cast<DCAwarePolicy*>(current_lbp)) {
743 writer.Key("localDc");
744 if (dc_lbp->local_dc().empty()) {
745 writer.Null();
746 } else {
747 writer.String(dc_lbp->local_dc().c_str());
748 }
749 writer.Key("usedHostsPerRemoteDc");
750 writer.Uint64(dc_lbp->used_hosts_per_remote_dc());
751 writer.Key("allowRemoteDcsForLocalCl");
752 writer.Bool(!dc_lbp->skip_remote_dcs_for_local_cl());
753 }
754 if (!profile.blacklist().empty()) {
755 writer.Key("blacklist");
756 writer.String(implode(profile.blacklist()).c_str());
757 }
758 if (!profile.blacklist_dc().empty()) {
759 writer.Key("blacklistDc");
760 writer.String(implode(profile.blacklist_dc()).c_str());
761 }
762 if (!profile.whitelist().empty()) {
763 writer.Key("whitelist");
764 writer.String(implode(profile.whitelist()).c_str());
765 }
766 if (!profile.whitelist_dc().empty()) {
767 writer.Key("whitelistDc");
768 writer.String(implode(profile.whitelist_dc()).c_str());
769 }
770 if (profile.token_aware_routing()) {
771 writer.Key("tokenAwareRouting");
772 writer.StartObject();
773 writer.Key("shuffleReplicas");
774 writer.Bool(profile.token_aware_routing_shuffle_replicas());
775 writer.EndObject(); // tokenAwareRouting
776 }
777 if (profile.latency_aware()) {
778 writer.Key("latencyAwareRouting");
779 writer.StartObject();
780 writer.Key("exclusionThreshold");
781 writer.Double(profile.latency_aware_routing_settings().exclusion_threshold);
782 writer.Key("scaleNs");
783 writer.Uint64(profile.latency_aware_routing_settings().scale_ns);
784 writer.Key("retryPeriodNs");
785 writer.Uint64(profile.latency_aware_routing_settings().retry_period_ns);
786 writer.Key("updateRateMs");
787 writer.Uint64(profile.latency_aware_routing_settings().update_rate_ms);
788 writer.Key("minMeasured");
789 writer.Uint64(profile.latency_aware_routing_settings().min_measured);
790 writer.EndObject(); // latencyAwareRouting
791 }
792 writer.EndObject(); // options
793
794 writer.EndObject(); // loadBalancingPolicy
795 }
796
797 typedef ConstantSpeculativeExecutionPolicy CSEP;
798 CSEP* default_csep =
799 default_profile ? dynamic_cast<CSEP*>(default_profile->speculative_execution_policy().get())
800 : NULL;
801 CSEP* csep = dynamic_cast<CSEP*>(profile.speculative_execution_policy().get());
802 if (csep) {
803 if (!default_csep ||
804 (default_csep->constant_delay_ms_ != csep->constant_delay_ms_ &&
805 default_csep->max_speculative_executions_ != csep->max_speculative_executions_)) {
806 writer.Key("speculativeExecutionPolicy");
807 writer.StartObject();
808 writer.Key("type");
809 writer.String("ConstantSpeculativeExecutionPolicy");
810
811 writer.Key("options");
812 writer.StartObject();
813 writer.Key("constantDelayMs");
814 writer.Uint64(csep->constant_delay_ms_);
815 writer.Key("maxSpeculativeExecutions");
816 writer.Int(csep->max_speculative_executions_);
817 writer.EndObject(); // options
818
819 writer.EndObject(); // speculativeExecutionPolicy
820 }
821 }
822
823 writer.EndObject(); // executionProfile
824 }
825
826 typedef std::pair<String, String> StringPair;
827 typedef Vector<StringPair> StringPairVec;
get_config_anti_patterns(const ExecutionProfile & default_profile,const ExecutionProfile::Map & profiles,const LoadBalancingPolicy::Vec & policies,const HostMap & hosts,const SslContext::Ptr & ssl_context,const AuthProvider::Ptr & auth_provider)828 StringPairVec get_config_anti_patterns(const ExecutionProfile& default_profile,
829 const ExecutionProfile::Map& profiles,
830 const LoadBalancingPolicy::Vec& policies,
831 const HostMap& hosts, const SslContext::Ptr& ssl_context,
832 const AuthProvider::Ptr& auth_provider) {
833 StringPairVec config_anti_patterns;
834
835 if (is_contact_points_multiple_dcs(policies, hosts)) {
836 config_anti_patterns.push_back(
837 StringPair("contactPointsMultipleDCs", CONFIG_ANTIPATTERN_MSG_MULTI_DC_HOSTS));
838 LOG_WARN("Configuration anti-pattern detected: %s", CONFIG_ANTIPATTERN_MSG_MULTI_DC_HOSTS);
839 }
840
841 for (LoadBalancingPolicy::Vec::const_iterator it = policies.begin(), end = policies.end();
842 it != end; ++it) {
843 DCAwarePolicy* dc_lbp = get_dc_aware_policy(*it);
844 if (dc_lbp && !dc_lbp->skip_remote_dcs_for_local_cl()) {
845 config_anti_patterns.push_back(
846 StringPair("useRemoteHosts", CONFIG_ANTIPATTERN_MSG_REMOTE_HOSTS));
847 LOG_WARN("Configuration anti-pattern detected: %s", CONFIG_ANTIPATTERN_MSG_REMOTE_HOSTS);
848 break;
849 }
850 }
851
852 bool is_downgrading_consistency_enabled =
853 is_downgrading_retry_anti_pattern(default_profile.retry_policy());
854 if (!is_downgrading_consistency_enabled) {
855 for (ExecutionProfile::Map::const_iterator it = profiles.begin(), end = profiles.end();
856 it != end; ++it) {
857 if (is_downgrading_retry_anti_pattern(it->second.retry_policy())) {
858 is_downgrading_consistency_enabled = true;
859 break;
860 }
861 }
862 }
863 if (is_downgrading_consistency_enabled) {
864 config_anti_patterns.push_back(
865 StringPair("downgradingConsistency", CONFIG_ANTIPATTERN_MSG_DOWNGRADING));
866 LOG_WARN("Configuration anti-pattern detected: %s", CONFIG_ANTIPATTERN_MSG_DOWNGRADING);
867 }
868
869 if (ssl_context && !ssl_context->is_cert_validation_enabled()) {
870 config_anti_patterns.push_back(
871 StringPair("sslWithoutCertValidation", CONFIG_ANTIPATTERN_MSG_CERT_VALIDATION));
872 LOG_WARN("Configuration anti-pattern detected: %s", CONFIG_ANTIPATTERN_MSG_CERT_VALIDATION);
873 }
874
875 if (auth_provider && auth_provider->name().find("PlainTextAuthProvider") != String::npos &&
876 !ssl_context) {
877 config_anti_patterns.push_back(
878 StringPair("plainTextAuthWithoutSsl", CONFIG_ANTIPATTERN_MSG_PLAINTEXT_NO_SSL));
879 LOG_WARN("Configuration anti-pattern detected: %s", CONFIG_ANTIPATTERN_MSG_PLAINTEXT_NO_SSL);
880 }
881
882 return config_anti_patterns;
883 }
884
get_dc_aware_policy(const LoadBalancingPolicy::Ptr & policy)885 DCAwarePolicy* get_dc_aware_policy(const LoadBalancingPolicy::Ptr& policy) {
886 LoadBalancingPolicy* current_lbp = policy.get();
887 do {
888 if (DCAwarePolicy* dc_lbp = dynamic_cast<DCAwarePolicy*>(current_lbp)) {
889 return dc_lbp;
890 }
891 if (ChainedLoadBalancingPolicy* chained_lbp =
892 dynamic_cast<ChainedLoadBalancingPolicy*>(current_lbp)) {
893 current_lbp = chained_lbp->child_policy().get();
894 } else {
895 break;
896 }
897 } while (current_lbp);
898
899 return NULL;
900 }
901
is_contact_points_multiple_dcs(const LoadBalancingPolicy::Vec & policies,const HostMap & hosts)902 bool is_contact_points_multiple_dcs(const LoadBalancingPolicy::Vec& policies,
903 const HostMap& hosts) {
904 // Get the DC aware load balancing policy if it is the only policy that
905 // exists. If found this policy will be used after the contact points have
906 // been resolved in order to determine if there are contacts points that exist
907 // in multiple DCs using a copy of the discovered hosts.
908 if (policies.size() == 1) {
909 DCAwarePolicy* policy = get_dc_aware_policy(policies[0]);
910 if (policy) {
911 // Loop through the resolved contacts, find the correct initialized host
912 // and if the contact point is a remote host identify as an anti-pattern
913 for (ResolvedHostMap::const_iterator resolved_it = contact_points_resolved_.begin(),
914 hosts_end = contact_points_resolved_.end();
915 resolved_it != hosts_end; ++resolved_it) {
916 const AddressSet& addresses = resolved_it->second;
917 for (AddressSet::const_iterator addresses_it = addresses.begin(),
918 addresses_end = addresses.end();
919 addresses_it != addresses_end; ++addresses_it) {
920 const Address& address = *addresses_it;
921 for (HostMap::const_iterator hosts_it = hosts.begin(), hosts_end = hosts.end();
922 hosts_it != hosts_end; ++hosts_it) {
923 const Host::Ptr& host = hosts_it->second;
924 if (host->address() == address &&
925 policy->distance(host) == CASS_HOST_DISTANCE_REMOTE) {
926 return true;
927 }
928 }
929 }
930 }
931 }
932 }
933
934 return false;
935 }
936
is_downgrading_retry_anti_pattern(const RetryPolicy::Ptr & policy)937 bool is_downgrading_retry_anti_pattern(const RetryPolicy::Ptr& policy) {
938 if (policy && policy->type() == RetryPolicy::DOWNGRADING) {
939 return true;
940 }
941 return false;
942 }
943
944 private:
945 const Connection::Ptr connection_;
946 const String client_id_;
947 const String session_id_;
948 const Config config_;
949 const HostMap hosts_;
950 const LoadBalancingPolicy::Vec initialized_policies_;
951
952 private:
953 typedef Map<String, AddressSet> ResolvedHostMap;
954 ResolvedHostMap contact_points_resolved_;
955 };
956
ClientInsights(const String & client_id,const String & session_id,unsigned interval_secs)957 ClientInsights::ClientInsights(const String& client_id, const String& session_id,
958 unsigned interval_secs)
959 : client_id_(client_id)
960 , session_id_(session_id)
961 , interval_ms_(interval_secs * 1000) {}
962
interval_ms(const VersionNumber & dse_server_version) const963 uint64_t ClientInsights::interval_ms(const VersionNumber& dse_server_version) const {
964 // DSE v5.1.13+ (backported)
965 // DSE v6.0.5+ (backported)
966 // DSE v6.7.0 was the first to supported the Insights RPC call
967 if ((dse_server_version >= VersionNumber(5, 1, 13) &&
968 dse_server_version < VersionNumber(6, 0, 0)) ||
969 dse_server_version >= VersionNumber(6, 0, 5)) {
970 return interval_ms_;
971 }
972 return 0;
973 }
974
send_startup_message(const Connection::Ptr & connection,const Config & config,const HostMap & hosts,const LoadBalancingPolicy::Vec & initialized_policies)975 void ClientInsights::send_startup_message(const Connection::Ptr& connection, const Config& config,
976 const HostMap& hosts,
977 const LoadBalancingPolicy::Vec& initialized_policies) {
978 StartupMessageHandler::Ptr handler = StartupMessageHandler::Ptr(new StartupMessageHandler(
979 connection, client_id_, session_id_, config, hosts, initialized_policies));
980 handler->send_message();
981 }
982
send_status_message(const Connection::Ptr & connection,const HostMap & hosts)983 void ClientInsights::send_status_message(const Connection::Ptr& connection, const HostMap& hosts) {
984 StringBuffer buffer;
985 Writer writer(buffer);
986
987 writer.StartObject();
988 metadata(writer, METADATA_STATUS_NAME);
989
990 writer.Key("data");
991 writer.StartObject();
992
993 writer.Key("clientId");
994 writer.String(client_id_.c_str());
995 writer.Key("sessionId");
996 writer.String(session_id_.c_str());
997 writer.Key("controlConnection");
998 writer.String(connection->resolved_address().to_string(true).c_str());
999
1000 writer.Key("conntectedNodes");
1001 writer.StartObject();
1002 for (HostMap::const_iterator it = hosts.begin(), end = hosts.end(); it != end; ++it) {
1003 String address_with_port = it->first.to_string(true);
1004 const Host::Ptr& host = it->second;
1005 writer.Key(address_with_port.c_str());
1006 writer.StartObject();
1007 writer.Key("connections");
1008 writer.Int(host->connection_count());
1009 writer.Key("inFlightQueries");
1010 writer.Int(host->inflight_request_count());
1011 writer.EndObject(); // address_with_port
1012 }
1013 writer.EndObject(); // connectedNodes
1014
1015 writer.EndObject(); // data
1016 writer.EndObject();
1017
1018 assert(writer.IsComplete() && "Status JSON is incomplete");
1019 connection->write_and_flush(RequestCallback::Ptr(
1020 new ClientInsightsRequestCallback(buffer.GetString(), METADATA_STATUS_NAME)));
1021 }
1022
1023 }}} // namespace datastax::internal::enterprise
1024