1 /*
2 Copyright (c) DataStax, Inc.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #ifdef _WIN32
18 // Enable memory leak detection
19 #define _CRTDBG_MAP_ALLOC
20 #include <crtdbg.h>
21 #include <stdlib.h>
22
23 // Enable memory leak detection for new operator
24 #ifdef _DEBUG
25 #ifndef DBG_NEW
26 #define DBG_NEW new (_NORMAL_BLOCK, __FILE__, __LINE__)
27 #define new DBG_NEW
28 #endif
29 #endif
30 #else
31 #include <unistd.h>
32 #endif
33 #include "bridge.hpp"
34
35 #ifdef CASS_USE_LIBSSH2
36 #include <libssh2.h>
37 #define LIBSSH2_INIT_ALL 0
38 #ifndef LIBSSH2_NO_OPENSSL
39 #ifdef OPENSSL_CLEANUP
40 #define PID_UNKNOWN 0
41 #include <openssl/conf.h>
42 #include <openssl/engine.h>
43 #include <openssl/rand.h>
44 #include <openssl/ssl.h>
45 #endif
46 #endif
47 #endif
48
49 #include <ctype.h>
50 #include <errno.h>
51 #include <fcntl.h>
52 #include <stdio.h>
53 #include <string.h>
54
55 #include <algorithm>
56 #include <fstream>
57 #include <iostream>
58 #include <sstream>
59
// Simple console logging helpers; every message goes to std::cerr prefixed with "ccm> ".
//
// LOG_MESSAGE(message, is_output): streams `message` (which may itself be a chain of `<<`
//   operands) followed by std::endl when `is_output` is true. The condition is written as
//   `if (!(cond)) {} else {...}` so that an `else` placed after the macro at the call site
//   can never pair with the macro's own `if`.
// LOG(message): logs only when the `is_verbose_` variable visible at the call site is true.
// LOG_ERROR(message): always logs.
#define LOG_MESSAGE(message, is_output)           \
  if (!(is_output)) {                             \
  } else {                                        \
    std::cerr << "ccm> " << message << std::endl; \
  }
#define LOG(message) LOG_MESSAGE(message, is_verbose_)
#define LOG_ERROR(message) LOG_MESSAGE(message, true)
67
// Create FALSE/TRUE defines for easier code readability (only when not already defined)
#ifndef FALSE
#define FALSE 0
#endif
#ifndef TRUE
#define TRUE 1
#endif

// Whitespace characters: space, form feed, newline, carriage return, tab and vertical tab.
// NOTE(review): presumably the character set stripped by trim() - confirm at its definition
#define TRIM_DELIMETERS " \f\n\r\t\v"
// Port numbers, named after the Cassandra binary protocol, storage and thrift services
#define CASSANDRA_BINARY_PORT 9042
#define CASSANDRA_STORAGE_PORT 7000
#define CASSANDRA_THRIFT_PORT 9160
// Nap length between retries; 100 retries adding up to ~10 seconds implies milliseconds
// NOTE(review): unit inferred from the CCM_RETRIES remark below - confirm at the call site
#define CCM_NAP 100
#define CCM_RETRIES 100 // Up to 10 seconds for retry based on CCM_NAP

// CCM node status strings; cluster_status() compares these against the lower-cased tokens
// of each node line in the CCM status output
#define CCM_NODE_STATUS_DECOMMISSIONED "decommissioned"
#define CCM_NODE_STATUS_DOWN "down"
#define CCM_NODE_STATUS_UNINITIALIZED "(not initialized)"
#define CCM_NODE_STATUS_UP "up"
88
89 // Workload value initialization
90 const std::string DSE_WORKLOADS[] = { "cassandra", "cfs", "dsefs", "graph",
91 "hadoop", "solr", "spark" };
92 const std::vector<std::string>
93 CCM::Bridge::dse_workloads_(DSE_WORKLOADS,
94 DSE_WORKLOADS + sizeof(DSE_WORKLOADS) / sizeof(DSE_WORKLOADS[0]));
95 const CCM::DseWorkload DEFAULT_WORKLOAD[] = { CCM::DSE_WORKLOAD_CASSANDRA };
96 const std::vector<CCM::DseWorkload> CCM::Bridge::DEFAULT_DSE_WORKLOAD(
97 DEFAULT_WORKLOAD, DEFAULT_WORKLOAD + sizeof(DEFAULT_WORKLOAD) / sizeof(DEFAULT_WORKLOAD[0]));
98
99 using namespace CCM;
100
Bridge(CassVersion server_version,bool use_git,const std::string & branch_tag,bool use_install_dir,const std::string & install_dir,ServerType server_type,std::vector<DseWorkload> dse_workload,const std::string & cluster_prefix,DseCredentialsType dse_credentials_type,const std::string & dse_username,const std::string & dse_password,DeploymentType deployment_type,AuthenticationType authentication_type,const std::string & host,short port,const std::string & username,const std::string & password,const std::string & public_key,const std::string & private_key,bool is_verbose)101 CCM::Bridge::Bridge(
102 CassVersion server_version /*= DEFAULT_CASSANDRA_VERSION*/, bool use_git /*= DEFAULT_USE_GIT*/,
103 const std::string& branch_tag /* ""*/, bool use_install_dir /*=DEFAULT_USE_INSTALL_DIR*/,
104 const std::string& install_dir /*=""*/, ServerType server_type /*= DEFAULT_SERVER_TYPE*/,
105 std::vector<DseWorkload> dse_workload /*= DEFAULT_DSE_WORKLOAD*/,
106 const std::string& cluster_prefix /*= DEFAULT_CLUSTER_PREFIX*/,
107 DseCredentialsType dse_credentials_type /*= DEFAULT_DSE_CREDENTIALS*/,
108 const std::string& dse_username /*= ""*/, const std::string& dse_password /*= ""*/,
109 DeploymentType deployment_type /*= DEFAULT_DEPLOYMENT*/,
110 AuthenticationType authentication_type /*= DEFAULT_AUTHENTICATION*/,
111 const std::string& host /*= DEFAULT_HOST*/, short port /*= DEFAULT_REMOTE_DEPLOYMENT_PORT*/,
112 const std::string& username /*= DEFAULT_USERNAME*/,
113 const std::string& password /*= DEFAULT_PASSWORD*/, const std::string& public_key /*= ""*/,
114 const std::string& private_key /*= ""*/, bool is_verbose /*= DEFAULT_IS_VERBOSE*/)
115 : cassandra_version_(server_version)
116 , dse_version_(DEFAULT_DSE_VERSION)
117 , use_git_(use_git)
118 , branch_tag_(branch_tag)
119 , use_install_dir_(use_install_dir)
120 , install_dir_(install_dir)
121 , server_type_(server_type)
122 , dse_workload_(dse_workload)
123 , cluster_prefix_(cluster_prefix)
124 , authentication_type_(authentication_type)
125 , dse_credentials_type_(dse_credentials_type)
126 , dse_username_(dse_username)
127 , dse_password_(dse_password)
128 #ifdef CASS_USE_LIBSSH2
129 , deployment_type_(deployment_type)
130 , host_(host)
131 , session_(NULL)
132 , channel_(NULL)
133 , socket_(NULL)
134 #else
135 // Force local deployment only
136 , deployment_type_(DeploymentType::LOCAL)
137 , host_("127.0.0.1")
138 #endif
139 , is_verbose_(is_verbose) {
140 #ifdef _WIN32
141 #ifdef _DEBUG
142 // Enable automatic execution of the memory leak detection upon exit
143 _CrtSetDbgFlag(_CRTDBG_ALLOC_MEM_DF | _CRTDBG_LEAK_CHECK_DF);
144 #endif
145 #endif
146 // Determine if DSE/DDAC is being used
147 if (!is_cassandra()) {
148 dse_version_ = DseVersion(server_version.to_string());
149 cassandra_version_ = dse_version_.get_cass_version();
150 }
151
152 // Determine if installation directory can be used
153 if (use_install_dir_ && install_dir_.empty()) {
154 throw BridgeException("Directory must not be blank");
155 }
156 #ifdef CASS_USE_LIBSSH2
157 // Determine if libssh2 needs to be initialized
158 if (deployment_type_ == DeploymentType::REMOTE) {
159 // Initialize the socket
160 try {
161 initialize_socket(host_, port);
162 } catch (SocketException& se) {
163 // Re-throw the exception as a BridgeException
164 finalize_libssh2();
165 throw BridgeException(se.what());
166 }
167
168 // Initialize the libssh2 connection
169 initialize_libssh2();
170
171 // Authenticate and establish the libssh2 connection
172 establish_libssh2_connection(authentication_type_, username, password, public_key, private_key);
173 }
174 #endif
175 }
176
~Bridge()177 CCM::Bridge::~Bridge() {
178 #ifdef CASS_USE_LIBSSH2
179 if (deployment_type_ == DeploymentType::REMOTE) {
180 close_libssh2_terminal();
181 finalize_libssh2();
182 }
183 #endif
184 }
185
clear_cluster_data()186 void CCM::Bridge::clear_cluster_data() {
187 // Create the cluster clear data command and execute
188 std::vector<std::string> clear_cluster_data_command;
189 clear_cluster_data_command.push_back("clear");
190 execute_ccm_command(clear_cluster_data_command);
191 }
192
cluster_contact_points(bool is_all)193 std::string CCM::Bridge::cluster_contact_points(bool is_all /*= true*/) {
194 // Determine if all the nodes are being returned or just the available nodes
195 if (!is_all) {
196 // Create the cluster liveset command and execute
197 std::vector<std::string> liveset_command;
198 liveset_command.push_back("liveset");
199 return execute_ccm_command(liveset_command);
200 } else {
201 ClusterStatus status = cluster_status();
202 std::stringstream contact_points;
203 std::string ip_prefix = get_ip_prefix();
204 for (unsigned i = 1; i <= status.node_count; ++i) {
205 if (i > 1) {
206 contact_points << ",";
207 }
208 contact_points << ip_prefix << i;
209 }
210 return contact_points.str();
211 }
212 }
213
cluster_ip_addresses(bool is_all)214 std::vector<std::string> CCM::Bridge::cluster_ip_addresses(bool is_all /*= true*/) {
215 // Get and sort the IPv4 addresses and return the array/vector
216 std::vector<std::string> ip_addresses = explode(cluster_contact_points(is_all), ',');
217 std::sort(ip_addresses.begin(), ip_addresses.end());
218 return ip_addresses;
219 }
220
cluster_status()221 ClusterStatus CCM::Bridge::cluster_status() {
222 // Create the cluster status command and execute
223 std::vector<std::string> status_command;
224 status_command.push_back("status");
225 std::string ccm_output = execute_ccm_command(status_command);
226
227 // Iterate over the output line and parse the tokens
228 std::string ip_prefix = get_ip_prefix();
229 ClusterStatus status;
230 std::istringstream parser(ccm_output);
231 for (std::string token; std::getline(parser, token);) {
232 if (!token.empty()) {
233 // Handle node lines only
234 std::string current_line = to_lower(trim(token));
235 if (current_line.compare(0, 4, "node") == 0) {
236 // Remove node and colon
237 current_line.replace(0, 4, "");
238 size_t colon_index = current_line.find(":");
239 if (colon_index != std::string::npos) {
240 current_line.replace(colon_index, 1, "");
241 }
242
243 // Split into key and value (node and status) and update cluster status
244 std::vector<std::string> tokens = explode(current_line);
245 if (tokens.size() >= 2) {
246 std::string node_ip_address = ip_prefix + tokens[0];
247 std::string node_status(tokens[1]);
248 ++status.node_count;
249
250 // Determine the status being updated
251 if (node_status.compare(CCM_NODE_STATUS_DECOMMISSIONED) == 0 ||
252 node_status.compare("decommisionned") ==
253 0) { // Handle misspelling of decommissioned for older CCM versions
254 status.nodes_decommissioned.push_back(node_ip_address);
255 } else if (node_status.compare(CCM_NODE_STATUS_DOWN) == 0) {
256 // Determine if the node is actually uninitialized
257 if (tokens.size() == 4 &&
258 (tokens[2] + " " + tokens[3]).compare(CCM_NODE_STATUS_UNINITIALIZED) == 0) {
259 status.nodes_uninitialized.push_back(node_ip_address);
260 } else {
261 status.nodes_down.push_back(node_ip_address);
262 }
263 } else if (node_status.compare(CCM_NODE_STATUS_UP) == 0) {
264 status.nodes_up.push_back(node_ip_address);
265 } else {
266 LOG_ERROR("Node status \"" << node_status << "\" is not valid");
267 }
268 } else {
269 LOG_ERROR("To many tokens detected in \"" << current_line
270 << "\" to determine node status");
271 }
272 }
273 }
274 }
275 return status;
276 }
277
create_cluster(std::vector<unsigned short> data_center_nodes,bool with_vnodes,bool is_password_authenticator,bool is_ssl,bool is_client_authentication)278 bool CCM::Bridge::create_cluster(std::vector<unsigned short> data_center_nodes,
279 bool with_vnodes /*= false*/,
280 bool is_password_authenticator /*= false*/,
281 bool is_ssl /*= false*/,
282 bool is_client_authentication /*= false*/) {
283 // Generate the cluster name and determine if it needs to be created
284 std::string active_cluster_name = get_active_cluster();
285 std::string cluster_name =
286 generate_cluster_name(cassandra_version_, data_center_nodes, with_vnodes,
287 is_password_authenticator, is_ssl, is_client_authentication);
288 for (std::vector<DseWorkload>::iterator iterator = dse_workload_.begin();
289 iterator != dse_workload_.end(); ++iterator) {
290 if (is_dse() && *iterator != DSE_WORKLOAD_CASSANDRA) {
291 cluster_name.append("-").append(dse_workloads_[*iterator]);
292 }
293 }
294 if (!switch_cluster(cluster_name)) {
295 // Ensure any active cluster is stopped
296 if (!get_active_cluster().empty()) {
297 stop_cluster();
298 }
299
300 // Create the cluster create command and execute
301 std::vector<std::string> create_command;
302 create_command.push_back("create");
303 if (use_install_dir_ && !install_dir_.empty()) {
304 create_command.push_back("--install-dir=" + install_dir_);
305 } else {
306 create_command.push_back("-v");
307 if (is_cassandra()) {
308 if (use_git_) {
309 if (branch_tag_.empty()) {
310 create_command.push_back("git:cassandra-" + cassandra_version_.to_string());
311 } else {
312 create_command.push_back("git:" + branch_tag_);
313 }
314 } else {
315 create_command.push_back(cassandra_version_.ccm_version());
316 }
317 } else {
318 if (use_git_) {
319 if (branch_tag_.empty()) {
320 create_command.push_back("git:" + dse_version_.to_string());
321 } else {
322 create_command.push_back("git:" + branch_tag_);
323 }
324 } else {
325 create_command.push_back(dse_version_.ccm_version());
326 }
327 if (dse_credentials_type_ == DseCredentialsType::USERNAME_PASSWORD) {
328 create_command.push_back("--dse-username=" + dse_username_);
329 create_command.push_back("--dse-password=" + dse_password_);
330 }
331 }
332 }
333 if (is_dse()) {
334 create_command.push_back("--dse");
335 } else if (is_ddac()) {
336 create_command.push_back("--ddac");
337 }
338 create_command.push_back("-b");
339
340 // Determine if password authenticator or SSL and client authentication
341 // should be enabled
342 if (is_password_authenticator) {
343 create_command.push_back("--pwd-auth");
344 }
345 if (is_ssl) {
346 // TODO: Use test::Utils::temp_directory() after boost tests are removed and bridge is merged
347 // into testing framework
348 #ifdef _WIN32
349 char* temp = getenv("TEMP");
350 std::string ssl_command = "--ssl=";
351 ssl_command.append(temp);
352 ssl_command.append("\\");
353 ssl_command.append("ssl");
354 #else
355 std::string ssl_command = "--ssl=/tmp/ssl";
356 #endif
357
358 create_command.push_back(ssl_command);
359 if (is_client_authentication) {
360 create_command.push_back("--require_client_auth");
361 }
362 }
363
364 // Add the name of the cluster to create
365 create_command.push_back(cluster_name);
366
367 // Execute the CCM create command
368 execute_ccm_command(create_command);
369
370 // Generate the cluster update configuration command and execute
371 execute_ccm_command(generate_create_updateconf_command(cassandra_version_));
372 if (is_dse() && dse_version_ >= "6.7.0") {
373 update_cluster_configuration("user_defined_function_fail_micros", "5000000");
374 }
375
376 // Create the cluster populate command and execute
377 std::string cluster_nodes = generate_cluster_nodes(data_center_nodes);
378 std::string cluster_ip_prefix = get_ip_prefix();
379 std::vector<std::string> populate_command;
380 populate_command.push_back("populate");
381 populate_command.push_back("-n");
382 populate_command.push_back(cluster_nodes);
383 populate_command.push_back("-i");
384 populate_command.push_back(cluster_ip_prefix);
385 if (with_vnodes) {
386 populate_command.push_back("--vnodes");
387 }
388 execute_ccm_command(populate_command);
389
390 // Update the cluster configuration (set num_tokens)
391 if (with_vnodes) {
392 // Maximum number of tokens is 1536
393 update_cluster_configuration("num_tokens", "1536");
394 }
395
396 // Set the DSE workload (if applicable)
397 if (is_dse() && !(dse_workload_.size() == 1 && dse_workload_[0] == DSE_WORKLOAD_CASSANDRA)) {
398 set_dse_workloads(dse_workload_);
399 }
400 }
401
402 // Indicate if the cluster was created or switched
403 return !(active_cluster_name.compare(cluster_name) == 0);
404 }
405
is_cluster_down()406 bool CCM::Bridge::is_cluster_down() {
407 // Iterate over each node and ensure a connection cannot be made
408 ClusterStatus status = cluster_status();
409 for (unsigned int i = 1; i <= status.node_count; ++i) {
410 // Ensure the node is down
411 if (!is_node_down(i)) {
412 return false;
413 }
414 }
415
416 // Cluster is down
417 return true;
418 }
419
is_cluster_up()420 bool CCM::Bridge::is_cluster_up() {
421 // Iterate over each node and ensure a connection can be made
422 ClusterStatus status = cluster_status();
423 for (unsigned int i = 1; i <= status.node_count; ++i) {
424 // Ensure the node is ready/up
425 if (!is_node_up(i)) {
426 return false;
427 }
428 }
429
430 // Cluster is ready
431 return true;
432 }
433
hang_up_cluster()434 bool CCM::Bridge::hang_up_cluster() {
435 // Create the cluster stop command and execute
436 std::vector<std::string> stop_command;
437 stop_command.push_back("stop");
438 stop_command.push_back("--hang-up");
439 execute_ccm_command(stop_command);
440
441 // Ensure the cluster is down
442 return is_cluster_down();
443 }
444
kill_cluster()445 bool CCM::Bridge::kill_cluster() { return stop_cluster(true); }
446
remove_cluster()447 void CCM::Bridge::remove_cluster() { remove_cluster(get_active_cluster()); }
448
remove_cluster(const std::string & cluster_name)449 void CCM::Bridge::remove_cluster(const std::string& cluster_name) {
450 // Create the cluster remove command and execute
451 std::vector<std::string> remove_command;
452 remove_command.push_back("remove");
453 remove_command.push_back(cluster_name);
454 execute_ccm_command(remove_command);
455 }
456
remove_all_clusters(bool is_all)457 void CCM::Bridge::remove_all_clusters(bool is_all /*= false*/) {
458 // Iterate through all the available clusters
459 std::vector<std::string> clusters = get_available_clusters();
460 for (std::vector<std::string>::const_iterator iterator = clusters.begin();
461 iterator != clusters.end(); ++iterator) {
462 // Determine if the cluster should be removed
463 bool is_valid_cluster = is_all;
464 if (!is_valid_cluster && (*iterator).compare(0, cluster_prefix_.size(), cluster_prefix_) != 0) {
465 continue;
466 }
467 remove_cluster(*iterator);
468 }
469 }
470
start_cluster(std::vector<std::string> jvm_arguments)471 bool CCM::Bridge::start_cluster(
472 std::vector<std::string> jvm_arguments /*= DEFAULT_JVM_ARGUMENTS*/) {
473 // Create the cluster start command and execute
474 std::vector<std::string> start_command;
475 start_command.push_back("start");
476 start_command.push_back("--wait-other-notice");
477 start_command.push_back("--wait-for-binary-proto");
478 #ifdef _WIN32
479 if (deployment_type_ == DeploymentType::LOCAL) {
480 if (cassandra_version_ >= "2.2.4") {
481 start_command.push_back("--quiet-windows");
482 }
483 }
484 #endif
485 for (std::vector<std::string>::const_iterator iterator = jvm_arguments.begin();
486 iterator != jvm_arguments.end(); ++iterator) {
487 std::string jvm_argument = trim(*iterator);
488 if (!jvm_argument.empty()) {
489 start_command.push_back("--jvm_arg=" + *iterator);
490 }
491 }
492 execute_ccm_command(start_command);
493
494 // Ensure the cluster is up
495 return is_cluster_up();
496 }
497
start_cluster(std::string jvm_argument)498 bool CCM::Bridge::start_cluster(std::string jvm_argument /*= ""*/) {
499 std::vector<std::string> jvm_arguments;
500 if (!jvm_argument.empty()) {
501 jvm_arguments.push_back(jvm_argument);
502 }
503 return start_cluster(jvm_arguments);
504 }
505
stop_cluster(bool is_kill)506 bool CCM::Bridge::stop_cluster(bool is_kill /*= false*/) {
507 // Create the cluster stop command and execute
508 std::vector<std::string> stop_command;
509 stop_command.push_back("stop");
510 if (is_kill) {
511 stop_command.push_back("--not-gently");
512 }
513 execute_ccm_command(stop_command);
514
515 // Ensure the cluster is down
516 return is_cluster_down();
517 }
518
switch_cluster(const std::string & cluster_name)519 bool CCM::Bridge::switch_cluster(const std::string& cluster_name) {
520 // Get the active cluster and the available clusters
521 std::string active_cluster;
522 std::vector<std::string> clusters = get_available_clusters(active_cluster);
523
524 // Determine if the switch should be performed
525 if (active_cluster.compare(trim(cluster_name)) != 0) {
526 // Ensure the cluster is in the list
527 if (std::find(clusters.begin(), clusters.end(), cluster_name) != clusters.end()) {
528 if (!active_cluster.empty()) {
529 kill_cluster();
530 }
531
532 // Create the cluster switch command and clear the data
533 std::vector<std::string> switch_command;
534 switch_command.push_back("switch");
535 switch_command.push_back(cluster_name);
536 execute_ccm_command(switch_command);
537 clear_cluster_data();
538 return true;
539 }
540 } else {
541 // Cluster requested is already active
542 return true;
543 }
544
545 // Unable to switch the cluster
546 return false;
547 }
548
update_cluster_configuration(std::vector<std::string> key_value_pairs,bool is_dse,bool is_yaml)549 void CCM::Bridge::update_cluster_configuration(std::vector<std::string> key_value_pairs,
550 bool is_dse /*= false*/, bool is_yaml /*= false*/) {
551 // Create the update configuration command
552 if (is_yaml) {
553 for (std::vector<std::string>::const_iterator iterator = key_value_pairs.begin();
554 iterator != key_value_pairs.end(); ++iterator) {
555 update_cluster_configuration_yaml(*iterator, is_dse);
556 }
557 } else {
558 key_value_pairs.insert(key_value_pairs.begin(), (is_dse ? "updatedseconf" : "updateconf"));
559 execute_ccm_command(key_value_pairs);
560 }
561 }
562
update_cluster_configuration(const std::string & key,const std::string & value,bool is_dse)563 void CCM::Bridge::update_cluster_configuration(const std::string& key, const std::string& value,
564 bool is_dse /*= false*/) {
565 // Create the configuration to be updated
566 std::stringstream configuration;
567 configuration << key << ":" << value;
568
569 // Create the update configuration command
570 std::vector<std::string> updateconf_command;
571 updateconf_command.push_back(is_dse ? "updatedseconf" : "updateconf");
572 updateconf_command.push_back(configuration.str());
573 execute_ccm_command(updateconf_command);
574 }
575
update_cluster_configuration_yaml(const std::string & yaml,bool is_dse)576 void CCM::Bridge::update_cluster_configuration_yaml(const std::string& yaml,
577 bool is_dse /*= false*/) {
578 // Create the update configuration command for a literal YAML
579 std::vector<std::string> updateconf_command;
580 updateconf_command.push_back(is_dse ? "updatedseconf" : "updateconf");
581 updateconf_command.push_back("-y");
582 updateconf_command.push_back(yaml);
583 execute_ccm_command(updateconf_command);
584 }
585
update_node_configuration(unsigned int node,std::vector<std::string> key_value_pairs)586 void CCM::Bridge::update_node_configuration(unsigned int node,
587 std::vector<std::string> key_value_pairs) {
588 // Create the update configuration command
589 key_value_pairs.insert(key_value_pairs.begin(), generate_node_name(node));
590 key_value_pairs.insert(key_value_pairs.begin(), "updateconf");
591 execute_ccm_command(key_value_pairs);
592 }
593
update_node_configuration(unsigned int node,const std::string & key,const std::string & value)594 void CCM::Bridge::update_node_configuration(unsigned int node, const std::string& key,
595 const std::string& value) {
596 // Create the configuration to be updated
597 std::stringstream configuration;
598 configuration << key << ":" << value;
599
600 // Create the update configuration command
601 std::vector<std::string> updateconf_command;
602 updateconf_command.push_back(generate_node_name(node));
603 updateconf_command.push_back("updateconf");
604 updateconf_command.push_back(configuration.str());
605 execute_ccm_command(updateconf_command);
606 }
607
add_node(const std::string & data_center)608 unsigned int CCM::Bridge::add_node(const std::string& data_center /*= ""*/) {
609 // Generate the arguments for the add node command
610 unsigned int node = get_next_available_node();
611 std::stringstream node_ip_address;
612 node_ip_address << get_ip_prefix() << node;
613 std::stringstream jmx_port;
614 std::stringstream jmx_remote_debug_port;
615 jmx_port << (7000 + (100 * node));
616 jmx_remote_debug_port << (2000 + (100 * node));
617
618 // Create the add node command and execute
619 std::vector<std::string> add_node_command;
620 add_node_command.push_back("add");
621 add_node_command.push_back("-b");
622 add_node_command.push_back("-i");
623 add_node_command.push_back(node_ip_address.str());
624 add_node_command.push_back("-j");
625 add_node_command.push_back(jmx_port.str());
626 add_node_command.push_back("-r");
627 add_node_command.push_back(jmx_remote_debug_port.str());
628 if (!data_center.empty()) {
629 add_node_command.push_back("-d");
630 add_node_command.push_back(data_center);
631 }
632 if (is_dse()) {
633 add_node_command.push_back("--dse");
634 }
635 add_node_command.push_back(generate_node_name(node));
636 execute_ccm_command(add_node_command);
637
638 // Return the node created
639 return node;
640 }
641
bootstrap_node(const std::vector<std::string> & jvm_arguments,const std::string & data_center)642 unsigned int CCM::Bridge::bootstrap_node(const std::vector<std::string>& jvm_arguments,
643 const std::string& data_center /*= ""*/) {
644 unsigned int node = add_node(data_center);
645 start_node(node, jvm_arguments);
646 return node;
647 }
648
bootstrap_node(const std::string & jvm_argument,const std::string & data_center)649 unsigned int CCM::Bridge::bootstrap_node(const std::string& jvm_argument /*= ""*/,
650 const std::string& data_center /*= ""*/) {
651 unsigned int node = add_node(data_center);
652 start_node(node, jvm_argument);
653 return node;
654 }
655
decommission_node(unsigned int node,bool is_force)656 bool CCM::Bridge::decommission_node(unsigned int node, bool is_force /*= false*/) {
657 // Create the node decommission command and execute
658 std::vector<std::string> decommission_node_command;
659 decommission_node_command.push_back(generate_node_name(node));
660 decommission_node_command.push_back("decommission");
661 if (is_force && ((is_cassandra() && cassandra_version_ >= "3.12") || // Cassandra v3.12+
662 (!is_cassandra() && dse_version_ >= "5.1.0"))) { // DataStax Enterprise v5.1.0+
663 decommission_node_command.push_back("--force");
664 }
665 execute_ccm_command(decommission_node_command);
666
667 // Ensure the node has been decommissioned
668 return is_node_decommissioned(node);
669 }
670
disable_node_binary_protocol(unsigned int node)671 void CCM::Bridge::disable_node_binary_protocol(unsigned int node) {
672 // Create the disable node binary protocol command and execute
673 std::vector<std::string> disable_node_binary_protocol_command;
674 disable_node_binary_protocol_command.push_back(generate_node_name(node));
675 disable_node_binary_protocol_command.push_back("nodetool");
676 disable_node_binary_protocol_command.push_back("disablebinary");
677 execute_ccm_command(disable_node_binary_protocol_command);
678 }
679
disable_node_gossip(unsigned int node)680 void CCM::Bridge::disable_node_gossip(unsigned int node) {
681 // Create the disable node gossip command and execute
682 std::vector<std::string> disable_node_gossip_command;
683 disable_node_gossip_command.push_back(generate_node_name(node));
684 disable_node_gossip_command.push_back("nodetool");
685 disable_node_gossip_command.push_back("disablegossip");
686 execute_ccm_command(disable_node_gossip_command);
687 }
688
disable_node_trace(unsigned int node)689 void CCM::Bridge::disable_node_trace(unsigned int node) {
690 // Create the disable node trace command and execute
691 std::vector<std::string> disable_node_trace_command;
692 disable_node_trace_command.push_back(generate_node_name(node));
693 disable_node_trace_command.push_back("nodetool");
694 disable_node_trace_command.push_back("settraceprobability");
695 disable_node_trace_command.push_back("0");
696 execute_ccm_command(disable_node_trace_command);
697 }
698
enable_node_binary_protocol(unsigned int node)699 void CCM::Bridge::enable_node_binary_protocol(unsigned int node) {
700 // Create the enable node binary protocol command and execute
701 std::vector<std::string> enable_node_binary_protocol_command;
702 enable_node_binary_protocol_command.push_back(generate_node_name(node));
703 enable_node_binary_protocol_command.push_back("nodetool");
704 enable_node_binary_protocol_command.push_back("enablebinary");
705 execute_ccm_command(enable_node_binary_protocol_command);
706 }
707
enable_node_gossip(unsigned int node)708 void CCM::Bridge::enable_node_gossip(unsigned int node) {
709 // Create the enable node gossip command and execute
710 std::vector<std::string> disable_node_gossip_command;
711 disable_node_gossip_command.push_back(generate_node_name(node));
712 disable_node_gossip_command.push_back("nodetool");
713 disable_node_gossip_command.push_back("enablegossip");
714 execute_ccm_command(disable_node_gossip_command);
715 }
716
enable_node_trace(unsigned int node)717 void CCM::Bridge::enable_node_trace(unsigned int node) {
718 // Create the enable node trace command and execute
719 std::vector<std::string> enable_node_trace_command;
720 enable_node_trace_command.push_back(generate_node_name(node));
721 enable_node_trace_command.push_back("nodetool");
722 enable_node_trace_command.push_back("settraceprobability");
723 enable_node_trace_command.push_back("1");
724 execute_ccm_command(enable_node_trace_command);
725 }
726
execute_cql_on_node(unsigned int node,const std::string & cql)727 void CCM::Bridge::execute_cql_on_node(unsigned int node, const std::string& cql) {
728 // Update the CQL statement for the command line
729 std::stringstream execute_statement;
730 execute_statement << "\"" << cql << ";\"";
731
732 // Create the CQLSH pass through command and execute
733 std::vector<std::string> cqlsh_node_command;
734 cqlsh_node_command.push_back(generate_node_name(node));
735 cqlsh_node_command.push_back("cqlsh");
736 cqlsh_node_command.push_back("-x");
737 cqlsh_node_command.push_back(execute_statement.str());
738 execute_ccm_command(cqlsh_node_command);
739 }
740
force_decommission_node(unsigned int node)741 bool CCM::Bridge::force_decommission_node(unsigned int node) {
742 return decommission_node(node, true);
743 }
744
hang_up_node(unsigned int node)745 bool CCM::Bridge::hang_up_node(unsigned int node) {
746 // Create the node stop command and execute
747 std::vector<std::string> stop_node_command;
748 stop_node_command.push_back(generate_node_name(node));
749 stop_node_command.push_back("stop");
750 stop_node_command.push_back("--hang-up");
751 execute_ccm_command(stop_node_command);
752
753 // Ensure the node is down
754 return is_node_down(node);
755 }
756
kill_node(unsigned int node)757 bool CCM::Bridge::kill_node(unsigned int node) { return stop_node(node, true); }
758
pause_node(unsigned int node)759 void CCM::Bridge::pause_node(unsigned int node) {
760 // Create the node pause command and execute
761 std::vector<std::string> pause_node_command;
762 pause_node_command.push_back(generate_node_name(node));
763 pause_node_command.push_back("pause");
764 execute_ccm_command(pause_node_command);
765 }
766
resume_node(unsigned int node)767 void CCM::Bridge::resume_node(unsigned int node) {
768 // Create the node resume command and execute
769 std::vector<std::string> resume_node_command;
770 resume_node_command.push_back(generate_node_name(node));
771 resume_node_command.push_back("resume");
772 execute_ccm_command(resume_node_command);
773 }
774
start_node(unsigned int node,const std::vector<std::string> & jvm_arguments)775 bool CCM::Bridge::start_node(
776 unsigned int node, const std::vector<std::string>& jvm_arguments /*= DEFAULT_JVM_ARGUMENTS*/) {
777 // Create the node start command and execute
778 std::vector<std::string> start_node_command;
779 start_node_command.push_back(generate_node_name(node));
780 start_node_command.push_back("start");
781 start_node_command.push_back("--wait-other-notice");
782 start_node_command.push_back("--wait-for-binary-proto");
783 #ifdef _WIN32
784 if (deployment_type_ == DeploymentType::LOCAL) {
785 if (cassandra_version_ >= "2.2.4") {
786 start_node_command.push_back("--quiet-windows");
787 }
788 }
789 #endif
790 for (std::vector<std::string>::const_iterator iterator = jvm_arguments.begin();
791 iterator != jvm_arguments.end(); ++iterator) {
792 std::string jvm_argument = trim(*iterator);
793 if (!jvm_argument.empty()) {
794 start_node_command.push_back("--jvm_arg=" + *iterator);
795 }
796 }
797 execute_ccm_command(start_node_command);
798
799 // Ensure the node is up
800 return is_node_up(node);
801 }
802
start_node(unsigned int node,const std::string & jvm_argument)803 bool CCM::Bridge::start_node(unsigned int node, const std::string& jvm_argument) {
804 // Create the JVM arguments array/vector
805 std::vector<std::string> jvm_arguments;
806 jvm_arguments.push_back(jvm_argument);
807 return start_node(node, jvm_arguments);
808 }
809
stop_node(unsigned int node,bool is_kill)810 bool CCM::Bridge::stop_node(unsigned int node, bool is_kill /*= false*/) {
811 // Create the node stop command and execute
812 std::vector<std::string> stop_node_command;
813 stop_node_command.push_back(generate_node_name(node));
814 stop_node_command.push_back("stop");
815 if (is_kill) {
816 stop_node_command.push_back("--not-gently");
817 }
818 execute_ccm_command(stop_node_command);
819
820 // Ensure the node is down
821 return is_node_down(node);
822 }
823
get_ip_prefix()824 std::string CCM::Bridge::get_ip_prefix() { return host_.substr(0, host_.size() - 1); }
825
get_cassandra_version()826 CassVersion CCM::Bridge::get_cassandra_version() {
827 // Get the version string from CCM
828 std::vector<std::string> active_cluster_version_command;
829 active_cluster_version_command.push_back(generate_node_name(1));
830 active_cluster_version_command.push_back("version");
831 std::string ccm_output = execute_ccm_command(active_cluster_version_command);
832
833 // Ensure the version release information exists and return the version
834 size_t version_index = ccm_output.find("ReleaseVersion:");
835 if (version_index != std::string::npos) {
836 ccm_output.replace(0, version_index + 15, "");
837 return CassVersion(trim(ccm_output));
838 }
839
840 // Unable to determine version information from active cluster
841 throw BridgeException("Unable to determine version information from active Cassandra cluster \"" +
842 get_active_cluster() + "\"");
843 }
844
get_dse_version()845 DseVersion CCM::Bridge::get_dse_version() {
846 // Get the version string from CCM
847 std::vector<std::string> active_cluster_version_command;
848 active_cluster_version_command.push_back(generate_node_name(1));
849 active_cluster_version_command.push_back("dse");
850 active_cluster_version_command.push_back("-v");
851 std::string ccm_output = execute_ccm_command(active_cluster_version_command);
852
853 // Ensure the version release information exists and return the version
854 ccm_output = trim(ccm_output);
855 if (!ccm_output.empty()) {
856 return DseVersion(ccm_output);
857 }
858
859 // Unable to determine version information from active cluster
860 throw BridgeException("Unable to determine version information from active DSE/DDAC cluster \"" +
861 get_active_cluster() + "\"");
862 }
863
set_dse_workload(unsigned int node,DseWorkload workload,bool is_kill)864 bool CCM::Bridge::set_dse_workload(unsigned int node, DseWorkload workload,
865 bool is_kill /*= false */) {
866 std::vector<DseWorkload> workloads;
867 workloads.push_back(workload);
868 return set_dse_workloads(1, workloads, is_kill);
869 }
870
set_dse_workloads(unsigned int node,std::vector<DseWorkload> workloads,bool is_kill)871 bool CCM::Bridge::set_dse_workloads(unsigned int node, std::vector<DseWorkload> workloads,
872 bool is_kill /*= false */) {
873 // Ensure the workloads can be processed
874 if (workloads.empty()) {
875 throw BridgeException("No workloads to assign");
876 }
877
878 // Update the member variable with the workloads and generate workloads
879 dse_workload_.clear();
880 dse_workload_ = workloads;
881 std::string dse_workloads = generate_dse_workloads(workloads);
882
883 // Determine if the node is currently active/up
884 bool was_node_active = false;
885 if (!is_node_down(node)) {
886 LOG("Stopping active node \"" << node << "\" and assigning workload(s) \"" << dse_workloads
887 << "\"");
888 stop_node(node, is_kill);
889 was_node_active = true;
890 }
891
892 // Create the node DSE workload command and execute
893 std::vector<std::string> dse_workload_command;
894 dse_workload_command.push_back(generate_node_name(node));
895 dse_workload_command.push_back("setworkload");
896 dse_workload_command.push_back(dse_workloads);
897 execute_ccm_command(dse_workload_command);
898
899 // Determine if the node should be restarted
900 if (was_node_active) {
901 LOG("Restarting node \"" << node << "\" to applying workload(s) \"" << dse_workloads << "\"");
902 start_node(node);
903 }
904
905 return was_node_active;
906 }
907
set_dse_workload(DseWorkload workload,bool is_kill)908 bool CCM::Bridge::set_dse_workload(DseWorkload workload, bool is_kill /*= false */) {
909 std::vector<DseWorkload> workloads;
910 workloads.push_back(workload);
911 return set_dse_workloads(workloads, is_kill);
912 }
913
set_dse_workloads(std::vector<DseWorkload> workloads,bool is_kill)914 bool CCM::Bridge::set_dse_workloads(std::vector<DseWorkload> workloads, bool is_kill /*= false */) {
915 // Ensure the workloads can be processed
916 if (workloads.empty()) {
917 throw BridgeException("No workloads to assign");
918 }
919
920 // Determine if the cluster is currently active/up
921 bool was_cluster_active = false;
922 std::string cluster = get_active_cluster();
923 if (!is_cluster_down()) {
924 LOG("Stopping active cluster \"" << cluster << "\" and assigning workload(s) \""
925 << generate_dse_workloads(workloads) << "\"");
926 stop_cluster(is_kill);
927 was_cluster_active = true;
928 }
929
930 // Iterate over each node and set the DSE workload
931 ClusterStatus status = cluster_status();
932 for (unsigned int i = 1; i <= status.node_count; ++i) {
933 set_dse_workloads(i, workloads, false);
934 }
935
936 // Determine if the cluster should be restarted
937 if (was_cluster_active) {
938 LOG("Restarting cluster \"" << cluster << "\" and applying workload(s) \""
939 << generate_dse_workloads(workloads) << "\"");
940 start_cluster();
941 }
942
943 return was_cluster_active;
944 }
945
is_node_decommissioned(unsigned int node)946 bool CCM::Bridge::is_node_decommissioned(unsigned int node) {
947 // Iterate over the list of decommissioned nodes
948 std::stringstream node_ip_address;
949 node_ip_address << get_ip_prefix() << node;
950 std::vector<std::string> nodes = cluster_status().nodes_decommissioned;
951 for (std::vector<std::string>::const_iterator iterator = nodes.begin(); iterator < nodes.end();
952 ++iterator) {
953 if (node_ip_address.str().compare(*iterator) == 0) {
954 return true;
955 }
956 }
957
958 // Node has not been decommissioned
959 return false;
960 }
961
is_node_down(unsigned int node,bool is_quick_check)962 bool CCM::Bridge::is_node_down(unsigned int node, bool is_quick_check /*= false*/) {
963 if (is_quick_check) {
964 return !is_node_availabe(node);
965 }
966
967 unsigned int number_of_retries = 0;
968 while (number_of_retries++ < CCM_RETRIES) {
969 if (!is_node_availabe(node)) {
970 return true;
971 } else {
972 std::string cluster = get_active_cluster();
973 LOG("[#" << number_of_retries
974 << "] - Attempting to recheck node down "
975 "status for node \""
976 << node << "\" in cluster \"" << cluster << "\"");
977 msleep(CCM_NAP);
978 }
979 }
980
981 // Connection can still being established on node
982 return false;
983 }
984
is_node_up(unsigned int node,bool is_quick_check)985 bool CCM::Bridge::is_node_up(unsigned int node, bool is_quick_check /*= false*/) {
986 if (is_quick_check) {
987 return is_node_availabe(node);
988 }
989
990 unsigned int number_of_retries = 0;
991 while (number_of_retries++ < CCM_RETRIES) {
992 if (is_node_availabe(node)) {
993 return true;
994 } else {
995 std::string cluster = get_active_cluster();
996 LOG("[#" << number_of_retries
997 << "] - Attempting to recheck node up "
998 "status for node \""
999 << node << "\" in cluster \"" << cluster << "\"");
1000 msleep(CCM_NAP);
1001 }
1002 }
1003
1004 // Connection cannot be established on node
1005 return false;
1006 }
1007
1008 #ifdef CASS_USE_LIBSSH2
initialize_socket(const std::string & host,short port)1009 void CCM::Bridge::initialize_socket(const std::string& host, short port) {
1010 // Initialize the socket
1011 socket_ = new Socket();
1012
1013 // Establish socket connection
1014 socket_->establish_connection(host, port);
1015 }
1016
synchronize_socket()1017 void CCM::Bridge::synchronize_socket() {
1018 // Determine current read/write direction of the session
1019 bool is_read = false;
1020 bool is_write = false;
1021 int read_write_direction = libssh2_session_block_directions(session_);
1022 if (read_write_direction & LIBSSH2_SESSION_BLOCK_INBOUND) {
1023 is_read = true;
1024 }
1025 if (read_write_direction & LIBSSH2_SESSION_BLOCK_OUTBOUND) {
1026 is_write = true;
1027 }
1028
1029 // Synchronize the socket
1030 socket_->synchronize(is_read, is_write);
1031 }
1032
initialize_libssh2()1033 void CCM::Bridge::initialize_libssh2() {
1034 // Initialize libssh2
1035 int rc = libssh2_init(LIBSSH2_INIT_ALL);
1036 if (rc) {
1037 finalize_libssh2();
1038 std::stringstream message;
1039 message << "[libssh2] Failed initialization with error code \"" << rc << "\"";
1040 throw BridgeException(message.str());
1041 }
1042
1043 // Initialize and create the libssh2 session
1044 session_ = libssh2_session_init();
1045 if (!session_) {
1046 finalize_libssh2();
1047 throw BridgeException("[libssh2] Failed session failed");
1048 }
1049
1050 // Disable blocking on the session
1051 libssh2_session_set_blocking(session_, FALSE);
1052
1053 // Perform the session handshake; trade banners, exchange keys, setup cyrpto
1054 while ((rc = libssh2_session_handshake(session_, socket_->get_handle())) ==
1055 LIBSSH2_ERROR_EAGAIN) {
1056 ; // no-op
1057 }
1058 if (rc) {
1059 // Determine error that occurred
1060 std::stringstream message;
1061 message << "[libssh2] Failed session handshake with error ";
1062 switch (rc) {
1063 case LIBSSH2_ERROR_SOCKET_NONE:
1064 message << "\"the socket is invalid\"";
1065 break;
1066 case LIBSSH2_ERROR_BANNER_SEND:
1067 message << "\"unable to send banner to remote host\"";
1068 break;
1069 case LIBSSH2_ERROR_KEX_FAILURE:
1070 message << "\"encryption key exchange with the remote host failed\"";
1071 break;
1072 case LIBSSH2_ERROR_SOCKET_SEND:
1073 message << "\"unable to send data on socket\"";
1074 break;
1075 case LIBSSH2_ERROR_SOCKET_DISCONNECT:
1076 message << "\"the socket was disconnected\"";
1077 break;
1078 case LIBSSH2_ERROR_PROTO:
1079 message << "\"an invalid SSH protocol response was received on the socket\"";
1080 break;
1081 default:
1082 message << " code \"" << rc << "\"";
1083 break;
1084 }
1085 finalize_libssh2();
1086 throw BridgeException(message.str());
1087 }
1088 }
1089
establish_libssh2_connection(AuthenticationType authentication_type,const std::string & username,const std::string & password,const std::string & public_key,const std::string & private_key)1090 void CCM::Bridge::establish_libssh2_connection(AuthenticationType authentication_type,
1091 const std::string& username,
1092 const std::string& password,
1093 const std::string& public_key,
1094 const std::string& private_key) {
1095 int rc = 0;
1096
1097 // Determine authentication mechanism
1098 if (authentication_type == AuthenticationType::USERNAME_PASSWORD) {
1099 // Perform username and password authentication
1100 while ((rc = libssh2_userauth_password(session_, username.c_str(), password.c_str())) ==
1101 LIBSSH2_ERROR_EAGAIN) {
1102 ; // no-op
1103 }
1104 } else {
1105 while ((rc = libssh2_userauth_publickey_fromfile(session_, username.c_str(), public_key.c_str(),
1106 private_key.c_str(), "")) ==
1107 LIBSSH2_ERROR_EAGAIN) {
1108 ; // no-op
1109 }
1110 }
1111
1112 if (rc) {
1113 // Determine error that occurred
1114 std::stringstream message;
1115 message << "[libssh2] Failed username/password authentication with error ";
1116 switch (rc) {
1117 case LIBSSH2_ERROR_ALLOC:
1118 message << "\"an internal memory allocation call failed\"";
1119 break;
1120 case LIBSSH2_ERROR_SOCKET_SEND:
1121 message << "\"unable to send data on socket\"";
1122 break;
1123 case LIBSSH2_ERROR_SOCKET_TIMEOUT:
1124 message << "\"timed out waiting for response\"";
1125 break;
1126 case LIBSSH2_ERROR_PASSWORD_EXPIRED:
1127 message << "\"password has expired\"";
1128 break;
1129 case LIBSSH2_ERROR_PUBLICKEY_UNVERIFIED:
1130 message << "\"the username/public key combination was invalid\"";
1131 break;
1132 case LIBSSH2_ERROR_AUTHENTICATION_FAILED:
1133 // Ensure error message is displayed for the authentication type
1134 if (authentication_type == AuthenticationType::USERNAME_PASSWORD) {
1135 message << "\"invalid username/password\"";
1136 } else {
1137 message << "\"authentication using the supplied public key was not accepted\"";
1138 }
1139 break;
1140 default:
1141 message << "code \"" << rc << "\"";
1142 break;
1143 }
1144 finalize_libssh2();
1145 throw BridgeException(message.str());
1146 }
1147 }
1148
open_libssh2_terminal()1149 void CCM::Bridge::open_libssh2_terminal() {
1150 // Open a channel; request a shell
1151 while (session_ != NULL && (channel_ = libssh2_channel_open_session(session_)) == NULL &&
1152 libssh2_session_last_error(session_, NULL, NULL, FALSE) == LIBSSH2_ERROR_EAGAIN) {
1153 synchronize_socket();
1154 }
1155 if (!channel_) {
1156 // Determine error that occurred
1157 int rc = libssh2_session_last_error(session_, NULL, NULL, FALSE);
1158 std::string message("[libssh2] Failed opening session channel with error \"");
1159 switch (rc) {
1160 case LIBSSH2_ERROR_ALLOC:
1161 message.append("an internal memory allocation call failed");
1162 break;
1163 case LIBSSH2_ERROR_SOCKET_SEND:
1164 message.append("unable to send data on socket");
1165 break;
1166 case LIBSSH2_ERROR_CHANNEL_FAILURE:
1167 message.append("unable to open channel");
1168 }
1169 message.append("\"");
1170 finalize_libssh2();
1171 throw BridgeException(message);
1172 }
1173 }
1174
close_libssh2_terminal()1175 void CCM::Bridge::close_libssh2_terminal() {
1176 if (channel_) {
1177 // Close the libssh2 channel/terminal
1178 int rc = 0;
1179 while ((rc = libssh2_channel_close(channel_)) == LIBSSH2_ERROR_EAGAIN) {
1180 synchronize_socket();
1181 }
1182 if (rc == 0) {
1183 char* exit_signal = NULL;
1184 libssh2_channel_get_exit_status(channel_);
1185 libssh2_channel_get_exit_signal(channel_, &exit_signal, NULL, NULL, NULL, NULL, NULL);
1186 if (exit_signal) {
1187 LOG_ERROR("[libssh2] Failed to close channel with exit signal \"" << exit_signal << "\"");
1188 }
1189 }
1190 if (rc) {
1191 LOG_ERROR("[libssh2] Failed to close channel with error code \"" << rc << "\"");
1192 }
1193
1194 // Free the channel/terminal resources
1195 while ((rc = libssh2_channel_free(channel_)) == LIBSSH2_ERROR_EAGAIN) {
1196 ; // no-op
1197 }
1198 if (rc) {
1199 LOG_ERROR("[libssh2] Failed to free channel resources with error code \"" << rc << "\"");
1200 }
1201 channel_ = NULL;
1202 }
1203 }
1204
/**
 * Tear down the libssh2 state held by the bridge.
 *
 * Disconnects and frees the session (when one exists), deletes the socket,
 * calls libssh2_exit() and, when OPENSSL_CLEANUP is defined and
 * LIBSSH2_NO_OPENSSL is not, releases OpenSSL resources. Failures are logged
 * instead of thrown.
 */
void CCM::Bridge::finalize_libssh2() {
  // Free the libssh2 session
  if (session_) {
    // Perform session disconnection; retried while libssh2 asks to try again
    int rc = 0;
    while ((rc = libssh2_session_disconnect(
                session_, "Shutting down libssh2 CCM bridge session")) == LIBSSH2_ERROR_EAGAIN) {
      ; // no-op
    }
    if (rc) {
      LOG_ERROR("[libssh2] Failed to disconnect session with error code \"" << rc << "\"");
    }
    // Release the session resources; retried while libssh2 asks to try again
    while ((rc = libssh2_session_free(session_)) == LIBSSH2_ERROR_EAGAIN) {
      ; // no-op
    }
    if (rc) {
      LOG_ERROR("[libssh2] Failed to free session resources with error code \"" << rc << "\"");
    }
    // Reset the member so a repeated call skips the session teardown
    session_ = NULL;
  }

  // Free the socket (closes connection)
  delete socket_;
  socket_ = NULL;

  // Free up remaining libssh2 memory
  libssh2_exit();

#ifndef LIBSSH2_NO_OPENSSL
#ifdef OPENSSL_CLEANUP
  // Free OpenSSL resources
  RAND_cleanup();
  ENGINE_cleanup();
  CONF_modules_unload(TRUE);
  CONF_modules_free();
  EVP_cleanup();
  ERR_free_strings();
  ERR_remove_state(PID_UNKNOWN);
  CRYPTO_cleanup_all_ex_data();
#endif
#endif
}
1247
execute_libssh2_command(const std::vector<std::string> & command)1248 std::string CCM::Bridge::execute_libssh2_command(const std::vector<std::string>& command) {
1249 // Make sure the libssh2 session wasn't terminated
1250 if (!session_) {
1251 throw BridgeException("[libssh2] Session is invalid/terminated");
1252 }
1253
1254 // Create/Open libssh2 terminal
1255 open_libssh2_terminal();
1256
1257 // Execute the command
1258 int rc = 0;
1259 std::string full_command = implode(command);
1260 while ((rc = libssh2_channel_exec(channel_, full_command.c_str())) == LIBSSH2_ERROR_EAGAIN) {
1261 synchronize_socket();
1262 }
1263 if (rc) {
1264 // Determine error that occurred
1265 std::stringstream message;
1266 message << "[libssh2] Failed to execute command with error ";
1267 switch (rc) {
1268 case LIBSSH2_ERROR_ALLOC:
1269 message << "\"An internal memory allocation call failed\"";
1270 break;
1271 case LIBSSH2_ERROR_SOCKET_SEND:
1272 message << "\"Unable to send data on socket\"";
1273 break;
1274 case LIBSSH2_ERROR_CHANNEL_REQUEST_DENIED:
1275 message << "\"Request denied\"";
1276 break;
1277 default:
1278 message << "code \"" << rc << "\"";
1279 break;
1280 }
1281 finalize_libssh2();
1282 throw BridgeException(message.str());
1283 }
1284
1285 // Get the terminal output, close the terminal and return the output
1286 std::string output = read_libssh2_terminal();
1287 close_libssh2_terminal();
1288 return output;
1289 }
1290 #endif
1291
1292 #ifdef CASS_USE_LIBSSH2
read_libssh2_terminal()1293 std::string CCM::Bridge::read_libssh2_terminal() {
1294 ssize_t nread = 0;
1295 char buffer[512];
1296 memset(buffer, '\0', sizeof(char) * 512);
1297 std::string output;
1298
1299 // Read stdout
1300 while (true) {
1301 while ((nread = libssh2_channel_read(channel_, buffer, sizeof(buffer))) > 0) {
1302 if (nread > 0) {
1303 output.append(buffer, nread);
1304 }
1305 }
1306 if (nread == LIBSSH2_ERROR_EAGAIN) {
1307 synchronize_socket();
1308 msleep(CCM_NAP);
1309 } else {
1310 break;
1311 }
1312 }
1313
1314 // Read stderr
1315 while (true) {
1316 while ((nread = libssh2_channel_read_stderr(channel_, buffer, sizeof(buffer))) > 0) {
1317 if (nread > 0) {
1318 output.append(buffer, nread);
1319 }
1320 }
1321 if (nread == LIBSSH2_ERROR_EAGAIN) {
1322 synchronize_socket();
1323 msleep(CCM_NAP);
1324 } else {
1325 break;
1326 }
1327 }
1328
1329 return output;
1330 }
1331 #endif
1332
/**
 * Execute a CCM command, locally or (when built with CASS_USE_LIBSSH2) over
 * the libssh2 session, depending on the deployment type.
 *
 * @param command CCM arguments; "ccm" is prepended before execution
 * @return Standard output of the local process, or the libssh2 terminal output
 *         for remote execution; empty if neither deployment branch applies
 * @throws BridgeException with the process' standard error if the local
 *                         command exits with a non-zero status; on Windows
 *                         also when the server is not Cassandra
 */
std::string CCM::Bridge::execute_ccm_command(const std::vector<std::string>& command) {
  // Create the CCM command
  std::vector<std::string> ccm_command;
  ccm_command.push_back("ccm");
  ccm_command.insert(ccm_command.end(), command.begin(), command.end());
  // Logged only when verbose output is enabled
  LOG(implode(ccm_command));

  // Determine how to execute the command
  std::string output;
  if (deployment_type_ == DeploymentType::LOCAL) {
#ifdef _WIN32
    // Local Windows deployments are rejected for anything other than Cassandra
    if (!is_cassandra()) {
      std::stringstream message;
      message << server_type_.to_string() << " v" << dse_version_.to_string()
              << " cannot be launched on Windows platform";
      throw BridgeException(message.str());
    }
#endif
    // Run the command as a local process; a non-zero exit status is an error
    utils::Process::Result result = utils::Process::execute(ccm_command);
    if (result.exit_status != 0) {
      throw BridgeException(result.standard_error);
    }
    output = result.standard_output;
#ifdef CASS_USE_LIBSSH2
  } else if (deployment_type_ == DeploymentType::REMOTE) {
    // Run the command through libssh2; the output is echoed to the verbose log
    output = execute_libssh2_command(ccm_command);
    if (!output.empty()) LOG(trim(output));
#endif
  }

  return output;
}
1365
get_active_cluster()1366 std::string CCM::Bridge::get_active_cluster() {
1367 std::string active_cluster;
1368 std::vector<std::string> clusters = get_available_clusters(active_cluster);
1369 return active_cluster;
1370 }
1371
get_available_clusters()1372 std::vector<std::string> CCM::Bridge::get_available_clusters() {
1373 std::string active_cluster;
1374 return get_available_clusters(active_cluster);
1375 }
1376
get_available_clusters(std::string & active_cluster)1377 std::vector<std::string> CCM::Bridge::get_available_clusters(std::string& active_cluster) {
1378 // Create the cluster list command and get the list of clusters
1379 std::vector<std::string> list_command;
1380 list_command.push_back("list");
1381 std::vector<std::string> clusters = explode(execute_ccm_command(list_command));
1382
1383 // Determine the active cluster and correct the cluster array
1384 int index = 0;
1385 for (std::vector<std::string>::const_iterator iterator = clusters.begin();
1386 iterator < clusters.end(); ++iterator) {
1387 std::string cluster = *iterator;
1388 if (cluster.compare(0, 1, "*") == 0) {
1389 cluster.erase(std::remove(cluster.begin(), cluster.end(), '*'), cluster.end());
1390 active_cluster = cluster;
1391 clusters[index] = cluster;
1392 }
1393 ++index;
1394 }
1395 return clusters;
1396 }
1397
generate_cluster_name(CassVersion cassandra_version,std::vector<unsigned short> data_center_nodes,bool with_vnodes,bool is_password_authenticator,bool is_ssl,bool is_client_authentication)1398 std::string CCM::Bridge::generate_cluster_name(CassVersion cassandra_version,
1399 std::vector<unsigned short> data_center_nodes,
1400 bool with_vnodes, bool is_password_authenticator,
1401 bool is_ssl, bool is_client_authentication) {
1402 std::stringstream cluster_name;
1403 std::string server_version =
1404 !is_cassandra() ? dse_version_.to_string(false) : cassandra_version.to_string(false);
1405 std::replace(server_version.begin(), server_version.end(), '.', '-');
1406 cluster_name << cluster_prefix_ << "_" << server_version << "_"
1407 << generate_cluster_nodes(data_center_nodes, '-');
1408 if (with_vnodes) {
1409 cluster_name << "-vnodes";
1410 }
1411 if (is_password_authenticator) {
1412 cluster_name << "-password_authenticator";
1413 }
1414 if (is_ssl) {
1415 cluster_name << "-ssl";
1416 if (is_client_authentication) {
1417 cluster_name << "-client_authentication";
1418 }
1419 }
1420 return cluster_name.str();
1421 }
1422
generate_cluster_nodes(std::vector<unsigned short> data_center_nodes,char separator)1423 std::string CCM::Bridge::generate_cluster_nodes(std::vector<unsigned short> data_center_nodes,
1424 char separator /* = ':'*/) {
1425 std::stringstream cluster_nodes;
1426 for (std::vector<unsigned short>::iterator iterator = data_center_nodes.begin();
1427 iterator != data_center_nodes.end(); ++iterator) {
1428 cluster_nodes << *iterator;
1429 if ((iterator + 1) != data_center_nodes.end()) {
1430 cluster_nodes << separator;
1431 }
1432 }
1433 return cluster_nodes.str();
1434 }
1435
1436 std::vector<std::string>
generate_create_updateconf_command(CassVersion cassandra_version)1437 CCM::Bridge::generate_create_updateconf_command(CassVersion cassandra_version) {
1438 // TODO: Add SSL setup and client authentication
1439 // Create the update configuration command (common updates)
1440 std::vector<std::string> updateconf_command;
1441 updateconf_command.push_back("updateconf");
1442 // Disable optimizations (limits) when using DSE/DDAC
1443 if (is_cassandra()) {
1444 updateconf_command.push_back("--rt=10000");
1445 updateconf_command.push_back("read_request_timeout_in_ms:10000");
1446 updateconf_command.push_back("write_request_timeout_in_ms:10000");
1447 updateconf_command.push_back("request_timeout_in_ms:10000");
1448 updateconf_command.push_back("phi_convict_threshold:16");
1449 updateconf_command.push_back("hinted_handoff_enabled:false");
1450 updateconf_command.push_back("dynamic_snitch_update_interval_in_ms:1000");
1451 updateconf_command.push_back("native_transport_max_threads:1");
1452 updateconf_command.push_back("concurrent_reads:2");
1453 updateconf_command.push_back("concurrent_writes:2");
1454 updateconf_command.push_back("concurrent_compactors:1");
1455 updateconf_command.push_back("compaction_throughput_mb_per_sec:0");
1456 updateconf_command.push_back("key_cache_size_in_mb:0");
1457 updateconf_command.push_back("key_cache_save_period:0");
1458 updateconf_command.push_back("memtable_flush_writers:1");
1459 updateconf_command.push_back("max_hints_delivery_threads:1");
1460
1461 // Create Cassandra version specific updates (C* v1.2.x)
1462 if (cassandra_version < "2.0.0") {
1463 updateconf_command.push_back("reduce_cache_sizes_at:0");
1464 updateconf_command.push_back("reduce_cache_capacity_to:0");
1465 updateconf_command.push_back("flush_largest_memtables_at:0");
1466 updateconf_command.push_back("index_interval:512");
1467 } else {
1468 updateconf_command.push_back("cas_contention_timeout_in_ms:10000");
1469 updateconf_command.push_back("file_cache_size_in_mb:0");
1470 }
1471
1472 // Create Cassandra version specific updates (C* < v2.1)
1473 if (cassandra_version < "2.1.0") {
1474 updateconf_command.push_back("in_memory_compaction_limit_in_mb:1");
1475 }
1476
1477 // Create Cassandra version specific updates (C* < v4.0)
1478 if (cassandra_version < "4.0.0") {
1479 updateconf_command.push_back("rpc_min_threads:1");
1480 updateconf_command.push_back("rpc_max_threads:1");
1481 }
1482 }
1483
1484 // Create Cassandra version specific updated (C* 2.2+)
1485 if (cassandra_version >= "2.2.0") {
1486 updateconf_command.push_back("enable_user_defined_functions:true");
1487 }
1488
1489 // Create Cassandra version specific updated (C* 3.0+)
1490 if (cassandra_version >= "3.0.0") {
1491 updateconf_command.push_back("enable_scripted_user_defined_functions:true");
1492 }
1493
1494 return updateconf_command;
1495 }
1496
generate_dse_workloads(std::vector<DseWorkload> workloads)1497 std::string CCM::Bridge::generate_dse_workloads(std::vector<DseWorkload> workloads) {
1498 std::string dse_workloads;
1499 for (std::vector<DseWorkload>::iterator iterator = workloads.begin(); iterator != workloads.end();
1500 ++iterator) {
1501 dse_workloads += dse_workloads_[*iterator];
1502 if ((iterator + 1) != workloads.end()) {
1503 dse_workloads += ",";
1504 }
1505 }
1506 return dse_workloads;
1507 }
1508
generate_node_name(unsigned int node)1509 std::string CCM::Bridge::generate_node_name(unsigned int node) {
1510 std::stringstream node_name;
1511 node_name << "node" << node;
1512 return node_name.str();
1513 }
1514
get_next_available_node()1515 unsigned int CCM::Bridge::get_next_available_node() {
1516 ClusterStatus status = cluster_status();
1517 unsigned int next_available_node = status.node_count + 1;
1518 if (next_available_node > CLUSTER_NODE_LIMIT) {
1519 std::stringstream message;
1520 message << "Failed to get next available node; cluster limit of \"" << CLUSTER_NODE_LIMIT
1521 << "\" nodes reached";
1522 throw BridgeException(message.str());
1523 }
1524
1525 return next_available_node;
1526 }
1527
is_node_availabe(unsigned int node)1528 bool CCM::Bridge::is_node_availabe(unsigned int node) {
1529 std::stringstream ip_address;
1530 ip_address << get_ip_prefix() << node;
1531 return is_node_availabe(ip_address.str());
1532 }
1533
is_node_availabe(const std::string & ip_address)1534 bool CCM::Bridge::is_node_availabe(const std::string& ip_address) {
1535 Socket socket;
1536 try {
1537 socket.establish_connection(ip_address, CASSANDRA_BINARY_PORT);
1538 return true;
1539 } catch (...) {
1540 ; // No-op
1541 }
1542
1543 return false;
1544 }
1545
to_lower(const std::string & input)1546 std::string CCM::Bridge::to_lower(const std::string& input) {
1547 std::string lowercase = input;
1548 std::transform(lowercase.begin(), lowercase.end(), lowercase.begin(), ::tolower);
1549 return lowercase;
1550 }
1551
trim(const std::string & input)1552 std::string CCM::Bridge::trim(const std::string& input) {
1553 std::string result;
1554 if (!input.empty()) {
1555 // Trim right
1556 result = input.substr(0, input.find_last_not_of(TRIM_DELIMETERS) + 1);
1557 if (!result.empty()) {
1558 // Trim left
1559 result = result.substr(result.find_first_not_of(TRIM_DELIMETERS));
1560 }
1561 }
1562 return result;
1563 }
1564
implode(const std::vector<std::string> & elements,const char delimiter)1565 std::string CCM::Bridge::implode(const std::vector<std::string>& elements,
1566 const char delimiter /*= ' '*/) {
1567 // Iterate through each element in the vector and concatenate the string
1568 std::string result;
1569 for (std::vector<std::string>::const_iterator iterator = elements.begin();
1570 iterator < elements.end(); ++iterator) {
1571 result += *iterator;
1572 if ((iterator + 1) != elements.end()) {
1573 result += delimiter;
1574 }
1575 }
1576 return result;
1577 }
1578
explode(const std::string & input,const char delimiter)1579 std::vector<std::string> CCM::Bridge::explode(const std::string& input,
1580 const char delimiter /*= ' '*/) {
1581 // Iterate over the input line and parse the tokens
1582 std::vector<std::string> result;
1583 std::istringstream parser(input);
1584 for (std::string token; std::getline(parser, token, delimiter);) {
1585 if (!token.empty()) {
1586 result.push_back(trim(token));
1587 }
1588 }
1589 return result;
1590 }
1591
msleep(unsigned int milliseconds)1592 void CCM::Bridge::msleep(unsigned int milliseconds) {
1593 #ifdef _WIN32
1594 Sleep(milliseconds);
1595 #else
1596 // Convert the milliseconds into a proper timespec structure
1597 time_t seconds = static_cast<int>(milliseconds / 1000);
1598 long int nanoseconds = static_cast<long int>((milliseconds - (seconds * 1000)) * 1000000);
1599
1600 // Assign the requested time and perform sleep
1601 struct timespec requested;
1602 requested.tv_sec = seconds;
1603 requested.tv_nsec = nanoseconds;
1604 while (nanosleep(&requested, &requested) == -1) {
1605 continue;
1606 }
1607 #endif
1608 }
1609