1 /*
2   Copyright (c) DataStax, Inc.
3 
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7 
8   http://www.apache.org/licenses/LICENSE-2.0
9 
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15 */
16 
17 #include "client_insights.hpp"
18 
19 #include "config.hpp"
20 #include "driver_info.hpp"
21 #include "get_time.hpp"
22 #include "logger.hpp"
23 #include "session.hpp"
24 #include "ssl.hpp"
25 #include "string.hpp"
26 #include "utils.hpp"
27 
28 #include <cassert>
29 #include <uv.h>
30 
31 #ifdef _WIN32
32 #include <windows.h>
33 #include <winsock2.h>
34 #else
35 #include <sys/utsname.h>
36 #include <unistd.h>
37 #endif
38 #define HOSTNAME_MAX_LENGTH 256
39 
40 #define METADATA_STARTUP_NAME "driver.startup"
41 #define METADATA_STATUS_NAME "driver.status"
42 #define METADATA_INSIGHTS_MAPPING_ID "v1"
43 #define METADATA_LANGUAGE "C/C++"
44 
45 #define CONFIG_ANTIPATTERN_MSG_MULTI_DC_HOSTS \
46   "Contact points contain hosts from "        \
47   "multiple data centers but only one "       \
48   "is going to be used"
49 #define CONFIG_ANTIPATTERN_MSG_REMOTE_HOSTS "Using remote hosts for failover"
50 #define CONFIG_ANTIPATTERN_MSG_DOWNGRADING \
51   "Downgrading consistency retry "         \
52   "policy in use"
53 #define CONFIG_ANTIPATTERN_MSG_CERT_VALIDATION \
54   "Client-to-node encryption is "              \
55   "enabled but server certificate "            \
56   "validation is disabled"
57 #define CONFIG_ANTIPATTERN_MSG_PLAINTEXT_NO_SSL \
58   "Plain text authentication is "               \
59   "enabled without client-to-node "             \
60   "encryption"                                  \
61   ""
62 
63 namespace datastax { namespace internal { namespace core {
64 
create_monitor_reporting(const String & client_id,const String & session_id,const Config & config)65 MonitorReporting* create_monitor_reporting(const String& client_id, const String& session_id,
66                                            const Config& config) {
67   // Ensure the client monitor events should be enabled
68   unsigned interval_secs = config.monitor_reporting_interval_secs();
69   if (interval_secs > 0) {
70     return new enterprise::ClientInsights(client_id, session_id, interval_secs);
71   }
72   return new NopMonitorReporting();
73 }
74 
75 }}} // namespace datastax::internal::core
76 
77 using namespace datastax::internal::core;
78 using namespace datastax::internal;
79 
80 namespace datastax { namespace internal { namespace enterprise {
81 
82 #ifdef _WIN32
83 #define ERROR_BUFFER_MAX_LENGTH 1024
get_last_error()84 String get_last_error() {
85   DWORD rc = GetLastError();
86 
87   char buf[ERROR_BUFFER_MAX_LENGTH];
88   size_t size = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, rc,
89                                MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
90                                reinterpret_cast<LPSTR>(&buf[0]), ERROR_BUFFER_MAX_LENGTH, NULL);
91   String str(buf, size);
92   trim(str);
93   return str;
94 }
95 #endif
96 
get_hostname()97 String get_hostname() {
98 #ifdef _WIN32
99   WSADATA data;
100   WORD version_required = MAKEWORD(2, 2);
101   if (WSAStartup(version_required, &data) != 0) {
102     LOG_WARN("Unable to determine hostname: Failed to initialize WinSock2");
103     return String();
104   }
105 #endif
106 
107   char buf[HOSTNAME_MAX_LENGTH + 1];
108   size_t size = HOSTNAME_MAX_LENGTH + 1;
109   if (int rc = gethostname(buf, size) != 0) {
110     LOG_WARN("Unable to determine hostname: Error code %d", rc);
111     return "UNKNOWN";
112   }
113   return String(buf, size);
114 }
115 
116 struct Os {
117   String name;
118   String version;
119   String arch;
120 };
get_os()121 Os get_os() {
122   Os os;
123 #ifdef _WIN32
124   os.name = "Microsoft Windows";
125 
126   DWORD size = GetFileVersionInfoSize(TEXT("kernel32.dll"), NULL);
127   if (size) {
128     Vector<BYTE> version_info(size);
129     if (GetFileVersionInfo(TEXT("kernel32.dll"), 0, size, &version_info[0])) {
130       VS_FIXEDFILEINFO* file_info = NULL;
131       UINT file_info_length = 0;
132       if (VerQueryValue(&version_info[0], TEXT("\\"), reinterpret_cast<LPVOID*>(&file_info),
133                         &file_info_length)) {
134         OStringStream oss;
135         oss << static_cast<int>(HIWORD(file_info->dwProductVersionMS)) << "."
136             << static_cast<int>(LOWORD(file_info->dwProductVersionMS)) << "."
137             << static_cast<int>(HIWORD(file_info->dwProductVersionLS));
138         os.version = oss.str();
139       } else {
140         LOG_DEBUG("Unable to retrieve Windows version: %s\n", get_last_error().c_str());
141       }
142     } else {
143       LOG_DEBUG("Unable to retrieve Windows version (GetFileVersionInfo): %s\n",
144                 get_last_error().c_str());
145     }
146   } else {
147     LOG_DEBUG("Unable to retrieve Windows version (GetFileVersionInfoSize): %s\n",
148               get_last_error().c_str());
149   }
150 
151 #ifdef _WIN64
152   os.arch = "x64";
153 #else
154   os.arch = "x86";
155 #endif
156 #else
157   struct utsname client_info;
158   uname(&client_info);
159   os.name = client_info.sysname;
160   os.version = client_info.release;
161   os.arch = client_info.machine;
162 #endif
163 
164   return os;
165 }
166 
167 struct Cpus {
168   int length;
169   String model;
170 };
get_cpus()171 Cpus get_cpus() {
172   Cpus cpus;
173 
174   uv_cpu_info_t* cpus_infos;
175   int cpus_count;
176   int rc = uv_cpu_info(&cpus_infos, &cpus_count);
177   if (rc == 0) {
178     uv_cpu_info_t cpus_info = cpus_infos[0];
179     cpus.length = cpus_count;
180     cpus.model = cpus_info.model;
181     uv_free_cpu_info(cpus_infos, cpus_count);
182   } else {
183     LOG_DEBUG("Unable to determine CPUs information: %s\n", uv_strerror(rc));
184   }
185 
186   return cpus;
187 }
188 
189 class ClientInsightsRequestCallback : public SimpleRequestCallback {
190 public:
191   typedef SharedRefPtr<ClientInsightsRequestCallback> Ptr;
192 
ClientInsightsRequestCallback(const String & json,const String & event_type)193   ClientInsightsRequestCallback(const String& json, const String& event_type)
194       : SimpleRequestCallback("CALL InsightsRpc.reportInsight('" + json + "')")
195       , event_type_(event_type) {}
196 
on_internal_set(ResponseMessage * response)197   virtual void on_internal_set(ResponseMessage* response) {
198     if (response->opcode() != CQL_OPCODE_RESULT) {
199       LOG_DEBUG("Failed to send %s event message: Invalid response [%s]", event_type_.c_str(),
200                 opcode_to_string(response->opcode()).c_str());
201     }
202   }
203 
on_internal_error(CassError code,const String & message)204   virtual void on_internal_error(CassError code, const String& message) {
205     LOG_DEBUG("Failed to send %s event message: %s", event_type_.c_str(), message.c_str());
206   }
207 
on_internal_timeout()208   virtual void on_internal_timeout() {
209     LOG_DEBUG("Failed to send %s event message: Timed out waiting for response",
210               event_type_.c_str());
211   }
212 
213 private:
214   String event_type_;
215 };
216 
metadata(ClientInsights::Writer & writer,const String & name)217 void metadata(ClientInsights::Writer& writer, const String& name) {
218   writer.Key("metadata");
219   writer.StartObject();
220 
221   writer.Key("name");
222   writer.String(name.c_str());
223   writer.Key("insightMappingId");
224   writer.String(METADATA_INSIGHTS_MAPPING_ID);
225   writer.Key("insightType");
226   writer.String("EVENT"); // TODO: Make this an enumeration in the future
227   writer.Key("timestamp");
228   writer.Uint64(get_time_since_epoch_ms());
229   writer.Key("tags");
230   writer.StartObject();
231   writer.Key("language");
232   writer.String(METADATA_LANGUAGE);
233   writer.EndObject(); // tags
234 
235   writer.EndObject();
236 }
237 
238 class StartupMessageHandler : public RefCounted<StartupMessageHandler> {
239 public:
240   typedef SharedRefPtr<StartupMessageHandler> Ptr;
241 
StartupMessageHandler(const Connection::Ptr & connection,const String & client_id,const String & session_id,const Config & config,const HostMap & hosts,const LoadBalancingPolicy::Vec & initialized_policies)242   StartupMessageHandler(const Connection::Ptr& connection, const String& client_id,
243                         const String& session_id, const Config& config, const HostMap& hosts,
244                         const LoadBalancingPolicy::Vec& initialized_policies)
245       : connection_(connection)
246       , client_id_(client_id)
247       , session_id_(session_id)
248       , config_(config)
249       , hosts_(hosts)
250       , initialized_policies_(initialized_policies) {}
251 
~StartupMessageHandler()252   ~StartupMessageHandler() {
253     ClientInsights::StringBuffer buffer;
254     ClientInsights::Writer writer(buffer);
255 
256     writer.StartObject();
257     metadata(writer, METADATA_STARTUP_NAME);
258     startup_message_data(writer);
259     writer.EndObject();
260 
261     assert(writer.IsComplete() && "Startup JSON is incomplete");
262     connection_->write_and_flush(RequestCallback::Ptr(
263         new ClientInsightsRequestCallback(buffer.GetString(), METADATA_STARTUP_NAME)));
264   }
265 
send_message()266   void send_message() { resolve_contact_points(); }
267 
268 private:
269   // Startup message associated methods
startup_message_data(ClientInsights::Writer & writer)270   void startup_message_data(ClientInsights::Writer& writer) {
271     writer.Key("data");
272     writer.StartObject();
273 
274     writer.Key("clientId");
275     writer.String(client_id_.c_str());
276     writer.Key("sessionId");
277     writer.String(session_id_.c_str());
278     bool is_application_name_generated = false;
279     writer.Key("applicationName");
280     if (!config_.application_name().empty()) {
281       writer.String(config_.application_name().c_str());
282     } else {
283       writer.String(driver_name());
284       is_application_name_generated = true;
285     }
286     writer.Key("applicationNameWasGenerated");
287     writer.Bool(is_application_name_generated);
288     if (!config_.application_version().empty()) {
289       writer.Key("applicationVersion");
290       writer.String(config_.application_version().c_str());
291     }
292     writer.Key("driverName");
293     writer.String(driver_name());
294     writer.Key("driverVersion");
295     writer.String(driver_version());
296     contact_points(writer);
297     data_centers(writer);
298     writer.Key("initialControlConnection");
299     writer.String(connection_->resolved_address().to_string(true).c_str());
300     writer.Key("protocolVersion");
301     writer.Int(connection_->protocol_version().value());
302     writer.Key("localAddress");
303     writer.String(get_local_address(connection_->handle()).c_str());
304     writer.Key("hostName");
305     writer.String(get_hostname().c_str());
306     execution_profiles(writer);
307     pool_size_by_host_distance(writer);
308     writer.Key("heartbeatInterval");
309     writer.Uint64(config_.connection_heartbeat_interval_secs() * 1000); // in milliseconds
310     writer.Key("compression");
311     writer.String("NONE"); // TODO: Update once compression is added
312     reconnection_policy(writer);
313     ssl(writer);
314     auth_provider(writer);
315     other_options(writer);
316     platform_info(writer);
317     config_anti_patterns(writer);
318     writer.Key("periodicStatusInterval");
319     writer.Uint(config_.monitor_reporting_interval_secs());
320 
321     writer.EndObject();
322   }
323 
contact_points(ClientInsights::Writer & writer)324   void contact_points(ClientInsights::Writer& writer) {
325     writer.Key("contactPoints");
326     writer.StartObject();
327 
328     for (ResolvedHostMap::const_iterator map_it = contact_points_resolved_.begin(),
329                                          map_end = contact_points_resolved_.end();
330          map_it != map_end; ++map_it) {
331       writer.Key(map_it->first.c_str());
332       writer.StartArray();
333       for (AddressSet::const_iterator vec_it = map_it->second.begin(),
334                                       vec_end = map_it->second.end();
335            vec_it != vec_end; ++vec_it) {
336         writer.String(vec_it->to_string(true).c_str());
337       }
338       writer.EndArray();
339     }
340 
341     writer.EndObject();
342   }
343 
data_centers(ClientInsights::Writer & writer)344   void data_centers(ClientInsights::Writer& writer) {
345     writer.Key("dataCenters");
346     writer.StartArray();
347 
348     Set<String> data_centers;
349     for (HostMap::const_iterator it = hosts_.begin(), end = hosts_.end(); it != end; ++it) {
350       const String& data_center = it->second->dc();
351       if (data_centers.insert(data_center).second) {
352         writer.String(data_center.c_str());
353       }
354     }
355 
356     writer.EndArray();
357   }
358 
execution_profiles(ClientInsights::Writer & writer)359   void execution_profiles(ClientInsights::Writer& writer) {
360     writer.Key("executionProfiles");
361     writer.StartObject();
362 
363     const ExecutionProfile& default_profile = config_.default_profile();
364     const ExecutionProfile::Map& profiles = config_.profiles();
365     writer.Key("default");
366     execution_profile_as_json(writer, default_profile);
367     for (ExecutionProfile::Map::const_iterator it = profiles.begin(), end = profiles.end();
368          it != end; ++it) {
369       writer.Key(it->first.c_str());
370       execution_profile_as_json(writer, it->second, &default_profile);
371     }
372 
373     writer.EndObject();
374   }
375 
pool_size_by_host_distance(ClientInsights::Writer & writer)376   void pool_size_by_host_distance(ClientInsights::Writer& writer) {
377     writer.Key("poolSizeByHostDistance");
378     writer.StartObject();
379 
380     writer.Key("local");
381     writer.Uint(config_.core_connections_per_host() * hosts_.size());
382     // NOTE: Remote does not pertain to DataStax C/C++ driver pool
383     writer.Key("remote");
384     writer.Uint(0);
385 
386     writer.EndObject();
387   }
388 
reconnection_policy(ClientInsights::Writer & writer)389   void reconnection_policy(ClientInsights::Writer& writer) {
390     writer.Key("reconnectionPolicy");
391     writer.StartObject();
392 
393     ReconnectionPolicy::Ptr reconnection_policy = config_.reconnection_policy();
394     writer.Key("type");
395     if (reconnection_policy->type() == ReconnectionPolicy::CONSTANT) {
396       writer.String("ConstantReconnectionPolicy");
397     } else if (reconnection_policy->type() == ReconnectionPolicy::EXPONENTIAL) {
398       writer.String("ExponentialReconnectionPolicy");
399     } else {
400       assert(false && "Reconnection policy needs to be added");
401       writer.String("UnknownReconnectionPolicy");
402     }
403     writer.Key("options");
404     writer.StartObject();
405     if (reconnection_policy->type() == ReconnectionPolicy::CONSTANT) {
406       ConstantReconnectionPolicy::Ptr crp =
407           static_cast<ConstantReconnectionPolicy::Ptr>(reconnection_policy);
408       writer.Key("delayMs");
409       writer.Uint(crp->delay_ms());
410     } else if (reconnection_policy->type() == ReconnectionPolicy::EXPONENTIAL) {
411       ExponentialReconnectionPolicy::Ptr erp =
412           static_cast<ExponentialReconnectionPolicy::Ptr>(reconnection_policy);
413       writer.Key("baseDelayMs");
414       writer.Uint(erp->base_delay_ms());
415       writer.Key("maxDelayMs");
416       writer.Uint(erp->max_delay_ms());
417     }
418     writer.EndObject(); // options
419 
420     writer.EndObject();
421   }
422 
ssl(ClientInsights::Writer & writer)423   void ssl(ClientInsights::Writer& writer) {
424     writer.Key("ssl");
425     writer.StartObject();
426 
427     const SslContext::Ptr& ssl_context = config_.ssl_context();
428     writer.Key("enabled");
429     if (ssl_context) {
430       writer.Bool(true);
431     } else {
432       writer.Bool(false);
433     }
434     writer.Key("certValidation");
435     if (ssl_context) {
436       writer.Bool(ssl_context->is_cert_validation_enabled());
437     } else {
438       writer.Bool(false);
439     }
440 
441     writer.EndObject();
442   }
443 
auth_provider(ClientInsights::Writer & writer)444   void auth_provider(ClientInsights::Writer& writer) {
445     const AuthProvider::Ptr& auth_provider = config_.auth_provider();
446     if (auth_provider) {
447       writer.Key("authProvider");
448       writer.StartObject();
449 
450       writer.Key("type");
451       writer.String(auth_provider->name().c_str());
452 
453       writer.EndObject();
454     }
455   }
456 
other_options(ClientInsights::Writer & writer)457   void other_options(ClientInsights::Writer& writer) {
458     writer.Key("otherOptions");
459     writer.StartObject();
460 
461     writer.Key("configuration");
462     writer.StartObject();
463     writer.Key("protocolVersion");
464     writer.Uint(config_.protocol_version().value());
465     writer.Key("useBetaProtocol");
466     writer.Bool(config_.use_beta_protocol_version());
467     writer.Key("threadCountIo");
468     writer.Uint(config_.thread_count_io());
469     writer.Key("queueSizeIo");
470     writer.Uint(config_.queue_size_io());
471     writer.Key("coreConnectionsPerHost");
472     writer.Uint(config_.core_connections_per_host());
473     writer.Key("connectTimeoutMs");
474     writer.Uint(config_.connect_timeout_ms());
475     writer.Key("resolveTimeoutMs");
476     writer.Uint(config_.resolve_timeout_ms());
477     writer.Key("maxSchemaWaitTimeMs");
478     writer.Uint(config_.max_schema_wait_time_ms());
479     writer.Key("maxTracingWaitTimeMs");
480     writer.Uint(config_.max_tracing_wait_time_ms());
481     writer.Key("tracingConsistency");
482     writer.String(cass_consistency_string(config_.tracing_consistency()));
483     writer.Key("coalesceDelayUs");
484     writer.Uint64(config_.coalesce_delay_us());
485     writer.Key("newRequestRatio");
486     writer.Uint(config_.new_request_ratio());
487     writer.Key("logLevel");
488     writer.String(cass_log_level_string(config_.log_level()));
489     writer.Key("tcpNodelayEnable");
490     writer.Bool(config_.tcp_nodelay_enable());
491     writer.Key("tcpKeepaliveEnable");
492     writer.Bool(config_.tcp_keepalive_enable());
493     writer.Key("tcpKeepaliveDelaySecs");
494     writer.Uint(config_.tcp_keepalive_delay_secs());
495     writer.Key("connectionIdleTimeoutSecs");
496     writer.Uint(config_.connection_idle_timeout_secs());
497     writer.Key("useSchema");
498     writer.Bool(config_.use_schema());
499     writer.Key("useHostnameResolution");
500     writer.Bool(config_.use_hostname_resolution());
501     writer.Key("useRandomizedContactPoints");
502     writer.Bool(config_.use_randomized_contact_points());
503     writer.Key("maxReusableWriteObjects");
504     writer.Uint(config_.max_reusable_write_objects());
505     writer.Key("prepareOnAllHosts");
506     writer.Bool(config_.prepare_on_all_hosts());
507     writer.Key("prepareOnUpOrAddHost");
508     writer.Bool(config_.prepare_on_up_or_add_host());
509     writer.Key("noCompact");
510     writer.Bool(config_.no_compact());
511     writer.Key("cloudSecureConnectBundleLoaded");
512     writer.Bool(config_.cloud_secure_connection_config().is_loaded());
513     writer.Key("clusterMetadataResolver");
514     writer.String(config_.cluster_metadata_resolver_factory()->name());
515     writer.EndObject(); // configuration
516 
517     writer.EndObject(); // otherOptions
518   }
519 
platform_info(ClientInsights::Writer & writer)520   void platform_info(ClientInsights::Writer& writer) {
521     writer.Key("platformInfo");
522     writer.StartObject();
523 
524     writer.Key("os");
525     writer.StartObject();
526     Os os = get_os();
527     writer.Key("name");
528     writer.String(os.name.c_str());
529     writer.Key("version");
530     writer.String(os.version.c_str());
531     writer.Key("arch");
532     writer.String(os.arch.c_str());
533     writer.EndObject(); // os
534 
535     writer.Key("cpus");
536     writer.StartObject();
537     Cpus cpus = get_cpus();
538     writer.Key("length");
539     writer.Int(cpus.length);
540     writer.Key("model");
541     writer.String(cpus.model.c_str());
542     writer.EndObject(); // cpus
543 
544     writer.Key("runtime");
545     writer.StartObject();
546 #if defined(__clang__) || defined(__APPLE_CC__)
547     writer.Key("Clang/LLVM");
548     writer.String(STRINGIFY(__clang_major__) "." STRINGIFY(__clang_minor__) "." STRINGIFY(
549         __clang_patchlevel__));
550 #elif defined(__INTEL_COMPILER)
551     writer.Key("Intel ICC/ICPC");
552     writer.String(STRINGIFY(__INTEL_COMPILER));
553 #elif defined(__GNUC__) || defined(__GNUG__)
554     writer.Key("GNU GCC/G++");
555     writer.String(
556         STRINGIFY(__GNUC__) "." STRINGIFY(__GNUC_MINOR__) "." STRINGIFY(__GNUC_PATCHLEVEL__));
557 #elif defined(__HP_aCC)
558     writer.Key("Hewlett-Packard C/aC++");
559     writer.String(STRINGIFY(__HP_aCC));
560 #elif defined(__IBMCPP__)
561     writer.Key("IBM XL C/C++");
562     writer.String(STRINGIFY(__xlc__));
563 #elif defined(_MSC_VER)
564     writer.Key("Microsoft Visual Studio");
565     writer.String(STRINGIFY(_MSC_FULL_VER));
566 #elif defined(__PGI)
567     writer.Key("Portland Group PGCC/PGCPP");
568     writer.String(
569         STRINGIFY(__PGIC__) "." STRINGIFY(__PGIC_MINOR__) "." STRINGIFY(__PGIC_PATCHLEVEL__));
570 #elif defined(__SUNPRO_CC)
571     writer.Key("Oracle Solaris Studio");
572     writer.String(STRINGIFY(__SUNPRO_CC));
573 #else
574     writer.Key("Unknown");
575     writer.String("Unknown");
576 #endif
577     writer.Key("uv");
578     writer.String(STRINGIFY(UV_VERSION_MAJOR) "." STRINGIFY(UV_VERSION_MINOR) "." STRINGIFY(
579         UV_VERSION_PATCH));
580     writer.Key("openssl");
581 #ifdef OPENSSL_VERSION_TEXT
582     writer.String(OPENSSL_VERSION_TEXT);
583 #else
584 #ifdef LIBRESSL_VERSION_NUMBER
585     writer.String("LibreSSL " STRINGIFY(LIBRESSL_VERSION_NUMBER));
586 #else
587     writer.String("OpenSSL " STRINGIFY(OPENSSL_VERSION_NUMBER));
588 #endif
589 #endif
590     writer.EndObject(); // runtime
591 
592     writer.EndObject(); // platformInfo
593   }
594 
config_anti_patterns(ClientInsights::Writer & writer)595   void config_anti_patterns(ClientInsights::Writer& writer) {
596     StringPairVec config_anti_patterns = get_config_anti_patterns(
597         config_.default_profile(), config_.profiles(), initialized_policies_, hosts_,
598         config_.ssl_context(), config_.auth_provider());
599     if (!config_anti_patterns.empty()) {
600       writer.Key("configAntiPatterns");
601       writer.StartObject();
602 
603       for (StringPairVec::const_iterator it = config_anti_patterns.begin(),
604                                          end = config_anti_patterns.end();
605            it != end; ++it) {
606         writer.Key(it->first.c_str());
607         writer.String(it->second.c_str());
608       }
609 
610       writer.EndObject();
611     }
612   }
613 
614 private:
615   // Startup message helper methods
resolve_contact_points()616   void resolve_contact_points() {
617     const AddressVec& contact_points = config_.contact_points();
618     const int port = config_.port();
619     MultiResolver::Ptr resolver;
620 
621     for (AddressVec::const_iterator it = contact_points.begin(), end = contact_points.end();
622          it != end; ++it) {
623       const Address& contact_point = *it;
624       // Attempt to parse the contact point string. If it's an IP address
625       // then immediately add it to our resolved contact points, otherwise
626       // attempt to resolve the string as a hostname.
627       if (contact_point.is_resolved()) {
628         AddressSet addresses;
629         addresses.insert(contact_point);
630         contact_points_resolved_[contact_point.hostname_or_address()] = addresses;
631       } else {
632         if (!resolver) {
633           inc_ref();
634           resolver.reset(
635               new MultiResolver(bind_callback(&StartupMessageHandler::on_resolve, this)));
636         }
637         resolver->resolve(connection_->loop(), contact_point.hostname_or_address(), port,
638                           config_.resolve_timeout_ms());
639       }
640     }
641 
642     // NOTE: If no resolution is performed the startup message will be sent in
643     //       the destructor
644   }
645 
on_resolve(MultiResolver * resolver)646   void on_resolve(MultiResolver* resolver) {
647     const Resolver::Vec& resolvers = resolver->resolvers();
648     for (Resolver::Vec::const_iterator it = resolvers.begin(), end = resolvers.end(); it != end;
649          ++it) {
650       const Resolver::Ptr resolver(*it);
651       AddressSet addresses;
652       if (resolver->is_success()) {
653         if (!resolver->addresses().empty()) {
654           for (AddressVec::const_iterator it = resolver->addresses().begin(),
655                                           end = resolver->addresses().end();
656                it != end; ++it) {
657             addresses.insert(*it);
658           }
659         }
660       }
661       contact_points_resolved_[resolver->hostname()] = addresses; // Empty resolved addresses are OK
662     }
663 
664     dec_ref(); // Send startup message in destructor
665   }
666 
get_local_address(const uv_tcp_t * tcp) const667   String get_local_address(const uv_tcp_t* tcp) const {
668     Address::SocketStorage name;
669     int namelen = sizeof(name);
670     if (uv_tcp_getsockname(tcp, name.addr(), &namelen) == 0) {
671       Address address(name.addr());
672       if (address.is_valid_and_resolved()) {
673         return address.to_string();
674       }
675     }
676     return "unknown";
677   }
678 
execution_profile_as_json(ClientInsights::Writer & writer,const ExecutionProfile & profile,const ExecutionProfile * default_profile=NULL)679   void execution_profile_as_json(ClientInsights::Writer& writer, const ExecutionProfile& profile,
680                                  const ExecutionProfile* default_profile = NULL) {
681     writer.StartObject();
682 
683     if (!default_profile || (default_profile && profile.request_timeout_ms() !=
684                                                     default_profile->request_timeout_ms())) {
685       writer.Key("requestTimeoutMs");
686       writer.Uint64(profile.request_timeout_ms());
687     }
688     if (!default_profile ||
689         (default_profile && profile.consistency() != default_profile->consistency())) {
690       writer.Key("consistency");
691       writer.String(cass_consistency_string(profile.consistency()));
692     }
693     if (!default_profile || (default_profile && profile.serial_consistency() !=
694                                                     default_profile->serial_consistency())) {
695       writer.Key("serialConsistency");
696       writer.String(cass_consistency_string(profile.serial_consistency()));
697     }
698     if (!default_profile ||
699         (default_profile && profile.retry_policy() != default_profile->retry_policy())) {
700       const RetryPolicy::Ptr& retry_policy = profile.retry_policy();
701       if (retry_policy) {
702         writer.Key("retryPolicy");
703         if (retry_policy->type() == RetryPolicy::DEFAULT) {
704           writer.String("DefaultRetryPolicy");
705         } else if (retry_policy->type() == RetryPolicy::DOWNGRADING) {
706           writer.String("DowngradingConsistencyRetryPolicy");
707         } else if (retry_policy->type() == RetryPolicy::FALLTHROUGH) {
708           writer.String("FallthroughRetryPolicy");
709         } else if (retry_policy->type() == RetryPolicy::LOGGING) {
710           writer.String("LoggingRetryPolicy");
711         } else {
712           LOG_DEBUG("Invalid retry policy: %d", retry_policy->type());
713           writer.String("unknown");
714         }
715       }
716     }
717 
718     if (profile.load_balancing_policy()) {
719       writer.Key("loadBalancing");
720       writer.StartObject();
721       writer.Key("type");
722       LoadBalancingPolicy* current_lbp = profile.load_balancing_policy().get();
723       do {
724         // NOTE: DCAware and RoundRobin are leaf policies (e.g. not chainable)
725         if (dynamic_cast<DCAwarePolicy*>(current_lbp)) {
726           writer.String("DCAwarePolicy");
727           break;
728         } else if (dynamic_cast<RoundRobinPolicy*>(current_lbp)) {
729           writer.String("RoundRobinPolicy");
730           break;
731         }
732 
733         if (ChainedLoadBalancingPolicy* chained_lbp =
734                 dynamic_cast<ChainedLoadBalancingPolicy*>(current_lbp)) {
735           current_lbp = chained_lbp->child_policy().get();
736         } else {
737           current_lbp = NULL;
738         }
739       } while (current_lbp);
740       writer.Key("options");
741       writer.StartObject();
742       if (DCAwarePolicy* dc_lbp = dynamic_cast<DCAwarePolicy*>(current_lbp)) {
743         writer.Key("localDc");
744         if (dc_lbp->local_dc().empty()) {
745           writer.Null();
746         } else {
747           writer.String(dc_lbp->local_dc().c_str());
748         }
749         writer.Key("usedHostsPerRemoteDc");
750         writer.Uint64(dc_lbp->used_hosts_per_remote_dc());
751         writer.Key("allowRemoteDcsForLocalCl");
752         writer.Bool(!dc_lbp->skip_remote_dcs_for_local_cl());
753       }
754       if (!profile.blacklist().empty()) {
755         writer.Key("blacklist");
756         writer.String(implode(profile.blacklist()).c_str());
757       }
758       if (!profile.blacklist_dc().empty()) {
759         writer.Key("blacklistDc");
760         writer.String(implode(profile.blacklist_dc()).c_str());
761       }
762       if (!profile.whitelist().empty()) {
763         writer.Key("whitelist");
764         writer.String(implode(profile.whitelist()).c_str());
765       }
766       if (!profile.whitelist_dc().empty()) {
767         writer.Key("whitelistDc");
768         writer.String(implode(profile.whitelist_dc()).c_str());
769       }
770       if (profile.token_aware_routing()) {
771         writer.Key("tokenAwareRouting");
772         writer.StartObject();
773         writer.Key("shuffleReplicas");
774         writer.Bool(profile.token_aware_routing_shuffle_replicas());
775         writer.EndObject(); // tokenAwareRouting
776       }
777       if (profile.latency_aware()) {
778         writer.Key("latencyAwareRouting");
779         writer.StartObject();
780         writer.Key("exclusionThreshold");
781         writer.Double(profile.latency_aware_routing_settings().exclusion_threshold);
782         writer.Key("scaleNs");
783         writer.Uint64(profile.latency_aware_routing_settings().scale_ns);
784         writer.Key("retryPeriodNs");
785         writer.Uint64(profile.latency_aware_routing_settings().retry_period_ns);
786         writer.Key("updateRateMs");
787         writer.Uint64(profile.latency_aware_routing_settings().update_rate_ms);
788         writer.Key("minMeasured");
789         writer.Uint64(profile.latency_aware_routing_settings().min_measured);
790         writer.EndObject(); // latencyAwareRouting
791       }
792       writer.EndObject(); // options
793 
794       writer.EndObject(); // loadBalancingPolicy
795     }
796 
797     typedef ConstantSpeculativeExecutionPolicy CSEP;
798     CSEP* default_csep =
799         default_profile ? dynamic_cast<CSEP*>(default_profile->speculative_execution_policy().get())
800                         : NULL;
801     CSEP* csep = dynamic_cast<CSEP*>(profile.speculative_execution_policy().get());
802     if (csep) {
803       if (!default_csep ||
804           (default_csep->constant_delay_ms_ != csep->constant_delay_ms_ &&
805            default_csep->max_speculative_executions_ != csep->max_speculative_executions_)) {
806         writer.Key("speculativeExecutionPolicy");
807         writer.StartObject();
808         writer.Key("type");
809         writer.String("ConstantSpeculativeExecutionPolicy");
810 
811         writer.Key("options");
812         writer.StartObject();
813         writer.Key("constantDelayMs");
814         writer.Uint64(csep->constant_delay_ms_);
815         writer.Key("maxSpeculativeExecutions");
816         writer.Int(csep->max_speculative_executions_);
817         writer.EndObject(); // options
818 
819         writer.EndObject(); // speculativeExecutionPolicy
820       }
821     }
822 
823     writer.EndObject(); // executionProfile
824   }
825 
826   typedef std::pair<String, String> StringPair;
827   typedef Vector<StringPair> StringPairVec;
get_config_anti_patterns(const ExecutionProfile & default_profile,const ExecutionProfile::Map & profiles,const LoadBalancingPolicy::Vec & policies,const HostMap & hosts,const SslContext::Ptr & ssl_context,const AuthProvider::Ptr & auth_provider)828   StringPairVec get_config_anti_patterns(const ExecutionProfile& default_profile,
829                                          const ExecutionProfile::Map& profiles,
830                                          const LoadBalancingPolicy::Vec& policies,
831                                          const HostMap& hosts, const SslContext::Ptr& ssl_context,
832                                          const AuthProvider::Ptr& auth_provider) {
833     StringPairVec config_anti_patterns;
834 
835     if (is_contact_points_multiple_dcs(policies, hosts)) {
836       config_anti_patterns.push_back(
837           StringPair("contactPointsMultipleDCs", CONFIG_ANTIPATTERN_MSG_MULTI_DC_HOSTS));
838       LOG_WARN("Configuration anti-pattern detected: %s", CONFIG_ANTIPATTERN_MSG_MULTI_DC_HOSTS);
839     }
840 
841     for (LoadBalancingPolicy::Vec::const_iterator it = policies.begin(), end = policies.end();
842          it != end; ++it) {
843       DCAwarePolicy* dc_lbp = get_dc_aware_policy(*it);
844       if (dc_lbp && !dc_lbp->skip_remote_dcs_for_local_cl()) {
845         config_anti_patterns.push_back(
846             StringPair("useRemoteHosts", CONFIG_ANTIPATTERN_MSG_REMOTE_HOSTS));
847         LOG_WARN("Configuration anti-pattern detected: %s", CONFIG_ANTIPATTERN_MSG_REMOTE_HOSTS);
848         break;
849       }
850     }
851 
852     bool is_downgrading_consistency_enabled =
853         is_downgrading_retry_anti_pattern(default_profile.retry_policy());
854     if (!is_downgrading_consistency_enabled) {
855       for (ExecutionProfile::Map::const_iterator it = profiles.begin(), end = profiles.end();
856            it != end; ++it) {
857         if (is_downgrading_retry_anti_pattern(it->second.retry_policy())) {
858           is_downgrading_consistency_enabled = true;
859           break;
860         }
861       }
862     }
863     if (is_downgrading_consistency_enabled) {
864       config_anti_patterns.push_back(
865           StringPair("downgradingConsistency", CONFIG_ANTIPATTERN_MSG_DOWNGRADING));
866       LOG_WARN("Configuration anti-pattern detected: %s", CONFIG_ANTIPATTERN_MSG_DOWNGRADING);
867     }
868 
869     if (ssl_context && !ssl_context->is_cert_validation_enabled()) {
870       config_anti_patterns.push_back(
871           StringPair("sslWithoutCertValidation", CONFIG_ANTIPATTERN_MSG_CERT_VALIDATION));
872       LOG_WARN("Configuration anti-pattern detected: %s", CONFIG_ANTIPATTERN_MSG_CERT_VALIDATION);
873     }
874 
875     if (auth_provider && auth_provider->name().find("PlainTextAuthProvider") != String::npos &&
876         !ssl_context) {
877       config_anti_patterns.push_back(
878           StringPair("plainTextAuthWithoutSsl", CONFIG_ANTIPATTERN_MSG_PLAINTEXT_NO_SSL));
879       LOG_WARN("Configuration anti-pattern detected: %s", CONFIG_ANTIPATTERN_MSG_PLAINTEXT_NO_SSL);
880     }
881 
882     return config_anti_patterns;
883   }
884 
get_dc_aware_policy(const LoadBalancingPolicy::Ptr & policy)885   DCAwarePolicy* get_dc_aware_policy(const LoadBalancingPolicy::Ptr& policy) {
886     LoadBalancingPolicy* current_lbp = policy.get();
887     do {
888       if (DCAwarePolicy* dc_lbp = dynamic_cast<DCAwarePolicy*>(current_lbp)) {
889         return dc_lbp;
890       }
891       if (ChainedLoadBalancingPolicy* chained_lbp =
892               dynamic_cast<ChainedLoadBalancingPolicy*>(current_lbp)) {
893         current_lbp = chained_lbp->child_policy().get();
894       } else {
895         break;
896       }
897     } while (current_lbp);
898 
899     return NULL;
900   }
901 
is_contact_points_multiple_dcs(const LoadBalancingPolicy::Vec & policies,const HostMap & hosts)902   bool is_contact_points_multiple_dcs(const LoadBalancingPolicy::Vec& policies,
903                                       const HostMap& hosts) {
904     // Get the DC aware load balancing policy if it is the only policy that
905     // exists. If found this policy will be used after the contact points have
906     // been resolved in order to determine if there are contacts points that exist
907     // in multiple DCs using a copy of the discovered hosts.
908     if (policies.size() == 1) {
909       DCAwarePolicy* policy = get_dc_aware_policy(policies[0]);
910       if (policy) {
911         // Loop through the resolved contacts, find the correct initialized host
912         // and if the contact point is a remote host identify as an anti-pattern
913         for (ResolvedHostMap::const_iterator resolved_it = contact_points_resolved_.begin(),
914                                              hosts_end = contact_points_resolved_.end();
915              resolved_it != hosts_end; ++resolved_it) {
916           const AddressSet& addresses = resolved_it->second;
917           for (AddressSet::const_iterator addresses_it = addresses.begin(),
918                                           addresses_end = addresses.end();
919                addresses_it != addresses_end; ++addresses_it) {
920             const Address& address = *addresses_it;
921             for (HostMap::const_iterator hosts_it = hosts.begin(), hosts_end = hosts.end();
922                  hosts_it != hosts_end; ++hosts_it) {
923               const Host::Ptr& host = hosts_it->second;
924               if (host->address() == address &&
925                   policy->distance(host) == CASS_HOST_DISTANCE_REMOTE) {
926                 return true;
927               }
928             }
929           }
930         }
931       }
932     }
933 
934     return false;
935   }
936 
is_downgrading_retry_anti_pattern(const RetryPolicy::Ptr & policy)937   bool is_downgrading_retry_anti_pattern(const RetryPolicy::Ptr& policy) {
938     if (policy && policy->type() == RetryPolicy::DOWNGRADING) {
939       return true;
940     }
941     return false;
942   }
943 
944 private:
945   const Connection::Ptr connection_;
946   const String client_id_;
947   const String session_id_;
948   const Config config_;
949   const HostMap hosts_;
950   const LoadBalancingPolicy::Vec initialized_policies_;
951 
952 private:
953   typedef Map<String, AddressSet> ResolvedHostMap;
954   ResolvedHostMap contact_points_resolved_;
955 };
956 
ClientInsights(const String & client_id,const String & session_id,unsigned interval_secs)957 ClientInsights::ClientInsights(const String& client_id, const String& session_id,
958                                unsigned interval_secs)
959     : client_id_(client_id)
960     , session_id_(session_id)
961     , interval_ms_(interval_secs * 1000) {}
962 
interval_ms(const VersionNumber & dse_server_version) const963 uint64_t ClientInsights::interval_ms(const VersionNumber& dse_server_version) const {
964   // DSE v5.1.13+ (backported)
965   // DSE v6.0.5+ (backported)
966   // DSE v6.7.0 was the first to supported the Insights RPC call
967   if ((dse_server_version >= VersionNumber(5, 1, 13) &&
968        dse_server_version < VersionNumber(6, 0, 0)) ||
969       dse_server_version >= VersionNumber(6, 0, 5)) {
970     return interval_ms_;
971   }
972   return 0;
973 }
974 
send_startup_message(const Connection::Ptr & connection,const Config & config,const HostMap & hosts,const LoadBalancingPolicy::Vec & initialized_policies)975 void ClientInsights::send_startup_message(const Connection::Ptr& connection, const Config& config,
976                                           const HostMap& hosts,
977                                           const LoadBalancingPolicy::Vec& initialized_policies) {
978   StartupMessageHandler::Ptr handler = StartupMessageHandler::Ptr(new StartupMessageHandler(
979       connection, client_id_, session_id_, config, hosts, initialized_policies));
980   handler->send_message();
981 }
982 
send_status_message(const Connection::Ptr & connection,const HostMap & hosts)983 void ClientInsights::send_status_message(const Connection::Ptr& connection, const HostMap& hosts) {
984   StringBuffer buffer;
985   Writer writer(buffer);
986 
987   writer.StartObject();
988   metadata(writer, METADATA_STATUS_NAME);
989 
990   writer.Key("data");
991   writer.StartObject();
992 
993   writer.Key("clientId");
994   writer.String(client_id_.c_str());
995   writer.Key("sessionId");
996   writer.String(session_id_.c_str());
997   writer.Key("controlConnection");
998   writer.String(connection->resolved_address().to_string(true).c_str());
999 
1000   writer.Key("conntectedNodes");
1001   writer.StartObject();
1002   for (HostMap::const_iterator it = hosts.begin(), end = hosts.end(); it != end; ++it) {
1003     String address_with_port = it->first.to_string(true);
1004     const Host::Ptr& host = it->second;
1005     writer.Key(address_with_port.c_str());
1006     writer.StartObject();
1007     writer.Key("connections");
1008     writer.Int(host->connection_count());
1009     writer.Key("inFlightQueries");
1010     writer.Int(host->inflight_request_count());
1011     writer.EndObject(); // address_with_port
1012   }
1013   writer.EndObject(); // connectedNodes
1014 
1015   writer.EndObject(); // data
1016   writer.EndObject();
1017 
1018   assert(writer.IsComplete() && "Status JSON is incomplete");
1019   connection->write_and_flush(RequestCallback::Ptr(
1020       new ClientInsightsRequestCallback(buffer.GetString(), METADATA_STATUS_NAME)));
1021 }
1022 
1023 }}} // namespace datastax::internal::enterprise
1024