1 /* Copyright (c) 2015, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include "gcs_xcom_utils.h"
24 #include "gcs_group_identifier.h"
25 #include "gcs_logging.h"
26
27 #include "xcom_cfg.h"
28 #include "task_net.h"
29 #include "gcs_message_stage_lz4.h"
30 #include "task_os.h"
31 #include "gcs_xcom_networking.h"
32 #include "xcom_ssl_transport.h"
33
34 #include <sstream>
35 #include <iostream>
36 #include <algorithm>
37 #include <climits>
38 #include <set>
39 #include <limits>
40 #include <assert.h>
41
/**
  Size of the pool of handlers kept by the proxy.

  6 is the recommended value. Larger numbers hinder testing and,
  as confirmed by our benchmarks, do not bring much performance
  improvement.
*/
static const int XCOM_MAX_HANDLERS = 6;

/*
  Default waiting time, expressed in seconds.
*/
static const uint64_t WAITING_TIME = 30;

/*
  Default number of attempts to join a group.
*/
static const unsigned int JOIN_ATTEMPTS = 0;

/*
  Default sleep time between join attempts, expressed in seconds.
*/
static const uint64_t JOIN_SLEEP_TIME = 5;
64
65
~Gcs_xcom_utils()66 Gcs_xcom_utils::~Gcs_xcom_utils() {}
67
build_xcom_group_id(Gcs_group_identifier & group_id)68 u_long Gcs_xcom_utils::build_xcom_group_id(Gcs_group_identifier &group_id)
69 {
70 std::string group_id_str= group_id.get_group_id();
71 return mhash((unsigned char *)group_id_str.c_str(), group_id_str.size());
72 }
73
74
75 void
76 Gcs_xcom_utils::
process_peer_nodes(const std::string * peer_nodes,std::vector<std::string> & processed_peers)77 process_peer_nodes(const std::string *peer_nodes,
78 std::vector<std::string> &processed_peers)
79 {
80 std::string peer_init(peer_nodes->c_str());
81 std::string delimiter= ",";
82
83 //Clear all whitespace in the string
84 peer_init.erase(std::remove(peer_init.begin(), peer_init.end(), ' '),
85 peer_init.end());
86
87 // Skip delimiter at beginning.
88 std::string::size_type lastPos= peer_init.find_first_not_of(delimiter, 0);
89
90 // Find first "non-delimiter".
91 std::string::size_type pos= peer_init.find_first_of(delimiter, lastPos);
92
93 while (std::string::npos != pos || std::string::npos != lastPos)
94 {
95 std::string peer(peer_init.substr(lastPos, pos - lastPos));
96 processed_peers.push_back(peer);
97
98 // Skip delimiter
99 lastPos= peer_init.find_first_not_of(delimiter, pos);
100
101 // Find next "non-delimiter"
102 pos= peer_init.find_first_of(delimiter, lastPos);
103 }
104 }
105
106 void
107 Gcs_xcom_utils::
validate_peer_nodes(std::vector<std::string> & peers,std::vector<std::string> & invalid_peers)108 validate_peer_nodes(std::vector<std::string> &peers,
109 std::vector<std::string> &invalid_peers)
110 {
111 std::vector<std::string>::iterator it;
112 for(it= peers.begin(); it != peers.end();)
113 {
114 std::string server_and_port= *it;
115 if (!is_valid_hostname(server_and_port))
116 {
117 invalid_peers.push_back(server_and_port);
118 it= peers.erase(it);
119 }
120 else
121 {
122 ++it;
123 }
124 }
125 }
126
127 uint32_t
mhash(unsigned char * buf,size_t length)128 Gcs_xcom_utils::mhash(unsigned char *buf, size_t length)
129 {
130 size_t i= 0;
131 uint32_t sum= 0;
132 for (i= 0; i < length; i++)
133 {
134 sum+= 0x811c9dc5 * (uint32_t)buf[i];
135 }
136
137 return sum;
138 }
139
init_net()140 int Gcs_xcom_utils::init_net()
141 {
142 return ::init_net();
143 }
144
145
deinit_net()146 int Gcs_xcom_utils::deinit_net()
147 {
148 return ::deinit_net();
149 }
150
151
152 void
delete_node_address(unsigned int n,node_address * na)153 Gcs_xcom_proxy_impl::delete_node_address(unsigned int n, node_address *na)
154 {
155 ::delete_node_address(n, na);
156 }
157
158
xcom_client_close_connection(connection_descriptor * fd)159 int Gcs_xcom_proxy_impl::xcom_client_close_connection(connection_descriptor* fd)
160 {
161 return ::xcom_close_client_connection(fd);
162 }
163
164
165 connection_descriptor*
xcom_client_open_connection(std::string saddr,xcom_port port)166 Gcs_xcom_proxy_impl::xcom_client_open_connection(std::string saddr, xcom_port port)
167 {
168 char *addr= (char *) saddr.c_str();
169 return ::xcom_open_client_connection(addr, port);
170 }
171
172
173 int Gcs_xcom_proxy_impl::
xcom_client_add_node(connection_descriptor * fd,node_list * nl,uint32_t gid)174 xcom_client_add_node(connection_descriptor* fd, node_list *nl, uint32_t gid)
175 {
176 return ::xcom_client_add_node(fd, nl, gid);
177 }
178
179 int
xcom_client_remove_node(connection_descriptor * fd,node_list * nl,uint32_t gid)180 Gcs_xcom_proxy_impl::xcom_client_remove_node(connection_descriptor* fd, node_list* nl, uint32_t gid)
181 {
182 return ::xcom_client_remove_node(fd, nl, gid);
183 }
184
185
186 int
xcom_client_remove_node(node_list * nl,uint32_t gid)187 Gcs_xcom_proxy_impl::xcom_client_remove_node(node_list *nl, uint32_t gid)
188 {
189 int index= xcom_acquire_handler();
190 int res= true;
191
192 if (index != -1)
193 {
194 connection_descriptor* fd= m_xcom_handlers[index]->get_fd();
195
196 /*
197 XCOM will return 1 if the request is successfully processed or
198 0 otherwise.
199 */
200 if (fd != NULL)
201 res= ::xcom_client_remove_node(fd, nl, gid) ? false : true;
202 }
203 xcom_release_handler(index);
204 return res;
205 }
206
207
xcom_client_boot(node_list * nl,uint32_t gid)208 int Gcs_xcom_proxy_impl::xcom_client_boot(node_list *nl, uint32_t gid)
209 {
210 int index= xcom_acquire_handler();
211 int res= true;
212
213 if (index != -1)
214 {
215 connection_descriptor* fd= m_xcom_handlers[index]->get_fd();
216
217 if (fd != NULL)
218 res= ::xcom_client_boot(fd, nl, gid);
219 }
220 xcom_release_handler(index);
221 return res;
222 }
223
224
xcom_client_send_data(unsigned long long len,char * data)225 int Gcs_xcom_proxy_impl::xcom_client_send_data(unsigned long long len,
226 char *data)
227 {
228 int res= true;
229
230 if (len <= std::numeric_limits<unsigned int>::max())
231 {
232 int index= xcom_acquire_handler();
233 if (index != -1)
234 {
235 connection_descriptor* fd= m_xcom_handlers[index]->get_fd();
236 /*
237 XCOM will write all requested bytes or return -1 if there is
238 an error. However, the wrapper will return 1 if connections
239 to XCOM are not configured.
240
241 Having said that, it should be enough to check whether data
242 size was written and report false if so and true otherwise.
243 */
244 if (fd != NULL)
245 {
246 assert(len > 0);
247 int64_t written=
248 ::xcom_client_send_data(static_cast<uint32_t>(len), data, fd);
249 if (static_cast<unsigned int>(written) >= len)
250 res= false;
251 }
252 }
253 xcom_release_handler(index);
254 }
255 else
256 {
257 /*
258 GCS's message length is defined as unsigned long long type, but
259 XCOM can only accept packets length of which are in unsigned int range.
260 So it throws an error when gcs message is too big.
261 */
262
263 MYSQL_GCS_LOG_ERROR("The data is too big. Data length should not"
264 << " exceed "
265 << std::numeric_limits<unsigned int>::max()
266 << " bytes.");
267 }
268 return res;
269 }
270
271
xcom_init(xcom_port xcom_listen_port)272 int Gcs_xcom_proxy_impl::xcom_init(xcom_port xcom_listen_port)
273 {
274 /* Init XCom */
275 ::xcom_fsm(xa_init, int_arg(0)); /* Basic xcom init */
276
277 ::xcom_taskmain2(xcom_listen_port);
278
279 return 0;
280 }
281
xcom_exit(bool xcom_handlers_open)282 int Gcs_xcom_proxy_impl::xcom_exit(bool xcom_handlers_open)
283 {
284 int index= xcom_acquire_handler();
285 int res= true;
286
287
288 if (index != -1)
289 {
290 connection_descriptor* fd= m_xcom_handlers[index]->get_fd();
291
292 /* Stop XCom */
293 if (fd != NULL)
294 {
295 res= ::xcom_client_terminate_and_exit(fd);
296 }
297
298 xcom_release_handler(index);
299 }
300 else if (!xcom_handlers_open)
301 {
302 /* The handlers were not yet open, so use basic xcom stop */
303 ::xcom_fsm(xa_exit, int_arg(0));
304 res= false;
305 }
306
307 return res;
308 }
309
xcom_set_cleanup()310 void Gcs_xcom_proxy_impl::xcom_set_cleanup()
311 {
312 xcom_set_ready(false);
313 xcom_set_exit(false);
314 xcom_set_comms_status(XCOM_COMM_STATUS_UNDEFINED);
315 }
316
xcom_get_ssl_mode(const char * mode)317 int Gcs_xcom_proxy_impl::xcom_get_ssl_mode(const char* mode)
318 {
319 return ::xcom_get_ssl_mode(mode);
320 }
321
322
xcom_set_ssl_mode(int mode)323 int Gcs_xcom_proxy_impl::xcom_set_ssl_mode(int mode)
324 {
325 return ::xcom_set_ssl_mode(mode);
326 }
327
328
xcom_init_ssl()329 int Gcs_xcom_proxy_impl::xcom_init_ssl()
330 {
331 return ::xcom_init_ssl(m_server_key_file, m_server_cert_file,
332 m_client_key_file, m_client_cert_file, m_ca_file,
333 m_ca_path, m_crl_file, m_crl_path, m_cipher,
334 m_tls_version);
335 }
336
337
xcom_destroy_ssl()338 void Gcs_xcom_proxy_impl::xcom_destroy_ssl()
339 {
340 ::xcom_destroy_ssl();
341 }
342
343
xcom_use_ssl()344 int Gcs_xcom_proxy_impl::xcom_use_ssl()
345 {
346 return ::xcom_use_ssl();
347 }
348
349
xcom_set_ssl_parameters(const char * server_key_file,const char * server_cert_file,const char * client_key_file,const char * client_cert_file,const char * ca_file,const char * ca_path,const char * crl_file,const char * crl_path,const char * cipher,const char * tls_version)350 void Gcs_xcom_proxy_impl::xcom_set_ssl_parameters(
351 const char *server_key_file, const char *server_cert_file,
352 const char *client_key_file, const char *client_cert_file,
353 const char *ca_file, const char *ca_path,
354 const char *crl_file, const char *crl_path,
355 const char *cipher, const char *tls_version)
356 {
357 m_server_key_file= server_key_file;
358 m_server_cert_file= server_cert_file;
359 m_client_key_file= client_key_file;
360 m_client_cert_file= client_cert_file;
361 m_ca_file= ca_file;
362 m_ca_path= ca_path;
363 m_crl_file= crl_file;
364 m_crl_path= crl_path;
365 m_cipher= cipher;
366 m_tls_version= tls_version;
367 }
368
369
xcom_open_handlers(std::string saddr,xcom_port port)370 bool Gcs_xcom_proxy_impl::xcom_open_handlers(std::string saddr, xcom_port port)
371 {
372 bool success= true;
373 char *addr= (char *)saddr.c_str();
374 int n= 0;
375
376 m_lock_xcom_cursor.lock();
377 if (m_xcom_handlers_cursor == -1 && addr != NULL)
378 {
379 for (int i= 0; i < m_xcom_handlers_size && success; i++)
380 {
381 connection_descriptor* con= NULL;
382
383 while ((con= xcom_client_open_connection(addr, port)) == NULL &&
384 n < Gcs_xcom_proxy::connection_attempts)
385 {
386 My_xp_util::sleep_seconds(1);
387 n++;
388 }
389
390 n= 0;
391
392 if (con == NULL)
393 {
394 success= false;
395 break;
396 }
397 else
398 {
399 if(m_socket_util->disable_nagle_in_socket(con->fd) < 0)
400 {
401 success= false;
402 }
403
404 // This is a hack. It forces a protocol negotiation in
405 // the current connection with the local xcom, so that
406 // it does not happen later on.
407 if ((xcom_client_enable_arbitrator(con) <= 0) ||
408 (xcom_client_disable_arbitrator(con) <= 0))
409 success= false;
410 }
411
412 m_xcom_handlers[i]->set_fd(con);
413 }
414
415 if (!success)
416 {
417 for (int i= 0; i < m_xcom_handlers_size; i++)
418 {
419 if (m_xcom_handlers[i]->get_fd() != NULL)
420 {
421 xcom_close_client_connection(m_xcom_handlers[i]->get_fd());
422 m_xcom_handlers[i]->set_fd(NULL);
423 }
424 }
425 m_xcom_handlers_cursor= -1;
426 }
427 else
428 m_xcom_handlers_cursor= 0;
429 }
430 else
431 {
432 success= false;
433 }
434 m_lock_xcom_cursor.unlock();
435
436 return success ? false : true;
437 }
438
439
xcom_close_handlers()440 bool Gcs_xcom_proxy_impl::xcom_close_handlers()
441 {
442 m_lock_xcom_cursor.lock();
443 // Prevent that any other thread gets a new handler.
444 m_xcom_handlers_cursor= -1;
445 m_lock_xcom_cursor.unlock();
446
447 /* Close the file descriptors */
448 for (int i= 0; i < m_xcom_handlers_size; i++)
449 {
450 Xcom_handler *handler= m_xcom_handlers[i];
451 if (handler && handler->get_fd() != NULL)
452 {
453 handler->lock();
454 xcom_close_client_connection(handler->get_fd());
455 handler->unlock();
456 }
457 }
458
459 #ifdef XCOM_HAVE_OPENSSL
460 ::xcom_cleanup_ssl();
461 #endif
462
463 return false;
464 }
465
466
xcom_release_handler(int index)467 void Gcs_xcom_proxy_impl::xcom_release_handler(int index)
468 {
469 if (index < m_xcom_handlers_size && index >= 0)
470 m_xcom_handlers[index]->unlock();
471 }
472
473
xcom_acquire_handler()474 int Gcs_xcom_proxy_impl::xcom_acquire_handler()
475 {
476 int res= -1;
477 m_lock_xcom_cursor.lock();
478
479 if (m_xcom_handlers_cursor != -1)
480 {
481 res= m_xcom_handlers_cursor;
482 m_xcom_handlers[res]->lock();
483 m_xcom_handlers_cursor= (m_xcom_handlers_cursor+1) % m_xcom_handlers_size;
484 }
485 m_lock_xcom_cursor.unlock();
486
487 return res;
488 }
489
490 /* purecov: begin deadcode */
Gcs_xcom_proxy_impl()491 Gcs_xcom_proxy_impl::Gcs_xcom_proxy_impl()
492 :m_xcom_handlers_cursor(-1), m_lock_xcom_cursor(),
493 m_xcom_handlers_size(XCOM_MAX_HANDLERS),
494 m_wait_time(WAITING_TIME),
495 m_xcom_handlers(NULL), m_lock_xcom_ready(),
496 m_cond_xcom_ready(), m_is_xcom_ready(false),
497 m_lock_xcom_comms_status(), m_cond_xcom_comms_status(),
498 m_xcom_comms_status(XCOM_COMM_STATUS_UNDEFINED),
499 m_lock_xcom_exit(), m_cond_xcom_exit(),
500 m_is_xcom_exit(false),
501 m_socket_util(NULL),
502 m_server_key_file(),
503 m_server_cert_file(),
504 m_client_key_file(),
505 m_client_cert_file(),
506 m_ca_file(),
507 m_ca_path(),
508 m_crl_file(),
509 m_crl_path(),
510 m_cipher(),
511 m_tls_version()
512 {
513 m_xcom_handlers= new Xcom_handler *[m_xcom_handlers_size];
514
515 for (int i= 0; i < m_xcom_handlers_size; i++)
516 m_xcom_handlers[i]= new Xcom_handler();
517
518 m_lock_xcom_cursor.init(NULL);
519 m_lock_xcom_ready.init(NULL);
520 m_cond_xcom_ready.init();
521 m_lock_xcom_comms_status.init(NULL);
522 m_cond_xcom_comms_status.init();
523 m_lock_xcom_exit.init(NULL);
524 m_cond_xcom_exit.init();
525
526 m_socket_util= new My_xp_socket_util_impl();
527 }
/* purecov: end */
529
Gcs_xcom_proxy_impl(int wt)530 Gcs_xcom_proxy_impl::Gcs_xcom_proxy_impl(int wt)
531 :m_xcom_handlers_cursor(-1), m_lock_xcom_cursor(),
532 m_xcom_handlers_size(XCOM_MAX_HANDLERS),
533 m_wait_time(wt),
534 m_xcom_handlers(NULL), m_lock_xcom_ready(),
535 m_cond_xcom_ready(), m_is_xcom_ready(false),
536 m_lock_xcom_comms_status(), m_cond_xcom_comms_status(),
537 m_xcom_comms_status(XCOM_COMM_STATUS_UNDEFINED),
538 m_lock_xcom_exit(), m_cond_xcom_exit(),
539 m_is_xcom_exit(false),
540 m_socket_util(NULL),
541 m_server_key_file(),
542 m_server_cert_file(),
543 m_client_key_file(),
544 m_client_cert_file(),
545 m_ca_file(),
546 m_ca_path(),
547 m_crl_file(),
548 m_crl_path(),
549 m_cipher(),
550 m_tls_version()
551 {
552 m_xcom_handlers= new Xcom_handler *[m_xcom_handlers_size];
553
554 for (int i= 0; i < m_xcom_handlers_size; i++)
555 m_xcom_handlers[i]= new Xcom_handler();
556
557 m_lock_xcom_cursor.init(NULL);
558 m_lock_xcom_ready.init(NULL);
559 m_cond_xcom_ready.init();
560 m_lock_xcom_comms_status.init(NULL);
561 m_cond_xcom_comms_status.init();
562 m_lock_xcom_exit.init(NULL);
563 m_cond_xcom_exit.init();
564
565 m_socket_util= new My_xp_socket_util_impl();
566 }
567
568
~Gcs_xcom_proxy_impl()569 Gcs_xcom_proxy_impl::~Gcs_xcom_proxy_impl()
570 {
571 for (int i= 0; i < m_xcom_handlers_size; i++)
572 delete m_xcom_handlers[i];
573
574 delete [] m_xcom_handlers;
575 m_lock_xcom_cursor.destroy();
576 m_lock_xcom_ready.destroy();
577 m_cond_xcom_ready.destroy();
578 m_lock_xcom_comms_status.destroy();
579 m_cond_xcom_comms_status.destroy();
580 m_lock_xcom_exit.destroy();
581 m_cond_xcom_exit.destroy();
582
583 delete m_socket_util;
584 }
585
586
find_site_def(synode_no synode)587 site_def const *Gcs_xcom_proxy_impl::find_site_def(synode_no synode)
588 {
589 return ::find_site_def(synode);
590 }
591
592
Xcom_handler()593 Gcs_xcom_proxy_impl::Xcom_handler::Xcom_handler()
594 :m_lock(), m_fd(NULL)
595 {
596 m_lock.init(NULL);
597 }
598
599
~Xcom_handler()600 Gcs_xcom_proxy_impl::Xcom_handler::~Xcom_handler()
601 {
602 m_lock.destroy();
603 }
604
605
new_node_address_uuid(unsigned int n,char * names[],blob uuids[])606 node_address *Gcs_xcom_proxy_impl::new_node_address_uuid(
607 unsigned int n, char *names[], blob uuids[])
608 {
609 return ::new_node_address_uuid(static_cast<int>(n), names, uuids);
610 }
611
612
xcom_wait_ready()613 enum_gcs_error Gcs_xcom_proxy_impl::xcom_wait_ready()
614 {
615 enum_gcs_error ret= GCS_OK;
616 struct timespec ts;
617 int res= 0;
618
619 m_lock_xcom_ready.lock();
620
621 if (!m_is_xcom_ready)
622 {
623 My_xp_util::set_timespec(&ts, m_wait_time);
624 res= m_cond_xcom_ready.timed_wait(
625 m_lock_xcom_ready.get_native_mutex(), &ts);
626 }
627
628 if (res != 0)
629 {
630 ret= GCS_NOK;
631 // There was an error
632 if(res == ETIMEDOUT)
633 {
634 // timeout
635 MYSQL_GCS_LOG_ERROR("Timeout while waiting for the group" <<
636 " communication engine to be ready!");
637 }
638 else if(res == EINVAL)
639 {
640 // invalid abstime or cond or mutex
641 MYSQL_GCS_LOG_ERROR("Invalid parameter received by the timed wait for" <<
642 " the group communication engine to be ready.");
643 }
644 else if(res == EPERM)
645 {
646 // mutex isn't owned by the current thread at the time of the call
647 MYSQL_GCS_LOG_ERROR("Thread waiting for the group communication" <<
648 " engine to be ready does not own the mutex at the" <<
649 " time of the call!");
650 }
651 else
652 MYSQL_GCS_LOG_ERROR("Error while waiting for the group" <<
653 "communication engine to be ready!");
654 }
655
656 m_lock_xcom_ready.unlock();
657
658 return ret;
659 }
660
661
662 bool
xcom_is_ready()663 Gcs_xcom_proxy_impl::xcom_is_ready()
664 {
665 bool retval;
666
667 m_lock_xcom_ready.lock();
668 retval= m_is_xcom_ready;
669 m_lock_xcom_ready.unlock();
670
671 return retval;
672 }
673
674
675 void
xcom_set_ready(bool value)676 Gcs_xcom_proxy_impl::xcom_set_ready(bool value)
677 {
678 m_lock_xcom_ready.lock();
679 m_is_xcom_ready= value;
680 m_lock_xcom_ready.unlock();
681 }
682
683
684 void
xcom_signal_ready()685 Gcs_xcom_proxy_impl::xcom_signal_ready()
686 {
687 m_lock_xcom_ready.lock();
688 m_is_xcom_ready= true;
689 m_cond_xcom_ready.broadcast();
690 m_lock_xcom_ready.unlock();
691 }
692
693
xcom_wait_exit()694 enum_gcs_error Gcs_xcom_proxy_impl::xcom_wait_exit()
695 {
696 enum_gcs_error ret= GCS_OK;
697 struct timespec ts;
698 int res= 0;
699
700 m_lock_xcom_exit.lock();
701
702 if (!m_is_xcom_exit)
703 {
704 My_xp_util::set_timespec(&ts, m_wait_time);
705 res= m_cond_xcom_exit.timed_wait(
706 m_lock_xcom_exit.get_native_mutex(), &ts);
707 }
708
709 if (res != 0)
710 {
711 ret= GCS_NOK;
712 // There was an error
713 if(res == ETIMEDOUT)
714 {
715 // timeout
716 MYSQL_GCS_LOG_ERROR(
717 "Timeout while waiting for the group communication engine to exit!"
718 )
719 }
720 else if(res == EINVAL)
721 {
722 // invalid abstime or cond or mutex
723 MYSQL_GCS_LOG_ERROR(
724 "Timed wait for group communication engine to exit received an "
725 "invalid parameter!"
726 )
727 }
728 else if(res == EPERM)
729 {
730 // mutex isn't owned by the current thread at the time of the call
731 MYSQL_GCS_LOG_ERROR(
732 "Timed wait for group communication engine to exit using mutex that "
733 "isn't owned by the current thread at the time of the call!"
734 )
735 }
736 else
737 MYSQL_GCS_LOG_ERROR(
738 "Error while waiting for group communication to exit!"
739 )
740 }
741
742 m_lock_xcom_exit.unlock();
743
744 return ret;
745 }
746
747
748 bool
xcom_is_exit()749 Gcs_xcom_proxy_impl::xcom_is_exit()
750 {
751 bool retval;
752
753 m_lock_xcom_exit.lock();
754 retval= m_is_xcom_exit;
755 m_lock_xcom_exit.unlock();
756
757 return retval;
758 }
759
760
761 void
xcom_set_exit(bool value)762 Gcs_xcom_proxy_impl::xcom_set_exit(bool value)
763 {
764 m_lock_xcom_exit.lock();
765 m_is_xcom_exit= value;
766 m_lock_xcom_exit.unlock();
767 }
768
769
770 void
xcom_signal_exit()771 Gcs_xcom_proxy_impl::xcom_signal_exit()
772 {
773 m_lock_xcom_exit.lock();
774 m_is_xcom_exit= true;
775 m_cond_xcom_exit.broadcast();
776 m_lock_xcom_exit.unlock();
777 }
778
779
780 void
xcom_wait_for_xcom_comms_status_change(int & status)781 Gcs_xcom_proxy_impl::xcom_wait_for_xcom_comms_status_change(int& status)
782 {
783 struct timespec ts;
784 int res= 0;
785
786 m_lock_xcom_comms_status.lock();
787
788 if (m_xcom_comms_status == XCOM_COMM_STATUS_UNDEFINED)
789 {
790 My_xp_util::set_timespec(&ts, m_wait_time);
791 res= m_cond_xcom_comms_status.timed_wait(
792 m_lock_xcom_comms_status.get_native_mutex(), &ts);
793 }
794
795 if (res != 0)
796 {
797 // There was an error
798 status= XCOM_COMMS_OTHER;
799
800 if(res == ETIMEDOUT)
801 {
802 // timeout
803 MYSQL_GCS_LOG_ERROR("Timeout while waiting for the group communication" <<
804 " engine's communications status to change!");
805 }
806 else if(res == EINVAL)
807 {
808 // invalid abstime or cond or mutex
809 MYSQL_GCS_LOG_ERROR("Invalid parameter received by the timed wait for" <<
810 " the group communication engine's communications" <<
811 " status to change.");
812 }
813 else if(res == EPERM)
814 {
815 // mutex isn't owned by the current thread at the time of the call
816 MYSQL_GCS_LOG_ERROR("Thread waiting for the group communication" <<
817 " engine's communications status to change does" <<
818 " not own the mutex at the time of the call!");
819 }
820 else
821 MYSQL_GCS_LOG_ERROR("Error while waiting for the group communication" <<
822 " engine's communications status to change!");
823 }
824 else
825 status= m_xcom_comms_status;
826
827 m_lock_xcom_comms_status.unlock();
828 }
829
830 bool
xcom_has_comms_status_changed()831 Gcs_xcom_proxy_impl::xcom_has_comms_status_changed()
832 {
833 bool retval;
834
835 m_lock_xcom_comms_status.lock();
836 retval= (m_xcom_comms_status != XCOM_COMM_STATUS_UNDEFINED);
837 m_lock_xcom_comms_status.unlock();
838
839 return retval;
840 }
841
842 void
xcom_set_comms_status(int value)843 Gcs_xcom_proxy_impl::xcom_set_comms_status(int value)
844 {
845 m_lock_xcom_comms_status.lock();
846 m_xcom_comms_status= value;
847 m_lock_xcom_comms_status.unlock();
848 }
849
850 void
xcom_signal_comms_status_changed(int status)851 Gcs_xcom_proxy_impl::xcom_signal_comms_status_changed(int status)
852 {
853 m_lock_xcom_comms_status.lock();
854 m_xcom_comms_status= status;
855 m_cond_xcom_comms_status.broadcast();
856 m_lock_xcom_comms_status.unlock();
857 }
858
init()859 void Gcs_xcom_app_cfg::init()
860 {
861 ::init_cfg_app_xcom();
862 }
863
deinit()864 void Gcs_xcom_app_cfg::deinit()
865 {
866 ::deinit_cfg_app_xcom();
867 }
868
set_poll_spin_loops(unsigned int loops)869 void Gcs_xcom_app_cfg::set_poll_spin_loops(unsigned int loops)
870 {
871 if (the_app_xcom_cfg)
872 the_app_xcom_cfg->m_poll_spin_loops= loops;
873 }
874
875 int
xcom_client_force_config(connection_descriptor * fd,node_list * nl,uint32_t group_id)876 Gcs_xcom_proxy_impl::xcom_client_force_config(connection_descriptor *fd,
877 node_list *nl,
878 uint32_t group_id)
879 {
880 return ::xcom_client_force_config(fd, nl, group_id);
881 }
882
883 int
xcom_client_force_config(node_list * nl,uint32_t group_id)884 Gcs_xcom_proxy_impl::xcom_client_force_config(node_list *nl,
885 uint32_t group_id)
886 {
887 int index= xcom_acquire_handler();
888 int res= true;
889
890 if (index != -1)
891 {
892 connection_descriptor* fd= m_xcom_handlers[index]->get_fd();
893
894 if (fd != NULL)
895 res= this->xcom_client_force_config(fd, nl, group_id);
896 }
897 xcom_release_handler(index);
898 return res;
899 }
900
Gcs_xcom_nodes(const site_def * site,node_set & nodes)901 Gcs_xcom_nodes::Gcs_xcom_nodes(const site_def *site, node_set &nodes)
902 : m_node_no(site->nodeno), m_addresses(), m_uuids(), m_statuses(),
903 m_size(nodes.node_set_len)
904 {
905 Gcs_uuid uuid;
906 for (unsigned int i= 0; i < nodes.node_set_len; ++i)
907 {
908 /* Get member address and save it. */
909 std::string address(site->nodes.node_list_val[i].address);
910 m_addresses.push_back(address);
911
912 /* Get member uuid and save it. */
913 uuid.decode(
914 reinterpret_cast<uchar *>(site->nodes.node_list_val[i].uuid.data.data_val),
915 site->nodes.node_list_val[i].uuid.data.data_len
916 );
917 m_uuids.push_back(uuid);
918
919 /* Get member status and save it */
920 m_statuses.push_back(nodes.node_set_val[i] ? true: false);
921 }
922 assert(m_size == m_addresses.size());
923 assert(m_size == m_statuses.size());
924 }
925
926
Gcs_xcom_nodes()927 Gcs_xcom_nodes::Gcs_xcom_nodes()
928 : m_node_no(0), m_addresses(), m_uuids(), m_statuses(),
929 m_size(0)
930 {
931 }
932
933
get_node_no() const934 unsigned int Gcs_xcom_nodes::get_node_no() const
935 {
936 return m_node_no;
937 }
938
get_addresses() const939 const std::vector<std::string> &Gcs_xcom_nodes::get_addresses() const
940 {
941 return m_addresses;
942 }
943
get_uuids() const944 const std::vector<Gcs_uuid> &Gcs_xcom_nodes::get_uuids() const
945 {
946 return m_uuids;
947 }
948
949
get_uuid(const std::string & address) const950 const Gcs_uuid *Gcs_xcom_nodes::get_uuid(const std::string &address) const
951 {
952 for (size_t index= 0; index < m_size; index++)
953 {
954 if (!m_addresses[index].compare(address))
955 {
956 return &m_uuids[index];
957 }
958 }
959 return NULL;
960 }
961
962
get_statuses() const963 const std::vector<bool> &Gcs_xcom_nodes::get_statuses() const
964 {
965 return m_statuses;
966 }
967
get_size() const968 unsigned int Gcs_xcom_nodes::get_size() const
969 {
970 return m_size;
971 }
972
/**
  Checks whether a "host:port" string designates a usable address.

  The string is split at its last ':'. The host part must be accepted by
  checked_getaddrinfo(), and the port part must be accepted by
  is_number() and must not exceed USHRT_MAX.

  @param server_and_port the string to validate
  @return true if the string is valid, false otherwise
*/
bool
is_valid_hostname(const std::string &server_and_port)
{
  const std::string::size_type delim_pos= server_and_port.find_last_of(":");

  /* Without a separator there is no port to speak of */
  if (delim_pos == std::string::npos)
    return false;

  std::string hostname= server_and_port.substr(0, delim_pos);
  std::string s_port= server_and_port.substr(delim_pos + 1);

  /* handle hostname */
  struct addrinfo *addr= NULL;
  const bool resolved=
    (checked_getaddrinfo(hostname.c_str(), 0, NULL, &addr) == 0);

  // Only the outcome of the lookup matters, so the result list is
  // released right away.
  if (addr)
    freeaddrinfo(addr);

  if (!resolved)
    return false;

  /* handle port */
  if (!is_number(s_port))
    return false;

  /*
    strtoul() is used instead of atoi(): atoi() has undefined behaviour
    when the value is out of range, and a negative result would slip
    through an upper-bound-only check. On overflow strtoul() returns
    ULONG_MAX, which the range check below rejects.
  */
  const unsigned long port= strtoul(s_port.c_str(), NULL, 10);

  return port <= USHRT_MAX;
}
1004
1005 void
fix_parameters_syntax(Gcs_interface_parameters & interface_params)1006 fix_parameters_syntax(Gcs_interface_parameters &interface_params)
1007 {
1008 std::string *compression_str= const_cast<std::string *>(
1009 interface_params.get_parameter("compression"));
1010 std::string *compression_threshold_str= const_cast<std::string *>(
1011 interface_params.get_parameter("compression_threshold"));
1012 std::string *wait_time_str= const_cast<std::string *>(
1013 interface_params.get_parameter("wait_time"));
1014 std::string *ip_whitelist_str= const_cast<std::string *>(
1015 interface_params.get_parameter("ip_whitelist"));
1016 std::string *join_attempts_str= const_cast<std::string *>(
1017 interface_params.get_parameter("join_attempts"));
1018 std::string *join_sleep_time_str= const_cast<std::string *>(
1019 interface_params.get_parameter("join_sleep_time"));
1020
1021 // sets the default value for compression (ON by default)
1022 if (!compression_str)
1023 {
1024 interface_params.add_parameter("compression", "on");
1025 }
1026
1027 // sets the default threshold if no threshold has been set
1028 if (!compression_threshold_str)
1029 {
1030 std::stringstream ss;
1031 ss << Gcs_message_stage_lz4::DEFAULT_THRESHOLD;
1032 interface_params.add_parameter("compression_threshold", ss.str());
1033 }
1034
1035 // sets the default waiting time for timed_waits
1036 if (!wait_time_str)
1037 {
1038 std::stringstream ss;
1039 ss << WAITING_TIME;
1040 interface_params.add_parameter("wait_time", ss.str());
1041 }
1042
1043 // sets the default ip whitelist
1044 if (!ip_whitelist_str)
1045 {
1046 std::stringstream ss;
1047 std::string iplist;
1048 std::map<std::string, int> out;
1049
1050 // add local private networks that one has an IP on by default
1051 get_ipv4_local_private_addresses(out);
1052
1053 if (out.empty())
1054 ss << "127.0.0.1/32,::1/128,";
1055 else
1056 {
1057 std::map<std::string, int>::iterator it;
1058 for (it= out.begin(); it != out.end(); it++)
1059 {
1060 ss << (*it).first << "/" << (*it).second << ",";
1061 }
1062 }
1063
1064 iplist= ss.str();
1065 iplist.erase(iplist.end() - 1); // remove trailing comma
1066
1067 MYSQL_GCS_LOG_INFO("Added automatically IP ranges " << iplist <<
1068 " to the whitelist");
1069
1070 interface_params.add_parameter("ip_whitelist", iplist);
1071 }
1072
1073 // sets the default join attempts
1074 if (!join_attempts_str)
1075 {
1076 std::stringstream ss;
1077 ss << JOIN_ATTEMPTS;
1078 interface_params.add_parameter("join_attempts", ss.str());
1079 }
1080
1081 // sets the default sleep time between join attempts
1082 if (!join_sleep_time_str)
1083 {
1084 std::stringstream ss;
1085 ss << JOIN_SLEEP_TIME;
1086 interface_params.add_parameter("join_sleep_time", ss.str());
1087 }
1088 }
1089
1090 static enum_gcs_error
is_valid_flag(const std::string param,std::string & flag)1091 is_valid_flag(const std::string param, std::string &flag)
1092 {
1093 enum_gcs_error error= GCS_OK;
1094
1095 // transform to lower case
1096 std::transform (flag.begin(), flag.end(), flag.begin(), ::tolower);
1097
1098 if (flag.compare("on") && flag.compare("off") &&
1099 flag.compare("true") && flag.compare("false"))
1100 {
1101 std::stringstream ss;
1102 ss << "Invalid parameter set to " << param << ". ";
1103 ss << "Valid values are either \"on\" or \"off\".";
1104 MYSQL_GCS_LOG_ERROR(ss.str());
1105 error= GCS_NOK;
1106 }
1107 return error;
1108 }
1109
1110 bool
is_parameters_syntax_correct(const Gcs_interface_parameters & interface_params)1111 is_parameters_syntax_correct(const Gcs_interface_parameters &interface_params)
1112 {
1113 enum_gcs_error error= GCS_OK;
1114
1115 // get the parameters
1116 const std::string *group_name_str=
1117 interface_params.get_parameter("group_name");
1118 const std::string *local_node_str=
1119 interface_params.get_parameter("local_node");
1120 const std::string *peer_nodes_str=
1121 interface_params.get_parameter("peer_nodes");
1122 const std::string *bootstrap_group_str=
1123 interface_params.get_parameter("bootstrap_group");
1124 const std::string *poll_spin_loops_str=
1125 interface_params.get_parameter("poll_spin_loops");
1126 const std::string *compression_threshold_str=
1127 interface_params.get_parameter("compression_threshold");
1128 const std::string *compression_str=
1129 interface_params.get_parameter("compression");
1130 const std::string *wait_time_str=
1131 interface_params.get_parameter("wait_time");
1132 const std::string *join_attempts_str=
1133 interface_params.get_parameter("join_attempts");
1134 const std::string *join_sleep_time_str=
1135 interface_params.get_parameter("join_sleep_time");
1136
1137 /*
1138 -----------------------------------------------------
1139 Checks
1140 -----------------------------------------------------
1141 */
1142 // validate group name
1143 if (group_name_str != NULL &&
1144 group_name_str->size() == 0)
1145 {
1146 MYSQL_GCS_LOG_ERROR("The group_name parameter (" << group_name_str << ")" <<
1147 " is not valid.")
1148 error= GCS_NOK;
1149 goto end;
1150 }
1151
1152 // validate bootstrap string
1153 // accepted values: true, false, on, off
1154 if (bootstrap_group_str != NULL)
1155 {
1156 std::string &flag= const_cast<std::string &>(*bootstrap_group_str);
1157 error= is_valid_flag("bootstrap_group", flag);
1158 if (error == GCS_NOK)
1159 goto end;
1160 }
1161
1162 // validate peer addresses addresses
1163 if (peer_nodes_str != NULL)
1164 {
1165 /*
1166 Parse and validate hostname and ports.
1167 */
1168 std::vector<std::string> hostnames_and_ports;
1169 std::vector<std::string> invalid_hostnames_and_ports;
1170 Gcs_xcom_utils::process_peer_nodes(peer_nodes_str, hostnames_and_ports);
1171 Gcs_xcom_utils::validate_peer_nodes(hostnames_and_ports,
1172 invalid_hostnames_and_ports);
1173
1174 if(!invalid_hostnames_and_ports.empty())
1175 {
1176 std::vector<std::string>::iterator invalid_hostnames_and_ports_it;
1177 for(invalid_hostnames_and_ports_it= invalid_hostnames_and_ports.begin();
1178 invalid_hostnames_and_ports_it != invalid_hostnames_and_ports.end();
1179 ++invalid_hostnames_and_ports_it)
1180 {
1181 MYSQL_GCS_LOG_WARN("Peer address \"" <<
1182 (*invalid_hostnames_and_ports_it).c_str()
1183 << "\" is not valid.");
1184 }
1185 }
1186
1187 /*
1188 This means that none of the provided hosts is valid and that
1189 hostnames_and_ports had some sort of value
1190 */
1191 if(!invalid_hostnames_and_ports.empty() && hostnames_and_ports.empty())
1192 {
1193 MYSQL_GCS_LOG_ERROR("None of the provided peer address is valid.");
1194 error= GCS_NOK;
1195 goto end;
1196 }
1197 }
1198
1199 // local peer address
1200 if (local_node_str != NULL)
1201 {
1202 bool matches_local_ip= false;
1203 std::map<std::string, int> ips;
1204 std::map<std::string, int>::iterator it;
1205
1206 std::string::size_type delim_pos= (*local_node_str).find_last_of(":");
1207 std::string host= (*local_node_str).substr(0, delim_pos);
1208 std::string ip;
1209
1210 // first validate hostname
1211 if (!is_valid_hostname(*local_node_str))
1212 {
1213 MYSQL_GCS_LOG_ERROR("Invalid hostname or IP address (" <<
1214 *local_node_str << ") assigned to the parameter " <<
1215 "local_node!");
1216
1217 error= GCS_NOK;
1218 goto end;
1219 }
1220
1221 // hostname was validated already, lets find the IP
1222 if (resolve_ip_addr_from_hostname(host, ip))
1223 {
1224 MYSQL_GCS_LOG_ERROR("Unable to translate hostname " << host <<
1225 " to IP address!");
1226 error= GCS_NOK;
1227 goto end;
1228 }
1229
1230 if (ip.compare(host) != 0)
1231 MYSQL_GCS_LOG_INFO("Translated '" << host << "' to " << ip);
1232
1233 // second check that this host has that IP assigned
1234 if (get_ipv4_local_addresses(ips, true))
1235 {
1236 MYSQL_GCS_LOG_ERROR("Unable to get the list of local IP addresses for "
1237 "the server!");
1238 error= GCS_NOK;
1239 goto end;
1240 }
1241
1242 // see if any IP matches
1243 for (it= ips.begin(); it != ips.end() && !matches_local_ip; it++)
1244 matches_local_ip= (*it).first.compare(ip) == 0;
1245 if(!matches_local_ip)
1246 {
1247 MYSQL_GCS_LOG_ERROR("There is no local IP address matching the one "
1248 "configured for the local node (" <<
1249 *local_node_str << ").");
1250 error= GCS_NOK;
1251 goto end;
1252 }
1253 }
1254
1255 // poll spin loops
1256 if(poll_spin_loops_str &&
1257 (poll_spin_loops_str->size() == 0 ||
1258 !is_number(*poll_spin_loops_str)))
1259 {
1260 MYSQL_GCS_LOG_ERROR("The poll_spin_loops parameter ("
1261 << poll_spin_loops_str << ") is not valid.")
1262 error= GCS_NOK;
1263 goto end;
1264 }
1265
1266 // validate compression
1267 if (compression_str != NULL)
1268 {
1269 std::string &flag= const_cast<std::string &>(*compression_str);
1270 error= is_valid_flag("compression", flag);
1271 if (error == GCS_NOK)
1272 goto end;
1273 }
1274
1275 if (compression_threshold_str &&
1276 (compression_threshold_str->size() == 0 ||
1277 !is_number(*compression_threshold_str)))
1278 {
1279 MYSQL_GCS_LOG_ERROR("The compression_threshold parameter (" <<
1280 compression_threshold_str << ") is not valid.")
1281 error= GCS_NOK;
1282 goto end;
1283 }
1284
1285 if (wait_time_str &&
1286 (wait_time_str->size() == 0 ||
1287 !is_number(*wait_time_str)))
1288 {
1289 MYSQL_GCS_LOG_ERROR("The wait_time parameter (" << wait_time_str <<
1290 ") is not valid.")
1291 error= GCS_NOK;
1292 goto end;
1293 }
1294
1295 if(join_attempts_str &&
1296 (join_attempts_str->size() == 0 ||
1297 !is_number(*join_attempts_str)))
1298 {
1299 MYSQL_GCS_LOG_ERROR("The join_attempts parameter ("
1300 << join_attempts_str << ") is not valid.")
1301 error= GCS_NOK;
1302 goto end;
1303 }
1304
1305 if(join_sleep_time_str &&
1306 (join_sleep_time_str->size() == 0 ||
1307 !is_number(*join_sleep_time_str)))
1308 {
1309 MYSQL_GCS_LOG_ERROR("The join_sleep_time parameter ("
1310 << join_sleep_time_str << ") is not valid.")
1311 error= GCS_NOK;
1312 goto end;
1313 }
1314
1315 end:
1316 return error == GCS_NOK ? false : true;
1317 }
1318