1 /*
2   Copyright (c) DataStax, Inc.
3 
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7 
8   http://www.apache.org/licenses/LICENSE-2.0
9 
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15 */
16 
17 #ifdef _WIN32
18 // Enable memory leak detection
19 #define _CRTDBG_MAP_ALLOC
20 #include <crtdbg.h>
21 #include <stdlib.h>
22 
23 // Enable memory leak detection for new operator
24 #ifdef _DEBUG
25 #ifndef DBG_NEW
26 #define DBG_NEW new (_NORMAL_BLOCK, __FILE__, __LINE__)
27 #define new DBG_NEW
28 #endif
29 #endif
30 #else
31 #include <unistd.h>
32 #endif
33 #include "bridge.hpp"
34 
35 #ifdef CASS_USE_LIBSSH2
36 #include <libssh2.h>
37 #define LIBSSH2_INIT_ALL 0
38 #ifndef LIBSSH2_NO_OPENSSL
39 #ifdef OPENSSL_CLEANUP
40 #define PID_UNKNOWN 0
41 #include <openssl/conf.h>
42 #include <openssl/engine.h>
43 #include <openssl/rand.h>
44 #include <openssl/ssl.h>
45 #endif
46 #endif
47 #endif
48 
49 #include <ctype.h>
50 #include <errno.h>
51 #include <fcntl.h>
52 #include <stdio.h>
53 #include <string.h>
54 
55 #include <algorithm>
56 #include <fstream>
57 #include <iostream>
58 #include <sstream>
59 
60 // Create simple console logging functions
61 #define LOG_MESSAGE(message, is_output)           \
62   if (is_output) {                                \
63     std::cerr << "ccm> " << message << std::endl; \
64   }
65 #define LOG(message) LOG_MESSAGE(message, is_verbose_)
66 #define LOG_ERROR(message) LOG_MESSAGE(message, true)
67 
68 // Create FALSE/TRUE defines for easier code readability
69 #ifndef FALSE
70 #define FALSE 0
71 #endif
72 #ifndef TRUE
73 #define TRUE 1
74 #endif
75 
76 #define TRIM_DELIMETERS " \f\n\r\t\v"
77 #define CASSANDRA_BINARY_PORT 9042
78 #define CASSANDRA_STORAGE_PORT 7000
79 #define CASSANDRA_THRIFT_PORT 9160
80 #define CCM_NAP 100
81 #define CCM_RETRIES 100 // Up to 10 seconds for retry based on CCM_NAP
82 
83 // CCM node status
84 #define CCM_NODE_STATUS_DECOMMISSIONED "decommissioned"
85 #define CCM_NODE_STATUS_DOWN "down"
86 #define CCM_NODE_STATUS_UNINITIALIZED "(not initialized)"
87 #define CCM_NODE_STATUS_UP "up"
88 
89 // Workload value initialization
90 const std::string DSE_WORKLOADS[] = { "cassandra", "cfs",  "dsefs", "graph",
91                                       "hadoop",    "solr", "spark" };
92 const std::vector<std::string>
93     CCM::Bridge::dse_workloads_(DSE_WORKLOADS,
94                                 DSE_WORKLOADS + sizeof(DSE_WORKLOADS) / sizeof(DSE_WORKLOADS[0]));
95 const CCM::DseWorkload DEFAULT_WORKLOAD[] = { CCM::DSE_WORKLOAD_CASSANDRA };
96 const std::vector<CCM::DseWorkload> CCM::Bridge::DEFAULT_DSE_WORKLOAD(
97     DEFAULT_WORKLOAD, DEFAULT_WORKLOAD + sizeof(DEFAULT_WORKLOAD) / sizeof(DEFAULT_WORKLOAD[0]));
98 
99 using namespace CCM;
100 
Bridge(CassVersion server_version,bool use_git,const std::string & branch_tag,bool use_install_dir,const std::string & install_dir,ServerType server_type,std::vector<DseWorkload> dse_workload,const std::string & cluster_prefix,DseCredentialsType dse_credentials_type,const std::string & dse_username,const std::string & dse_password,DeploymentType deployment_type,AuthenticationType authentication_type,const std::string & host,short port,const std::string & username,const std::string & password,const std::string & public_key,const std::string & private_key,bool is_verbose)101 CCM::Bridge::Bridge(
102     CassVersion server_version /*= DEFAULT_CASSANDRA_VERSION*/, bool use_git /*= DEFAULT_USE_GIT*/,
103     const std::string& branch_tag /* ""*/, bool use_install_dir /*=DEFAULT_USE_INSTALL_DIR*/,
104     const std::string& install_dir /*=""*/, ServerType server_type /*= DEFAULT_SERVER_TYPE*/,
105     std::vector<DseWorkload> dse_workload /*= DEFAULT_DSE_WORKLOAD*/,
106     const std::string& cluster_prefix /*= DEFAULT_CLUSTER_PREFIX*/,
107     DseCredentialsType dse_credentials_type /*= DEFAULT_DSE_CREDENTIALS*/,
108     const std::string& dse_username /*= ""*/, const std::string& dse_password /*= ""*/,
109     DeploymentType deployment_type /*= DEFAULT_DEPLOYMENT*/,
110     AuthenticationType authentication_type /*= DEFAULT_AUTHENTICATION*/,
111     const std::string& host /*= DEFAULT_HOST*/, short port /*= DEFAULT_REMOTE_DEPLOYMENT_PORT*/,
112     const std::string& username /*= DEFAULT_USERNAME*/,
113     const std::string& password /*= DEFAULT_PASSWORD*/, const std::string& public_key /*= ""*/,
114     const std::string& private_key /*= ""*/, bool is_verbose /*= DEFAULT_IS_VERBOSE*/)
115     : cassandra_version_(server_version)
116     , dse_version_(DEFAULT_DSE_VERSION)
117     , use_git_(use_git)
118     , branch_tag_(branch_tag)
119     , use_install_dir_(use_install_dir)
120     , install_dir_(install_dir)
121     , server_type_(server_type)
122     , dse_workload_(dse_workload)
123     , cluster_prefix_(cluster_prefix)
124     , authentication_type_(authentication_type)
125     , dse_credentials_type_(dse_credentials_type)
126     , dse_username_(dse_username)
127     , dse_password_(dse_password)
128 #ifdef CASS_USE_LIBSSH2
129     , deployment_type_(deployment_type)
130     , host_(host)
131     , session_(NULL)
132     , channel_(NULL)
133     , socket_(NULL)
134 #else
135     // Force local deployment only
136     , deployment_type_(DeploymentType::LOCAL)
137     , host_("127.0.0.1")
138 #endif
139     , is_verbose_(is_verbose) {
140 #ifdef _WIN32
141 #ifdef _DEBUG
142   // Enable automatic execution of the memory leak detection upon exit
143   _CrtSetDbgFlag(_CRTDBG_ALLOC_MEM_DF | _CRTDBG_LEAK_CHECK_DF);
144 #endif
145 #endif
146   // Determine if DSE/DDAC is being used
147   if (!is_cassandra()) {
148     dse_version_ = DseVersion(server_version.to_string());
149     cassandra_version_ = dse_version_.get_cass_version();
150   }
151 
152   // Determine if installation directory can be used
153   if (use_install_dir_ && install_dir_.empty()) {
154     throw BridgeException("Directory must not be blank");
155   }
156 #ifdef CASS_USE_LIBSSH2
157   // Determine if libssh2 needs to be initialized
158   if (deployment_type_ == DeploymentType::REMOTE) {
159     // Initialize the socket
160     try {
161       initialize_socket(host_, port);
162     } catch (SocketException& se) {
163       // Re-throw the exception as a BridgeException
164       finalize_libssh2();
165       throw BridgeException(se.what());
166     }
167 
168     // Initialize the libssh2 connection
169     initialize_libssh2();
170 
171     // Authenticate and establish the libssh2 connection
172     establish_libssh2_connection(authentication_type_, username, password, public_key, private_key);
173   }
174 #endif
175 }
176 
~Bridge()177 CCM::Bridge::~Bridge() {
178 #ifdef CASS_USE_LIBSSH2
179   if (deployment_type_ == DeploymentType::REMOTE) {
180     close_libssh2_terminal();
181     finalize_libssh2();
182   }
183 #endif
184 }
185 
clear_cluster_data()186 void CCM::Bridge::clear_cluster_data() {
187   // Create the cluster clear data command and execute
188   std::vector<std::string> clear_cluster_data_command;
189   clear_cluster_data_command.push_back("clear");
190   execute_ccm_command(clear_cluster_data_command);
191 }
192 
cluster_contact_points(bool is_all)193 std::string CCM::Bridge::cluster_contact_points(bool is_all /*= true*/) {
194   // Determine if all the nodes are being returned or just the available nodes
195   if (!is_all) {
196     // Create the cluster liveset command and execute
197     std::vector<std::string> liveset_command;
198     liveset_command.push_back("liveset");
199     return execute_ccm_command(liveset_command);
200   } else {
201     ClusterStatus status = cluster_status();
202     std::stringstream contact_points;
203     std::string ip_prefix = get_ip_prefix();
204     for (unsigned i = 1; i <= status.node_count; ++i) {
205       if (i > 1) {
206         contact_points << ",";
207       }
208       contact_points << ip_prefix << i;
209     }
210     return contact_points.str();
211   }
212 }
213 
cluster_ip_addresses(bool is_all)214 std::vector<std::string> CCM::Bridge::cluster_ip_addresses(bool is_all /*= true*/) {
215   // Get and sort the IPv4 addresses and return the array/vector
216   std::vector<std::string> ip_addresses = explode(cluster_contact_points(is_all), ',');
217   std::sort(ip_addresses.begin(), ip_addresses.end());
218   return ip_addresses;
219 }
220 
cluster_status()221 ClusterStatus CCM::Bridge::cluster_status() {
222   // Create the cluster status command and execute
223   std::vector<std::string> status_command;
224   status_command.push_back("status");
225   std::string ccm_output = execute_ccm_command(status_command);
226 
227   // Iterate over the output line and parse the tokens
228   std::string ip_prefix = get_ip_prefix();
229   ClusterStatus status;
230   std::istringstream parser(ccm_output);
231   for (std::string token; std::getline(parser, token);) {
232     if (!token.empty()) {
233       // Handle node lines only
234       std::string current_line = to_lower(trim(token));
235       if (current_line.compare(0, 4, "node") == 0) {
236         // Remove node and colon
237         current_line.replace(0, 4, "");
238         size_t colon_index = current_line.find(":");
239         if (colon_index != std::string::npos) {
240           current_line.replace(colon_index, 1, "");
241         }
242 
243         // Split into key and value (node and status) and update cluster status
244         std::vector<std::string> tokens = explode(current_line);
245         if (tokens.size() >= 2) {
246           std::string node_ip_address = ip_prefix + tokens[0];
247           std::string node_status(tokens[1]);
248           ++status.node_count;
249 
250           // Determine the status being updated
251           if (node_status.compare(CCM_NODE_STATUS_DECOMMISSIONED) == 0 ||
252               node_status.compare("decommisionned") ==
253                   0) { // Handle misspelling of decommissioned for older CCM versions
254             status.nodes_decommissioned.push_back(node_ip_address);
255           } else if (node_status.compare(CCM_NODE_STATUS_DOWN) == 0) {
256             // Determine if the node is actually uninitialized
257             if (tokens.size() == 4 &&
258                 (tokens[2] + " " + tokens[3]).compare(CCM_NODE_STATUS_UNINITIALIZED) == 0) {
259               status.nodes_uninitialized.push_back(node_ip_address);
260             } else {
261               status.nodes_down.push_back(node_ip_address);
262             }
263           } else if (node_status.compare(CCM_NODE_STATUS_UP) == 0) {
264             status.nodes_up.push_back(node_ip_address);
265           } else {
266             LOG_ERROR("Node status \"" << node_status << "\" is not valid");
267           }
268         } else {
269           LOG_ERROR("To many tokens detected in \"" << current_line
270                                                     << "\" to determine node status");
271         }
272       }
273     }
274   }
275   return status;
276 }
277 
create_cluster(std::vector<unsigned short> data_center_nodes,bool with_vnodes,bool is_password_authenticator,bool is_ssl,bool is_client_authentication)278 bool CCM::Bridge::create_cluster(std::vector<unsigned short> data_center_nodes,
279                                  bool with_vnodes /*= false*/,
280                                  bool is_password_authenticator /*= false*/,
281                                  bool is_ssl /*= false*/,
282                                  bool is_client_authentication /*= false*/) {
283   // Generate the cluster name and determine if it needs to be created
284   std::string active_cluster_name = get_active_cluster();
285   std::string cluster_name =
286       generate_cluster_name(cassandra_version_, data_center_nodes, with_vnodes,
287                             is_password_authenticator, is_ssl, is_client_authentication);
288   for (std::vector<DseWorkload>::iterator iterator = dse_workload_.begin();
289        iterator != dse_workload_.end(); ++iterator) {
290     if (is_dse() && *iterator != DSE_WORKLOAD_CASSANDRA) {
291       cluster_name.append("-").append(dse_workloads_[*iterator]);
292     }
293   }
294   if (!switch_cluster(cluster_name)) {
295     // Ensure any active cluster is stopped
296     if (!get_active_cluster().empty()) {
297       stop_cluster();
298     }
299 
300     // Create the cluster create command and execute
301     std::vector<std::string> create_command;
302     create_command.push_back("create");
303     if (use_install_dir_ && !install_dir_.empty()) {
304       create_command.push_back("--install-dir=" + install_dir_);
305     } else {
306       create_command.push_back("-v");
307       if (is_cassandra()) {
308         if (use_git_) {
309           if (branch_tag_.empty()) {
310             create_command.push_back("git:cassandra-" + cassandra_version_.to_string());
311           } else {
312             create_command.push_back("git:" + branch_tag_);
313           }
314         } else {
315           create_command.push_back(cassandra_version_.ccm_version());
316         }
317       } else {
318         if (use_git_) {
319           if (branch_tag_.empty()) {
320             create_command.push_back("git:" + dse_version_.to_string());
321           } else {
322             create_command.push_back("git:" + branch_tag_);
323           }
324         } else {
325           create_command.push_back(dse_version_.ccm_version());
326         }
327         if (dse_credentials_type_ == DseCredentialsType::USERNAME_PASSWORD) {
328           create_command.push_back("--dse-username=" + dse_username_);
329           create_command.push_back("--dse-password=" + dse_password_);
330         }
331       }
332     }
333     if (is_dse()) {
334       create_command.push_back("--dse");
335     } else if (is_ddac()) {
336       create_command.push_back("--ddac");
337     }
338     create_command.push_back("-b");
339 
340     // Determine if password authenticator or SSL and client authentication
341     // should be enabled
342     if (is_password_authenticator) {
343       create_command.push_back("--pwd-auth");
344     }
345     if (is_ssl) {
346       // TODO: Use test::Utils::temp_directory() after boost tests are removed and bridge is merged
347       // into testing framework
348 #ifdef _WIN32
349       char* temp = getenv("TEMP");
350       std::string ssl_command = "--ssl=";
351       ssl_command.append(temp);
352       ssl_command.append("\\");
353       ssl_command.append("ssl");
354 #else
355       std::string ssl_command = "--ssl=/tmp/ssl";
356 #endif
357 
358       create_command.push_back(ssl_command);
359       if (is_client_authentication) {
360         create_command.push_back("--require_client_auth");
361       }
362     }
363 
364     // Add the name of the cluster to create
365     create_command.push_back(cluster_name);
366 
367     // Execute the CCM create command
368     execute_ccm_command(create_command);
369 
370     // Generate the cluster update configuration command and execute
371     execute_ccm_command(generate_create_updateconf_command(cassandra_version_));
372     if (is_dse() && dse_version_ >= "6.7.0") {
373       update_cluster_configuration("user_defined_function_fail_micros", "5000000");
374     }
375 
376     // Create the cluster populate command and execute
377     std::string cluster_nodes = generate_cluster_nodes(data_center_nodes);
378     std::string cluster_ip_prefix = get_ip_prefix();
379     std::vector<std::string> populate_command;
380     populate_command.push_back("populate");
381     populate_command.push_back("-n");
382     populate_command.push_back(cluster_nodes);
383     populate_command.push_back("-i");
384     populate_command.push_back(cluster_ip_prefix);
385     if (with_vnodes) {
386       populate_command.push_back("--vnodes");
387     }
388     execute_ccm_command(populate_command);
389 
390     // Update the cluster configuration (set num_tokens)
391     if (with_vnodes) {
392       // Maximum number of tokens is 1536
393       update_cluster_configuration("num_tokens", "1536");
394     }
395 
396     // Set the DSE workload (if applicable)
397     if (is_dse() && !(dse_workload_.size() == 1 && dse_workload_[0] == DSE_WORKLOAD_CASSANDRA)) {
398       set_dse_workloads(dse_workload_);
399     }
400   }
401 
402   // Indicate if the cluster was created or switched
403   return !(active_cluster_name.compare(cluster_name) == 0);
404 }
405 
is_cluster_down()406 bool CCM::Bridge::is_cluster_down() {
407   // Iterate over each node and ensure a connection cannot be made
408   ClusterStatus status = cluster_status();
409   for (unsigned int i = 1; i <= status.node_count; ++i) {
410     // Ensure the node is down
411     if (!is_node_down(i)) {
412       return false;
413     }
414   }
415 
416   // Cluster is down
417   return true;
418 }
419 
is_cluster_up()420 bool CCM::Bridge::is_cluster_up() {
421   // Iterate over each node and ensure a connection can be made
422   ClusterStatus status = cluster_status();
423   for (unsigned int i = 1; i <= status.node_count; ++i) {
424     // Ensure the node is ready/up
425     if (!is_node_up(i)) {
426       return false;
427     }
428   }
429 
430   // Cluster is ready
431   return true;
432 }
433 
hang_up_cluster()434 bool CCM::Bridge::hang_up_cluster() {
435   // Create the cluster stop command and execute
436   std::vector<std::string> stop_command;
437   stop_command.push_back("stop");
438   stop_command.push_back("--hang-up");
439   execute_ccm_command(stop_command);
440 
441   // Ensure the cluster is down
442   return is_cluster_down();
443 }
444 
kill_cluster()445 bool CCM::Bridge::kill_cluster() { return stop_cluster(true); }
446 
remove_cluster()447 void CCM::Bridge::remove_cluster() { remove_cluster(get_active_cluster()); }
448 
remove_cluster(const std::string & cluster_name)449 void CCM::Bridge::remove_cluster(const std::string& cluster_name) {
450   // Create the cluster remove command and execute
451   std::vector<std::string> remove_command;
452   remove_command.push_back("remove");
453   remove_command.push_back(cluster_name);
454   execute_ccm_command(remove_command);
455 }
456 
remove_all_clusters(bool is_all)457 void CCM::Bridge::remove_all_clusters(bool is_all /*= false*/) {
458   // Iterate through all the available clusters
459   std::vector<std::string> clusters = get_available_clusters();
460   for (std::vector<std::string>::const_iterator iterator = clusters.begin();
461        iterator != clusters.end(); ++iterator) {
462     // Determine if the cluster should be removed
463     bool is_valid_cluster = is_all;
464     if (!is_valid_cluster && (*iterator).compare(0, cluster_prefix_.size(), cluster_prefix_) != 0) {
465       continue;
466     }
467     remove_cluster(*iterator);
468   }
469 }
470 
start_cluster(std::vector<std::string> jvm_arguments)471 bool CCM::Bridge::start_cluster(
472     std::vector<std::string> jvm_arguments /*= DEFAULT_JVM_ARGUMENTS*/) {
473   // Create the cluster start command and execute
474   std::vector<std::string> start_command;
475   start_command.push_back("start");
476   start_command.push_back("--wait-other-notice");
477   start_command.push_back("--wait-for-binary-proto");
478 #ifdef _WIN32
479   if (deployment_type_ == DeploymentType::LOCAL) {
480     if (cassandra_version_ >= "2.2.4") {
481       start_command.push_back("--quiet-windows");
482     }
483   }
484 #endif
485   for (std::vector<std::string>::const_iterator iterator = jvm_arguments.begin();
486        iterator != jvm_arguments.end(); ++iterator) {
487     std::string jvm_argument = trim(*iterator);
488     if (!jvm_argument.empty()) {
489       start_command.push_back("--jvm_arg=" + *iterator);
490     }
491   }
492   execute_ccm_command(start_command);
493 
494   // Ensure the cluster is up
495   return is_cluster_up();
496 }
497 
start_cluster(std::string jvm_argument)498 bool CCM::Bridge::start_cluster(std::string jvm_argument /*= ""*/) {
499   std::vector<std::string> jvm_arguments;
500   if (!jvm_argument.empty()) {
501     jvm_arguments.push_back(jvm_argument);
502   }
503   return start_cluster(jvm_arguments);
504 }
505 
stop_cluster(bool is_kill)506 bool CCM::Bridge::stop_cluster(bool is_kill /*= false*/) {
507   // Create the cluster stop command and execute
508   std::vector<std::string> stop_command;
509   stop_command.push_back("stop");
510   if (is_kill) {
511     stop_command.push_back("--not-gently");
512   }
513   execute_ccm_command(stop_command);
514 
515   // Ensure the cluster is down
516   return is_cluster_down();
517 }
518 
switch_cluster(const std::string & cluster_name)519 bool CCM::Bridge::switch_cluster(const std::string& cluster_name) {
520   // Get the active cluster and the available clusters
521   std::string active_cluster;
522   std::vector<std::string> clusters = get_available_clusters(active_cluster);
523 
524   // Determine if the switch should be performed
525   if (active_cluster.compare(trim(cluster_name)) != 0) {
526     // Ensure the cluster is in the list
527     if (std::find(clusters.begin(), clusters.end(), cluster_name) != clusters.end()) {
528       if (!active_cluster.empty()) {
529         kill_cluster();
530       }
531 
532       // Create the cluster switch command and clear the data
533       std::vector<std::string> switch_command;
534       switch_command.push_back("switch");
535       switch_command.push_back(cluster_name);
536       execute_ccm_command(switch_command);
537       clear_cluster_data();
538       return true;
539     }
540   } else {
541     // Cluster requested is already active
542     return true;
543   }
544 
545   // Unable to switch the cluster
546   return false;
547 }
548 
update_cluster_configuration(std::vector<std::string> key_value_pairs,bool is_dse,bool is_yaml)549 void CCM::Bridge::update_cluster_configuration(std::vector<std::string> key_value_pairs,
550                                                bool is_dse /*= false*/, bool is_yaml /*= false*/) {
551   // Create the update configuration command
552   if (is_yaml) {
553     for (std::vector<std::string>::const_iterator iterator = key_value_pairs.begin();
554          iterator != key_value_pairs.end(); ++iterator) {
555       update_cluster_configuration_yaml(*iterator, is_dse);
556     }
557   } else {
558     key_value_pairs.insert(key_value_pairs.begin(), (is_dse ? "updatedseconf" : "updateconf"));
559     execute_ccm_command(key_value_pairs);
560   }
561 }
562 
update_cluster_configuration(const std::string & key,const std::string & value,bool is_dse)563 void CCM::Bridge::update_cluster_configuration(const std::string& key, const std::string& value,
564                                                bool is_dse /*= false*/) {
565   // Create the configuration to be updated
566   std::stringstream configuration;
567   configuration << key << ":" << value;
568 
569   // Create the update configuration command
570   std::vector<std::string> updateconf_command;
571   updateconf_command.push_back(is_dse ? "updatedseconf" : "updateconf");
572   updateconf_command.push_back(configuration.str());
573   execute_ccm_command(updateconf_command);
574 }
575 
update_cluster_configuration_yaml(const std::string & yaml,bool is_dse)576 void CCM::Bridge::update_cluster_configuration_yaml(const std::string& yaml,
577                                                     bool is_dse /*= false*/) {
578   // Create the update configuration command for a literal YAML
579   std::vector<std::string> updateconf_command;
580   updateconf_command.push_back(is_dse ? "updatedseconf" : "updateconf");
581   updateconf_command.push_back("-y");
582   updateconf_command.push_back(yaml);
583   execute_ccm_command(updateconf_command);
584 }
585 
update_node_configuration(unsigned int node,std::vector<std::string> key_value_pairs)586 void CCM::Bridge::update_node_configuration(unsigned int node,
587                                             std::vector<std::string> key_value_pairs) {
588   // Create the update configuration command
589   key_value_pairs.insert(key_value_pairs.begin(), generate_node_name(node));
590   key_value_pairs.insert(key_value_pairs.begin(), "updateconf");
591   execute_ccm_command(key_value_pairs);
592 }
593 
update_node_configuration(unsigned int node,const std::string & key,const std::string & value)594 void CCM::Bridge::update_node_configuration(unsigned int node, const std::string& key,
595                                             const std::string& value) {
596   // Create the configuration to be updated
597   std::stringstream configuration;
598   configuration << key << ":" << value;
599 
600   // Create the update configuration command
601   std::vector<std::string> updateconf_command;
602   updateconf_command.push_back(generate_node_name(node));
603   updateconf_command.push_back("updateconf");
604   updateconf_command.push_back(configuration.str());
605   execute_ccm_command(updateconf_command);
606 }
607 
add_node(const std::string & data_center)608 unsigned int CCM::Bridge::add_node(const std::string& data_center /*= ""*/) {
609   // Generate the arguments for the add node command
610   unsigned int node = get_next_available_node();
611   std::stringstream node_ip_address;
612   node_ip_address << get_ip_prefix() << node;
613   std::stringstream jmx_port;
614   std::stringstream jmx_remote_debug_port;
615   jmx_port << (7000 + (100 * node));
616   jmx_remote_debug_port << (2000 + (100 * node));
617 
618   // Create the add node command and execute
619   std::vector<std::string> add_node_command;
620   add_node_command.push_back("add");
621   add_node_command.push_back("-b");
622   add_node_command.push_back("-i");
623   add_node_command.push_back(node_ip_address.str());
624   add_node_command.push_back("-j");
625   add_node_command.push_back(jmx_port.str());
626   add_node_command.push_back("-r");
627   add_node_command.push_back(jmx_remote_debug_port.str());
628   if (!data_center.empty()) {
629     add_node_command.push_back("-d");
630     add_node_command.push_back(data_center);
631   }
632   if (is_dse()) {
633     add_node_command.push_back("--dse");
634   }
635   add_node_command.push_back(generate_node_name(node));
636   execute_ccm_command(add_node_command);
637 
638   // Return the node created
639   return node;
640 }
641 
bootstrap_node(const std::vector<std::string> & jvm_arguments,const std::string & data_center)642 unsigned int CCM::Bridge::bootstrap_node(const std::vector<std::string>& jvm_arguments,
643                                          const std::string& data_center /*= ""*/) {
644   unsigned int node = add_node(data_center);
645   start_node(node, jvm_arguments);
646   return node;
647 }
648 
bootstrap_node(const std::string & jvm_argument,const std::string & data_center)649 unsigned int CCM::Bridge::bootstrap_node(const std::string& jvm_argument /*= ""*/,
650                                          const std::string& data_center /*= ""*/) {
651   unsigned int node = add_node(data_center);
652   start_node(node, jvm_argument);
653   return node;
654 }
655 
decommission_node(unsigned int node,bool is_force)656 bool CCM::Bridge::decommission_node(unsigned int node, bool is_force /*= false*/) {
657   // Create the node decommission command and execute
658   std::vector<std::string> decommission_node_command;
659   decommission_node_command.push_back(generate_node_name(node));
660   decommission_node_command.push_back("decommission");
661   if (is_force && ((is_cassandra() && cassandra_version_ >= "3.12") || // Cassandra v3.12+
662                    (!is_cassandra() && dse_version_ >= "5.1.0"))) { // DataStax Enterprise v5.1.0+
663     decommission_node_command.push_back("--force");
664   }
665   execute_ccm_command(decommission_node_command);
666 
667   // Ensure the node has been decommissioned
668   return is_node_decommissioned(node);
669 }
670 
disable_node_binary_protocol(unsigned int node)671 void CCM::Bridge::disable_node_binary_protocol(unsigned int node) {
672   // Create the disable node binary protocol command and execute
673   std::vector<std::string> disable_node_binary_protocol_command;
674   disable_node_binary_protocol_command.push_back(generate_node_name(node));
675   disable_node_binary_protocol_command.push_back("nodetool");
676   disable_node_binary_protocol_command.push_back("disablebinary");
677   execute_ccm_command(disable_node_binary_protocol_command);
678 }
679 
disable_node_gossip(unsigned int node)680 void CCM::Bridge::disable_node_gossip(unsigned int node) {
681   // Create the disable node gossip command and execute
682   std::vector<std::string> disable_node_gossip_command;
683   disable_node_gossip_command.push_back(generate_node_name(node));
684   disable_node_gossip_command.push_back("nodetool");
685   disable_node_gossip_command.push_back("disablegossip");
686   execute_ccm_command(disable_node_gossip_command);
687 }
688 
disable_node_trace(unsigned int node)689 void CCM::Bridge::disable_node_trace(unsigned int node) {
690   // Create the disable node trace command and execute
691   std::vector<std::string> disable_node_trace_command;
692   disable_node_trace_command.push_back(generate_node_name(node));
693   disable_node_trace_command.push_back("nodetool");
694   disable_node_trace_command.push_back("settraceprobability");
695   disable_node_trace_command.push_back("0");
696   execute_ccm_command(disable_node_trace_command);
697 }
698 
enable_node_binary_protocol(unsigned int node)699 void CCM::Bridge::enable_node_binary_protocol(unsigned int node) {
700   // Create the enable node binary protocol command and execute
701   std::vector<std::string> enable_node_binary_protocol_command;
702   enable_node_binary_protocol_command.push_back(generate_node_name(node));
703   enable_node_binary_protocol_command.push_back("nodetool");
704   enable_node_binary_protocol_command.push_back("enablebinary");
705   execute_ccm_command(enable_node_binary_protocol_command);
706 }
707 
enable_node_gossip(unsigned int node)708 void CCM::Bridge::enable_node_gossip(unsigned int node) {
709   // Create the enable node gossip command and execute
710   std::vector<std::string> disable_node_gossip_command;
711   disable_node_gossip_command.push_back(generate_node_name(node));
712   disable_node_gossip_command.push_back("nodetool");
713   disable_node_gossip_command.push_back("enablegossip");
714   execute_ccm_command(disable_node_gossip_command);
715 }
716 
enable_node_trace(unsigned int node)717 void CCM::Bridge::enable_node_trace(unsigned int node) {
718   // Create the enable node trace command and execute
719   std::vector<std::string> enable_node_trace_command;
720   enable_node_trace_command.push_back(generate_node_name(node));
721   enable_node_trace_command.push_back("nodetool");
722   enable_node_trace_command.push_back("settraceprobability");
723   enable_node_trace_command.push_back("1");
724   execute_ccm_command(enable_node_trace_command);
725 }
726 
execute_cql_on_node(unsigned int node,const std::string & cql)727 void CCM::Bridge::execute_cql_on_node(unsigned int node, const std::string& cql) {
728   // Update the CQL statement for the command line
729   std::stringstream execute_statement;
730   execute_statement << "\"" << cql << ";\"";
731 
732   // Create the CQLSH pass through command and execute
733   std::vector<std::string> cqlsh_node_command;
734   cqlsh_node_command.push_back(generate_node_name(node));
735   cqlsh_node_command.push_back("cqlsh");
736   cqlsh_node_command.push_back("-x");
737   cqlsh_node_command.push_back(execute_statement.str());
738   execute_ccm_command(cqlsh_node_command);
739 }
740 
force_decommission_node(unsigned int node)741 bool CCM::Bridge::force_decommission_node(unsigned int node) {
742   return decommission_node(node, true);
743 }
744 
hang_up_node(unsigned int node)745 bool CCM::Bridge::hang_up_node(unsigned int node) {
746   // Create the node stop command and execute
747   std::vector<std::string> stop_node_command;
748   stop_node_command.push_back(generate_node_name(node));
749   stop_node_command.push_back("stop");
750   stop_node_command.push_back("--hang-up");
751   execute_ccm_command(stop_node_command);
752 
753   // Ensure the node is down
754   return is_node_down(node);
755 }
756 
kill_node(unsigned int node)757 bool CCM::Bridge::kill_node(unsigned int node) { return stop_node(node, true); }
758 
pause_node(unsigned int node)759 void CCM::Bridge::pause_node(unsigned int node) {
760   // Create the node pause command and execute
761   std::vector<std::string> pause_node_command;
762   pause_node_command.push_back(generate_node_name(node));
763   pause_node_command.push_back("pause");
764   execute_ccm_command(pause_node_command);
765 }
766 
resume_node(unsigned int node)767 void CCM::Bridge::resume_node(unsigned int node) {
768   // Create the node resume command and execute
769   std::vector<std::string> resume_node_command;
770   resume_node_command.push_back(generate_node_name(node));
771   resume_node_command.push_back("resume");
772   execute_ccm_command(resume_node_command);
773 }
774 
start_node(unsigned int node,const std::vector<std::string> & jvm_arguments)775 bool CCM::Bridge::start_node(
776     unsigned int node, const std::vector<std::string>& jvm_arguments /*= DEFAULT_JVM_ARGUMENTS*/) {
777   // Create the node start command and execute
778   std::vector<std::string> start_node_command;
779   start_node_command.push_back(generate_node_name(node));
780   start_node_command.push_back("start");
781   start_node_command.push_back("--wait-other-notice");
782   start_node_command.push_back("--wait-for-binary-proto");
783 #ifdef _WIN32
784   if (deployment_type_ == DeploymentType::LOCAL) {
785     if (cassandra_version_ >= "2.2.4") {
786       start_node_command.push_back("--quiet-windows");
787     }
788   }
789 #endif
790   for (std::vector<std::string>::const_iterator iterator = jvm_arguments.begin();
791        iterator != jvm_arguments.end(); ++iterator) {
792     std::string jvm_argument = trim(*iterator);
793     if (!jvm_argument.empty()) {
794       start_node_command.push_back("--jvm_arg=" + *iterator);
795     }
796   }
797   execute_ccm_command(start_node_command);
798 
799   // Ensure the node is up
800   return is_node_up(node);
801 }
802 
start_node(unsigned int node,const std::string & jvm_argument)803 bool CCM::Bridge::start_node(unsigned int node, const std::string& jvm_argument) {
804   // Create the JVM arguments array/vector
805   std::vector<std::string> jvm_arguments;
806   jvm_arguments.push_back(jvm_argument);
807   return start_node(node, jvm_arguments);
808 }
809 
stop_node(unsigned int node,bool is_kill)810 bool CCM::Bridge::stop_node(unsigned int node, bool is_kill /*= false*/) {
811   // Create the node stop command and execute
812   std::vector<std::string> stop_node_command;
813   stop_node_command.push_back(generate_node_name(node));
814   stop_node_command.push_back("stop");
815   if (is_kill) {
816     stop_node_command.push_back("--not-gently");
817   }
818   execute_ccm_command(stop_node_command);
819 
820   // Ensure the node is down
821   return is_node_down(node);
822 }
823 
get_ip_prefix()824 std::string CCM::Bridge::get_ip_prefix() { return host_.substr(0, host_.size() - 1); }
825 
get_cassandra_version()826 CassVersion CCM::Bridge::get_cassandra_version() {
827   // Get the version string from CCM
828   std::vector<std::string> active_cluster_version_command;
829   active_cluster_version_command.push_back(generate_node_name(1));
830   active_cluster_version_command.push_back("version");
831   std::string ccm_output = execute_ccm_command(active_cluster_version_command);
832 
833   // Ensure the version release information exists and return the version
834   size_t version_index = ccm_output.find("ReleaseVersion:");
835   if (version_index != std::string::npos) {
836     ccm_output.replace(0, version_index + 15, "");
837     return CassVersion(trim(ccm_output));
838   }
839 
840   // Unable to determine version information from active cluster
841   throw BridgeException("Unable to determine version information from active Cassandra cluster \"" +
842                         get_active_cluster() + "\"");
843 }
844 
get_dse_version()845 DseVersion CCM::Bridge::get_dse_version() {
846   // Get the version string from CCM
847   std::vector<std::string> active_cluster_version_command;
848   active_cluster_version_command.push_back(generate_node_name(1));
849   active_cluster_version_command.push_back("dse");
850   active_cluster_version_command.push_back("-v");
851   std::string ccm_output = execute_ccm_command(active_cluster_version_command);
852 
853   // Ensure the version release information exists and return the version
854   ccm_output = trim(ccm_output);
855   if (!ccm_output.empty()) {
856     return DseVersion(ccm_output);
857   }
858 
859   // Unable to determine version information from active cluster
860   throw BridgeException("Unable to determine version information from active DSE/DDAC cluster \"" +
861                         get_active_cluster() + "\"");
862 }
863 
set_dse_workload(unsigned int node,DseWorkload workload,bool is_kill)864 bool CCM::Bridge::set_dse_workload(unsigned int node, DseWorkload workload,
865                                    bool is_kill /*= false */) {
866   std::vector<DseWorkload> workloads;
867   workloads.push_back(workload);
868   return set_dse_workloads(1, workloads, is_kill);
869 }
870 
set_dse_workloads(unsigned int node,std::vector<DseWorkload> workloads,bool is_kill)871 bool CCM::Bridge::set_dse_workloads(unsigned int node, std::vector<DseWorkload> workloads,
872                                     bool is_kill /*= false */) {
873   // Ensure the workloads can be processed
874   if (workloads.empty()) {
875     throw BridgeException("No workloads to assign");
876   }
877 
878   // Update the member variable with the workloads and generate workloads
879   dse_workload_.clear();
880   dse_workload_ = workloads;
881   std::string dse_workloads = generate_dse_workloads(workloads);
882 
883   // Determine if the node is currently active/up
884   bool was_node_active = false;
885   if (!is_node_down(node)) {
886     LOG("Stopping active node \"" << node << "\" and assigning workload(s) \"" << dse_workloads
887                                   << "\"");
888     stop_node(node, is_kill);
889     was_node_active = true;
890   }
891 
892   // Create the node DSE workload command and execute
893   std::vector<std::string> dse_workload_command;
894   dse_workload_command.push_back(generate_node_name(node));
895   dse_workload_command.push_back("setworkload");
896   dse_workload_command.push_back(dse_workloads);
897   execute_ccm_command(dse_workload_command);
898 
899   // Determine if the node should be restarted
900   if (was_node_active) {
901     LOG("Restarting node \"" << node << "\" to applying workload(s) \"" << dse_workloads << "\"");
902     start_node(node);
903   }
904 
905   return was_node_active;
906 }
907 
set_dse_workload(DseWorkload workload,bool is_kill)908 bool CCM::Bridge::set_dse_workload(DseWorkload workload, bool is_kill /*= false */) {
909   std::vector<DseWorkload> workloads;
910   workloads.push_back(workload);
911   return set_dse_workloads(workloads, is_kill);
912 }
913 
set_dse_workloads(std::vector<DseWorkload> workloads,bool is_kill)914 bool CCM::Bridge::set_dse_workloads(std::vector<DseWorkload> workloads, bool is_kill /*= false */) {
915   // Ensure the workloads can be processed
916   if (workloads.empty()) {
917     throw BridgeException("No workloads to assign");
918   }
919 
920   // Determine if the cluster is currently active/up
921   bool was_cluster_active = false;
922   std::string cluster = get_active_cluster();
923   if (!is_cluster_down()) {
924     LOG("Stopping active cluster \"" << cluster << "\" and assigning workload(s) \""
925                                      << generate_dse_workloads(workloads) << "\"");
926     stop_cluster(is_kill);
927     was_cluster_active = true;
928   }
929 
930   // Iterate over each node and set the DSE workload
931   ClusterStatus status = cluster_status();
932   for (unsigned int i = 1; i <= status.node_count; ++i) {
933     set_dse_workloads(i, workloads, false);
934   }
935 
936   // Determine if the cluster should be restarted
937   if (was_cluster_active) {
938     LOG("Restarting cluster \"" << cluster << "\" and applying workload(s) \""
939                                 << generate_dse_workloads(workloads) << "\"");
940     start_cluster();
941   }
942 
943   return was_cluster_active;
944 }
945 
is_node_decommissioned(unsigned int node)946 bool CCM::Bridge::is_node_decommissioned(unsigned int node) {
947   // Iterate over the list of decommissioned nodes
948   std::stringstream node_ip_address;
949   node_ip_address << get_ip_prefix() << node;
950   std::vector<std::string> nodes = cluster_status().nodes_decommissioned;
951   for (std::vector<std::string>::const_iterator iterator = nodes.begin(); iterator < nodes.end();
952        ++iterator) {
953     if (node_ip_address.str().compare(*iterator) == 0) {
954       return true;
955     }
956   }
957 
958   // Node has not been decommissioned
959   return false;
960 }
961 
is_node_down(unsigned int node,bool is_quick_check)962 bool CCM::Bridge::is_node_down(unsigned int node, bool is_quick_check /*= false*/) {
963   if (is_quick_check) {
964     return !is_node_availabe(node);
965   }
966 
967   unsigned int number_of_retries = 0;
968   while (number_of_retries++ < CCM_RETRIES) {
969     if (!is_node_availabe(node)) {
970       return true;
971     } else {
972       std::string cluster = get_active_cluster();
973       LOG("[#" << number_of_retries
974                << "] - Attempting to recheck node down "
975                   "status for node \""
976                << node << "\" in cluster \"" << cluster << "\"");
977       msleep(CCM_NAP);
978     }
979   }
980 
981   // Connection can still being established on node
982   return false;
983 }
984 
is_node_up(unsigned int node,bool is_quick_check)985 bool CCM::Bridge::is_node_up(unsigned int node, bool is_quick_check /*= false*/) {
986   if (is_quick_check) {
987     return is_node_availabe(node);
988   }
989 
990   unsigned int number_of_retries = 0;
991   while (number_of_retries++ < CCM_RETRIES) {
992     if (is_node_availabe(node)) {
993       return true;
994     } else {
995       std::string cluster = get_active_cluster();
996       LOG("[#" << number_of_retries
997                << "] - Attempting to recheck node up "
998                   "status for node \""
999                << node << "\" in cluster \"" << cluster << "\"");
1000       msleep(CCM_NAP);
1001     }
1002   }
1003 
1004   // Connection cannot be established on node
1005   return false;
1006 }
1007 
1008 #ifdef CASS_USE_LIBSSH2
initialize_socket(const std::string & host,short port)1009 void CCM::Bridge::initialize_socket(const std::string& host, short port) {
1010   // Initialize the socket
1011   socket_ = new Socket();
1012 
1013   // Establish socket connection
1014   socket_->establish_connection(host, port);
1015 }
1016 
synchronize_socket()1017 void CCM::Bridge::synchronize_socket() {
1018   // Determine current read/write direction of the session
1019   bool is_read = false;
1020   bool is_write = false;
1021   int read_write_direction = libssh2_session_block_directions(session_);
1022   if (read_write_direction & LIBSSH2_SESSION_BLOCK_INBOUND) {
1023     is_read = true;
1024   }
1025   if (read_write_direction & LIBSSH2_SESSION_BLOCK_OUTBOUND) {
1026     is_write = true;
1027   }
1028 
1029   // Synchronize the socket
1030   socket_->synchronize(is_read, is_write);
1031 }
1032 
initialize_libssh2()1033 void CCM::Bridge::initialize_libssh2() {
1034   // Initialize libssh2
1035   int rc = libssh2_init(LIBSSH2_INIT_ALL);
1036   if (rc) {
1037     finalize_libssh2();
1038     std::stringstream message;
1039     message << "[libssh2] Failed initialization with error code \"" << rc << "\"";
1040     throw BridgeException(message.str());
1041   }
1042 
1043   // Initialize and create the libssh2 session
1044   session_ = libssh2_session_init();
1045   if (!session_) {
1046     finalize_libssh2();
1047     throw BridgeException("[libssh2] Failed session failed");
1048   }
1049 
1050   // Disable blocking on the session
1051   libssh2_session_set_blocking(session_, FALSE);
1052 
1053   // Perform the session handshake; trade banners, exchange keys, setup cyrpto
1054   while ((rc = libssh2_session_handshake(session_, socket_->get_handle())) ==
1055          LIBSSH2_ERROR_EAGAIN) {
1056     ; // no-op
1057   }
1058   if (rc) {
1059     // Determine error that occurred
1060     std::stringstream message;
1061     message << "[libssh2] Failed session handshake with error ";
1062     switch (rc) {
1063       case LIBSSH2_ERROR_SOCKET_NONE:
1064         message << "\"the socket is invalid\"";
1065         break;
1066       case LIBSSH2_ERROR_BANNER_SEND:
1067         message << "\"unable to send banner to remote host\"";
1068         break;
1069       case LIBSSH2_ERROR_KEX_FAILURE:
1070         message << "\"encryption key exchange with the remote host failed\"";
1071         break;
1072       case LIBSSH2_ERROR_SOCKET_SEND:
1073         message << "\"unable to send data on socket\"";
1074         break;
1075       case LIBSSH2_ERROR_SOCKET_DISCONNECT:
1076         message << "\"the socket was disconnected\"";
1077         break;
1078       case LIBSSH2_ERROR_PROTO:
1079         message << "\"an invalid SSH protocol response was received on the socket\"";
1080         break;
1081       default:
1082         message << " code \"" << rc << "\"";
1083         break;
1084     }
1085     finalize_libssh2();
1086     throw BridgeException(message.str());
1087   }
1088 }
1089 
establish_libssh2_connection(AuthenticationType authentication_type,const std::string & username,const std::string & password,const std::string & public_key,const std::string & private_key)1090 void CCM::Bridge::establish_libssh2_connection(AuthenticationType authentication_type,
1091                                                const std::string& username,
1092                                                const std::string& password,
1093                                                const std::string& public_key,
1094                                                const std::string& private_key) {
1095   int rc = 0;
1096 
1097   // Determine authentication mechanism
1098   if (authentication_type == AuthenticationType::USERNAME_PASSWORD) {
1099     // Perform username and password authentication
1100     while ((rc = libssh2_userauth_password(session_, username.c_str(), password.c_str())) ==
1101            LIBSSH2_ERROR_EAGAIN) {
1102       ; // no-op
1103     }
1104   } else {
1105     while ((rc = libssh2_userauth_publickey_fromfile(session_, username.c_str(), public_key.c_str(),
1106                                                      private_key.c_str(), "")) ==
1107            LIBSSH2_ERROR_EAGAIN) {
1108       ; // no-op
1109     }
1110   }
1111 
1112   if (rc) {
1113     // Determine error that occurred
1114     std::stringstream message;
1115     message << "[libssh2] Failed username/password authentication with error ";
1116     switch (rc) {
1117       case LIBSSH2_ERROR_ALLOC:
1118         message << "\"an internal memory allocation call failed\"";
1119         break;
1120       case LIBSSH2_ERROR_SOCKET_SEND:
1121         message << "\"unable to send data on socket\"";
1122         break;
1123       case LIBSSH2_ERROR_SOCKET_TIMEOUT:
1124         message << "\"timed out waiting for response\"";
1125         break;
1126       case LIBSSH2_ERROR_PASSWORD_EXPIRED:
1127         message << "\"password has expired\"";
1128         break;
1129       case LIBSSH2_ERROR_PUBLICKEY_UNVERIFIED:
1130         message << "\"the username/public key combination was invalid\"";
1131         break;
1132       case LIBSSH2_ERROR_AUTHENTICATION_FAILED:
1133         // Ensure error message is displayed for the authentication type
1134         if (authentication_type == AuthenticationType::USERNAME_PASSWORD) {
1135           message << "\"invalid username/password\"";
1136         } else {
1137           message << "\"authentication using the supplied public key was not accepted\"";
1138         }
1139         break;
1140       default:
1141         message << "code \"" << rc << "\"";
1142         break;
1143     }
1144     finalize_libssh2();
1145     throw BridgeException(message.str());
1146   }
1147 }
1148 
open_libssh2_terminal()1149 void CCM::Bridge::open_libssh2_terminal() {
1150   // Open a channel; request a shell
1151   while (session_ != NULL && (channel_ = libssh2_channel_open_session(session_)) == NULL &&
1152          libssh2_session_last_error(session_, NULL, NULL, FALSE) == LIBSSH2_ERROR_EAGAIN) {
1153     synchronize_socket();
1154   }
1155   if (!channel_) {
1156     // Determine error that occurred
1157     int rc = libssh2_session_last_error(session_, NULL, NULL, FALSE);
1158     std::string message("[libssh2] Failed opening session channel with error \"");
1159     switch (rc) {
1160       case LIBSSH2_ERROR_ALLOC:
1161         message.append("an internal memory allocation call failed");
1162         break;
1163       case LIBSSH2_ERROR_SOCKET_SEND:
1164         message.append("unable to send data on socket");
1165         break;
1166       case LIBSSH2_ERROR_CHANNEL_FAILURE:
1167         message.append("unable to open channel");
1168     }
1169     message.append("\"");
1170     finalize_libssh2();
1171     throw BridgeException(message);
1172   }
1173 }
1174 
close_libssh2_terminal()1175 void CCM::Bridge::close_libssh2_terminal() {
1176   if (channel_) {
1177     // Close the libssh2 channel/terminal
1178     int rc = 0;
1179     while ((rc = libssh2_channel_close(channel_)) == LIBSSH2_ERROR_EAGAIN) {
1180       synchronize_socket();
1181     }
1182     if (rc == 0) {
1183       char* exit_signal = NULL;
1184       libssh2_channel_get_exit_status(channel_);
1185       libssh2_channel_get_exit_signal(channel_, &exit_signal, NULL, NULL, NULL, NULL, NULL);
1186       if (exit_signal) {
1187         LOG_ERROR("[libssh2] Failed to close channel with exit signal \"" << exit_signal << "\"");
1188       }
1189     }
1190     if (rc) {
1191       LOG_ERROR("[libssh2] Failed to close channel with error code \"" << rc << "\"");
1192     }
1193 
1194     // Free the channel/terminal resources
1195     while ((rc = libssh2_channel_free(channel_)) == LIBSSH2_ERROR_EAGAIN) {
1196       ; // no-op
1197     }
1198     if (rc) {
1199       LOG_ERROR("[libssh2] Failed to free channel resources with error code \"" << rc << "\"");
1200     }
1201     channel_ = NULL;
1202   }
1203 }
1204 
finalize_libssh2()1205 void CCM::Bridge::finalize_libssh2() {
1206   // Free the libssh2 session
1207   if (session_) {
1208     // Perform session disconnection
1209     int rc = 0;
1210     while ((rc = libssh2_session_disconnect(
1211                 session_, "Shutting down libssh2 CCM bridge session")) == LIBSSH2_ERROR_EAGAIN) {
1212       ; // no-op
1213     }
1214     if (rc) {
1215       LOG_ERROR("[libssh2] Failed to disconnect session with error code \"" << rc << "\"");
1216     }
1217     while ((rc = libssh2_session_free(session_)) == LIBSSH2_ERROR_EAGAIN) {
1218       ; // no-op
1219     }
1220     if (rc) {
1221       LOG_ERROR("[libssh2] Failed to free session resources with error code \"" << rc << "\"");
1222     }
1223     session_ = NULL;
1224   }
1225 
1226   // Free the socket (closes connection)
1227   delete socket_;
1228   socket_ = NULL;
1229 
1230   // Free up remaining libssh2 memory
1231   libssh2_exit();
1232 
1233 #ifndef LIBSSH2_NO_OPENSSL
1234 #ifdef OPENSSL_CLEANUP
1235   // Free OpenSSL resources
1236   RAND_cleanup();
1237   ENGINE_cleanup();
1238   CONF_modules_unload(TRUE);
1239   CONF_modules_free();
1240   EVP_cleanup();
1241   ERR_free_strings();
1242   ERR_remove_state(PID_UNKNOWN);
1243   CRYPTO_cleanup_all_ex_data();
1244 #endif
1245 #endif
1246 }
1247 
execute_libssh2_command(const std::vector<std::string> & command)1248 std::string CCM::Bridge::execute_libssh2_command(const std::vector<std::string>& command) {
1249   // Make sure the libssh2 session wasn't terminated
1250   if (!session_) {
1251     throw BridgeException("[libssh2] Session is invalid/terminated");
1252   }
1253 
1254   // Create/Open libssh2 terminal
1255   open_libssh2_terminal();
1256 
1257   // Execute the command
1258   int rc = 0;
1259   std::string full_command = implode(command);
1260   while ((rc = libssh2_channel_exec(channel_, full_command.c_str())) == LIBSSH2_ERROR_EAGAIN) {
1261     synchronize_socket();
1262   }
1263   if (rc) {
1264     // Determine error that occurred
1265     std::stringstream message;
1266     message << "[libssh2] Failed to execute command with error ";
1267     switch (rc) {
1268       case LIBSSH2_ERROR_ALLOC:
1269         message << "\"An internal memory allocation call failed\"";
1270         break;
1271       case LIBSSH2_ERROR_SOCKET_SEND:
1272         message << "\"Unable to send data on socket\"";
1273         break;
1274       case LIBSSH2_ERROR_CHANNEL_REQUEST_DENIED:
1275         message << "\"Request denied\"";
1276         break;
1277       default:
1278         message << "code \"" << rc << "\"";
1279         break;
1280     }
1281     finalize_libssh2();
1282     throw BridgeException(message.str());
1283   }
1284 
1285   // Get the terminal output, close the terminal and return the output
1286   std::string output = read_libssh2_terminal();
1287   close_libssh2_terminal();
1288   return output;
1289 }
1290 #endif
1291 
1292 #ifdef CASS_USE_LIBSSH2
read_libssh2_terminal()1293 std::string CCM::Bridge::read_libssh2_terminal() {
1294   ssize_t nread = 0;
1295   char buffer[512];
1296   memset(buffer, '\0', sizeof(char) * 512);
1297   std::string output;
1298 
1299   // Read stdout
1300   while (true) {
1301     while ((nread = libssh2_channel_read(channel_, buffer, sizeof(buffer))) > 0) {
1302       if (nread > 0) {
1303         output.append(buffer, nread);
1304       }
1305     }
1306     if (nread == LIBSSH2_ERROR_EAGAIN) {
1307       synchronize_socket();
1308       msleep(CCM_NAP);
1309     } else {
1310       break;
1311     }
1312   }
1313 
1314   // Read stderr
1315   while (true) {
1316     while ((nread = libssh2_channel_read_stderr(channel_, buffer, sizeof(buffer))) > 0) {
1317       if (nread > 0) {
1318         output.append(buffer, nread);
1319       }
1320     }
1321     if (nread == LIBSSH2_ERROR_EAGAIN) {
1322       synchronize_socket();
1323       msleep(CCM_NAP);
1324     } else {
1325       break;
1326     }
1327   }
1328 
1329   return output;
1330 }
1331 #endif
1332 
execute_ccm_command(const std::vector<std::string> & command)1333 std::string CCM::Bridge::execute_ccm_command(const std::vector<std::string>& command) {
1334   // Create the CCM command
1335   std::vector<std::string> ccm_command;
1336   ccm_command.push_back("ccm");
1337   ccm_command.insert(ccm_command.end(), command.begin(), command.end());
1338   LOG(implode(ccm_command));
1339 
1340   // Determine how to execute the command
1341   std::string output;
1342   if (deployment_type_ == DeploymentType::LOCAL) {
1343 #ifdef _WIN32
1344     if (!is_cassandra()) {
1345       std::stringstream message;
1346       message << server_type_.to_string() << " v" << dse_version_.to_string()
1347               << " cannot be launched on Windows platform";
1348       throw BridgeException(message.str());
1349     }
1350 #endif
1351     utils::Process::Result result = utils::Process::execute(ccm_command);
1352     if (result.exit_status != 0) {
1353       throw BridgeException(result.standard_error);
1354     }
1355     output = result.standard_output;
1356 #ifdef CASS_USE_LIBSSH2
1357   } else if (deployment_type_ == DeploymentType::REMOTE) {
1358     output = execute_libssh2_command(ccm_command);
1359     if (!output.empty()) LOG(trim(output));
1360 #endif
1361   }
1362 
1363   return output;
1364 }
1365 
get_active_cluster()1366 std::string CCM::Bridge::get_active_cluster() {
1367   std::string active_cluster;
1368   std::vector<std::string> clusters = get_available_clusters(active_cluster);
1369   return active_cluster;
1370 }
1371 
get_available_clusters()1372 std::vector<std::string> CCM::Bridge::get_available_clusters() {
1373   std::string active_cluster;
1374   return get_available_clusters(active_cluster);
1375 }
1376 
get_available_clusters(std::string & active_cluster)1377 std::vector<std::string> CCM::Bridge::get_available_clusters(std::string& active_cluster) {
1378   // Create the cluster list command and get the list of clusters
1379   std::vector<std::string> list_command;
1380   list_command.push_back("list");
1381   std::vector<std::string> clusters = explode(execute_ccm_command(list_command));
1382 
1383   // Determine the active cluster and correct the cluster array
1384   int index = 0;
1385   for (std::vector<std::string>::const_iterator iterator = clusters.begin();
1386        iterator < clusters.end(); ++iterator) {
1387     std::string cluster = *iterator;
1388     if (cluster.compare(0, 1, "*") == 0) {
1389       cluster.erase(std::remove(cluster.begin(), cluster.end(), '*'), cluster.end());
1390       active_cluster = cluster;
1391       clusters[index] = cluster;
1392     }
1393     ++index;
1394   }
1395   return clusters;
1396 }
1397 
generate_cluster_name(CassVersion cassandra_version,std::vector<unsigned short> data_center_nodes,bool with_vnodes,bool is_password_authenticator,bool is_ssl,bool is_client_authentication)1398 std::string CCM::Bridge::generate_cluster_name(CassVersion cassandra_version,
1399                                                std::vector<unsigned short> data_center_nodes,
1400                                                bool with_vnodes, bool is_password_authenticator,
1401                                                bool is_ssl, bool is_client_authentication) {
1402   std::stringstream cluster_name;
1403   std::string server_version =
1404       !is_cassandra() ? dse_version_.to_string(false) : cassandra_version.to_string(false);
1405   std::replace(server_version.begin(), server_version.end(), '.', '-');
1406   cluster_name << cluster_prefix_ << "_" << server_version << "_"
1407                << generate_cluster_nodes(data_center_nodes, '-');
1408   if (with_vnodes) {
1409     cluster_name << "-vnodes";
1410   }
1411   if (is_password_authenticator) {
1412     cluster_name << "-password_authenticator";
1413   }
1414   if (is_ssl) {
1415     cluster_name << "-ssl";
1416     if (is_client_authentication) {
1417       cluster_name << "-client_authentication";
1418     }
1419   }
1420   return cluster_name.str();
1421 }
1422 
generate_cluster_nodes(std::vector<unsigned short> data_center_nodes,char separator)1423 std::string CCM::Bridge::generate_cluster_nodes(std::vector<unsigned short> data_center_nodes,
1424                                                 char separator /* = ':'*/) {
1425   std::stringstream cluster_nodes;
1426   for (std::vector<unsigned short>::iterator iterator = data_center_nodes.begin();
1427        iterator != data_center_nodes.end(); ++iterator) {
1428     cluster_nodes << *iterator;
1429     if ((iterator + 1) != data_center_nodes.end()) {
1430       cluster_nodes << separator;
1431     }
1432   }
1433   return cluster_nodes.str();
1434 }
1435 
1436 std::vector<std::string>
generate_create_updateconf_command(CassVersion cassandra_version)1437 CCM::Bridge::generate_create_updateconf_command(CassVersion cassandra_version) {
1438   // TODO: Add SSL setup and client authentication
1439   // Create the update configuration command (common updates)
1440   std::vector<std::string> updateconf_command;
1441   updateconf_command.push_back("updateconf");
1442   // Disable optimizations (limits) when using DSE/DDAC
1443   if (is_cassandra()) {
1444     updateconf_command.push_back("--rt=10000");
1445     updateconf_command.push_back("read_request_timeout_in_ms:10000");
1446     updateconf_command.push_back("write_request_timeout_in_ms:10000");
1447     updateconf_command.push_back("request_timeout_in_ms:10000");
1448     updateconf_command.push_back("phi_convict_threshold:16");
1449     updateconf_command.push_back("hinted_handoff_enabled:false");
1450     updateconf_command.push_back("dynamic_snitch_update_interval_in_ms:1000");
1451     updateconf_command.push_back("native_transport_max_threads:1");
1452     updateconf_command.push_back("concurrent_reads:2");
1453     updateconf_command.push_back("concurrent_writes:2");
1454     updateconf_command.push_back("concurrent_compactors:1");
1455     updateconf_command.push_back("compaction_throughput_mb_per_sec:0");
1456     updateconf_command.push_back("key_cache_size_in_mb:0");
1457     updateconf_command.push_back("key_cache_save_period:0");
1458     updateconf_command.push_back("memtable_flush_writers:1");
1459     updateconf_command.push_back("max_hints_delivery_threads:1");
1460 
1461     // Create Cassandra version specific updates (C* v1.2.x)
1462     if (cassandra_version < "2.0.0") {
1463       updateconf_command.push_back("reduce_cache_sizes_at:0");
1464       updateconf_command.push_back("reduce_cache_capacity_to:0");
1465       updateconf_command.push_back("flush_largest_memtables_at:0");
1466       updateconf_command.push_back("index_interval:512");
1467     } else {
1468       updateconf_command.push_back("cas_contention_timeout_in_ms:10000");
1469       updateconf_command.push_back("file_cache_size_in_mb:0");
1470     }
1471 
1472     // Create Cassandra version specific updates (C* < v2.1)
1473     if (cassandra_version < "2.1.0") {
1474       updateconf_command.push_back("in_memory_compaction_limit_in_mb:1");
1475     }
1476 
1477     // Create Cassandra version specific updates (C* < v4.0)
1478     if (cassandra_version < "4.0.0") {
1479       updateconf_command.push_back("rpc_min_threads:1");
1480       updateconf_command.push_back("rpc_max_threads:1");
1481     }
1482   }
1483 
1484   // Create Cassandra version specific updated (C* 2.2+)
1485   if (cassandra_version >= "2.2.0") {
1486     updateconf_command.push_back("enable_user_defined_functions:true");
1487   }
1488 
1489   // Create Cassandra version specific updated (C* 3.0+)
1490   if (cassandra_version >= "3.0.0") {
1491     updateconf_command.push_back("enable_scripted_user_defined_functions:true");
1492   }
1493 
1494   return updateconf_command;
1495 }
1496 
generate_dse_workloads(std::vector<DseWorkload> workloads)1497 std::string CCM::Bridge::generate_dse_workloads(std::vector<DseWorkload> workloads) {
1498   std::string dse_workloads;
1499   for (std::vector<DseWorkload>::iterator iterator = workloads.begin(); iterator != workloads.end();
1500        ++iterator) {
1501     dse_workloads += dse_workloads_[*iterator];
1502     if ((iterator + 1) != workloads.end()) {
1503       dse_workloads += ",";
1504     }
1505   }
1506   return dse_workloads;
1507 }
1508 
generate_node_name(unsigned int node)1509 std::string CCM::Bridge::generate_node_name(unsigned int node) {
1510   std::stringstream node_name;
1511   node_name << "node" << node;
1512   return node_name.str();
1513 }
1514 
get_next_available_node()1515 unsigned int CCM::Bridge::get_next_available_node() {
1516   ClusterStatus status = cluster_status();
1517   unsigned int next_available_node = status.node_count + 1;
1518   if (next_available_node > CLUSTER_NODE_LIMIT) {
1519     std::stringstream message;
1520     message << "Failed to get next available node; cluster limit of \"" << CLUSTER_NODE_LIMIT
1521             << "\" nodes reached";
1522     throw BridgeException(message.str());
1523   }
1524 
1525   return next_available_node;
1526 }
1527 
is_node_availabe(unsigned int node)1528 bool CCM::Bridge::is_node_availabe(unsigned int node) {
1529   std::stringstream ip_address;
1530   ip_address << get_ip_prefix() << node;
1531   return is_node_availabe(ip_address.str());
1532 }
1533 
is_node_availabe(const std::string & ip_address)1534 bool CCM::Bridge::is_node_availabe(const std::string& ip_address) {
1535   Socket socket;
1536   try {
1537     socket.establish_connection(ip_address, CASSANDRA_BINARY_PORT);
1538     return true;
1539   } catch (...) {
1540     ; // No-op
1541   }
1542 
1543   return false;
1544 }
1545 
to_lower(const std::string & input)1546 std::string CCM::Bridge::to_lower(const std::string& input) {
1547   std::string lowercase = input;
1548   std::transform(lowercase.begin(), lowercase.end(), lowercase.begin(), ::tolower);
1549   return lowercase;
1550 }
1551 
trim(const std::string & input)1552 std::string CCM::Bridge::trim(const std::string& input) {
1553   std::string result;
1554   if (!input.empty()) {
1555     // Trim right
1556     result = input.substr(0, input.find_last_not_of(TRIM_DELIMETERS) + 1);
1557     if (!result.empty()) {
1558       // Trim left
1559       result = result.substr(result.find_first_not_of(TRIM_DELIMETERS));
1560     }
1561   }
1562   return result;
1563 }
1564 
implode(const std::vector<std::string> & elements,const char delimiter)1565 std::string CCM::Bridge::implode(const std::vector<std::string>& elements,
1566                                  const char delimiter /*= ' '*/) {
1567   // Iterate through each element in the vector and concatenate the string
1568   std::string result;
1569   for (std::vector<std::string>::const_iterator iterator = elements.begin();
1570        iterator < elements.end(); ++iterator) {
1571     result += *iterator;
1572     if ((iterator + 1) != elements.end()) {
1573       result += delimiter;
1574     }
1575   }
1576   return result;
1577 }
1578 
explode(const std::string & input,const char delimiter)1579 std::vector<std::string> CCM::Bridge::explode(const std::string& input,
1580                                               const char delimiter /*= ' '*/) {
1581   // Iterate over the input line and parse the tokens
1582   std::vector<std::string> result;
1583   std::istringstream parser(input);
1584   for (std::string token; std::getline(parser, token, delimiter);) {
1585     if (!token.empty()) {
1586       result.push_back(trim(token));
1587     }
1588   }
1589   return result;
1590 }
1591 
msleep(unsigned int milliseconds)1592 void CCM::Bridge::msleep(unsigned int milliseconds) {
1593 #ifdef _WIN32
1594   Sleep(milliseconds);
1595 #else
1596   // Convert the milliseconds into a proper timespec structure
1597   time_t seconds = static_cast<int>(milliseconds / 1000);
1598   long int nanoseconds = static_cast<long int>((milliseconds - (seconds * 1000)) * 1000000);
1599 
1600   // Assign the requested time and perform sleep
1601   struct timespec requested;
1602   requested.tv_sec = seconds;
1603   requested.tv_nsec = nanoseconds;
1604   while (nanosleep(&requested, &requested) == -1) {
1605     continue;
1606   }
1607 #endif
1608 }
1609