1 /* Copyright (c) 2015, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include "gcs_xcom_utils.h"
24 #include "gcs_group_identifier.h"
25 #include "gcs_logging.h"
26 
27 #include "xcom_cfg.h"
28 #include "task_net.h"
29 #include "gcs_message_stage_lz4.h"
30 #include "task_os.h"
31 #include "gcs_xcom_networking.h"
32 #include "xcom_ssl_transport.h"
33 
34 #include <sstream>
35 #include <iostream>
36 #include <algorithm>
37 #include <climits>
38 #include <set>
39 #include <limits>
40 #include <assert.h>
41 
namespace
{
/**
  Size of the pool of XCom handlers created by Gcs_xcom_proxy_impl.
  6 is the recommended value: larger pools hinder testing and, according
  to our benchmarks, do not bring a meaningful performance improvement.
*/
const int XCOM_MAX_HANDLERS= 6;

/** Default time, in seconds, used by the proxy's timed waits. */
const uint64_t WAITING_TIME= 30;

/** Default number of attempts to join a group. */
const unsigned int JOIN_ATTEMPTS= 0;

/** Default sleep time, in seconds, between attempts to join a group. */
const uint64_t JOIN_SLEEP_TIME= 5;
} // anonymous namespace
64 
65 
~Gcs_xcom_utils()66 Gcs_xcom_utils::~Gcs_xcom_utils() {}
67 
build_xcom_group_id(Gcs_group_identifier & group_id)68 u_long Gcs_xcom_utils::build_xcom_group_id(Gcs_group_identifier &group_id)
69 {
70   std::string group_id_str= group_id.get_group_id();
71   return mhash((unsigned char *)group_id_str.c_str(), group_id_str.size());
72 }
73 
74 
75 void
76 Gcs_xcom_utils::
process_peer_nodes(const std::string * peer_nodes,std::vector<std::string> & processed_peers)77 process_peer_nodes(const std::string *peer_nodes,
78                    std::vector<std::string> &processed_peers)
79 {
80   std::string peer_init(peer_nodes->c_str());
81   std::string delimiter= ",";
82 
83   //Clear all whitespace in the string
84   peer_init.erase(std::remove(peer_init.begin(), peer_init.end(), ' '),
85                   peer_init.end());
86 
87   // Skip delimiter at beginning.
88   std::string::size_type lastPos= peer_init.find_first_not_of(delimiter, 0);
89 
90   // Find first "non-delimiter".
91   std::string::size_type pos= peer_init.find_first_of(delimiter, lastPos);
92 
93   while (std::string::npos != pos || std::string::npos != lastPos)
94   {
95     std::string peer(peer_init.substr(lastPos, pos - lastPos));
96     processed_peers.push_back(peer);
97 
98     // Skip delimiter
99     lastPos= peer_init.find_first_not_of(delimiter, pos);
100 
101     // Find next "non-delimiter"
102     pos= peer_init.find_first_of(delimiter, lastPos);
103   }
104 }
105 
106 void
107 Gcs_xcom_utils::
validate_peer_nodes(std::vector<std::string> & peers,std::vector<std::string> & invalid_peers)108 validate_peer_nodes(std::vector<std::string> &peers,
109                     std::vector<std::string> &invalid_peers)
110 {
111   std::vector<std::string>::iterator it;
112   for(it= peers.begin(); it != peers.end();)
113   {
114     std::string server_and_port= *it;
115     if (!is_valid_hostname(server_and_port))
116     {
117       invalid_peers.push_back(server_and_port);
118       it= peers.erase(it);
119     }
120     else
121     {
122       ++it;
123     }
124   }
125 }
126 
127 uint32_t
mhash(unsigned char * buf,size_t length)128 Gcs_xcom_utils::mhash(unsigned char *buf, size_t length)
129 {
130   size_t i=     0;
131   uint32_t sum= 0;
132   for (i= 0; i < length; i++)
133   {
134     sum+= 0x811c9dc5 * (uint32_t)buf[i];
135   }
136 
137   return sum;
138 }
139 
init_net()140 int Gcs_xcom_utils::init_net()
141 {
142   return ::init_net();
143 }
144 
145 
deinit_net()146 int Gcs_xcom_utils::deinit_net()
147 {
148   return ::deinit_net();
149 }
150 
151 
152 void
delete_node_address(unsigned int n,node_address * na)153 Gcs_xcom_proxy_impl::delete_node_address(unsigned int n, node_address *na)
154 {
155   ::delete_node_address(n, na);
156 }
157 
158 
xcom_client_close_connection(connection_descriptor * fd)159 int Gcs_xcom_proxy_impl::xcom_client_close_connection(connection_descriptor* fd)
160 {
161   return ::xcom_close_client_connection(fd);
162 }
163 
164 
165 connection_descriptor*
xcom_client_open_connection(std::string saddr,xcom_port port)166 Gcs_xcom_proxy_impl::xcom_client_open_connection(std::string saddr, xcom_port port)
167 {
168   char *addr= (char *) saddr.c_str();
169   return ::xcom_open_client_connection(addr, port);
170 }
171 
172 
173 int Gcs_xcom_proxy_impl::
xcom_client_add_node(connection_descriptor * fd,node_list * nl,uint32_t gid)174 xcom_client_add_node(connection_descriptor* fd, node_list *nl, uint32_t gid)
175 {
176   return ::xcom_client_add_node(fd, nl, gid);
177 }
178 
179 int
xcom_client_remove_node(connection_descriptor * fd,node_list * nl,uint32_t gid)180 Gcs_xcom_proxy_impl::xcom_client_remove_node(connection_descriptor* fd, node_list* nl, uint32_t gid)
181 {
182   return ::xcom_client_remove_node(fd, nl, gid);
183 }
184 
185 
186 int
xcom_client_remove_node(node_list * nl,uint32_t gid)187 Gcs_xcom_proxy_impl::xcom_client_remove_node(node_list *nl, uint32_t gid)
188 {
189   int index= xcom_acquire_handler();
190   int res=   true;
191 
192   if (index != -1)
193   {
194     connection_descriptor* fd= m_xcom_handlers[index]->get_fd();
195 
196     /*
197       XCOM will return 1 if the request is successfully processed or
198       0 otherwise.
199     */
200     if (fd != NULL)
201       res= ::xcom_client_remove_node(fd, nl, gid) ? false : true;
202   }
203   xcom_release_handler(index);
204   return res;
205 }
206 
207 
xcom_client_boot(node_list * nl,uint32_t gid)208 int Gcs_xcom_proxy_impl::xcom_client_boot(node_list *nl, uint32_t gid)
209 {
210   int index= xcom_acquire_handler();
211   int res=   true;
212 
213   if (index != -1)
214   {
215     connection_descriptor* fd= m_xcom_handlers[index]->get_fd();
216 
217     if (fd != NULL)
218       res= ::xcom_client_boot(fd, nl, gid);
219   }
220   xcom_release_handler(index);
221   return res;
222 }
223 
224 
xcom_client_send_data(unsigned long long len,char * data)225 int Gcs_xcom_proxy_impl::xcom_client_send_data(unsigned long long len,
226                                                char *data)
227 {
228   int res=   true;
229 
230   if (len <= std::numeric_limits<unsigned int>::max())
231   {
232     int index= xcom_acquire_handler();
233     if (index != -1)
234     {
235       connection_descriptor* fd= m_xcom_handlers[index]->get_fd();
236       /*
237         XCOM will write all requested bytes or return -1 if there is
238         an error. However, the wrapper will return 1 if connections
239         to XCOM are not configured.
240 
241         Having said that, it should be enough to check whether data
242         size was written and report false if so and true otherwise.
243       */
244       if (fd != NULL)
245       {
246         assert(len > 0);
247         int64_t written=
248           ::xcom_client_send_data(static_cast<uint32_t>(len), data, fd);
249         if (static_cast<unsigned int>(written) >= len)
250           res= false;
251       }
252     }
253     xcom_release_handler(index);
254   }
255   else
256   {
257     /*
258       GCS's message length is defined as unsigned long long type, but
259       XCOM can only accept packets length of which are in unsigned int range.
260       So it throws an error when gcs message is too big.
261     */
262 
263     MYSQL_GCS_LOG_ERROR("The data is too big. Data length should not"
264                         << " exceed "
265                         << std::numeric_limits<unsigned int>::max()
266                         << " bytes.");
267   }
268   return res;
269 }
270 
271 
xcom_init(xcom_port xcom_listen_port)272 int Gcs_xcom_proxy_impl::xcom_init(xcom_port xcom_listen_port)
273 {
274   /* Init XCom */
275   ::xcom_fsm(xa_init, int_arg(0));                       /* Basic xcom init */
276 
277   ::xcom_taskmain2(xcom_listen_port);
278 
279   return 0;
280 }
281 
xcom_exit(bool xcom_handlers_open)282 int Gcs_xcom_proxy_impl::xcom_exit(bool xcom_handlers_open)
283 {
284   int index= xcom_acquire_handler();
285   int res= true;
286 
287 
288   if (index != -1)
289   {
290     connection_descriptor* fd= m_xcom_handlers[index]->get_fd();
291 
292     /* Stop XCom */
293     if (fd != NULL)
294     {
295       res= ::xcom_client_terminate_and_exit(fd);
296     }
297 
298     xcom_release_handler(index);
299   }
300   else if (!xcom_handlers_open)
301   {
302     /* The handlers were not yet open, so use basic xcom stop */
303     ::xcom_fsm(xa_exit, int_arg(0));
304     res= false;
305   }
306 
307   return res;
308 }
309 
xcom_set_cleanup()310 void Gcs_xcom_proxy_impl::xcom_set_cleanup()
311 {
312   xcom_set_ready(false);
313   xcom_set_exit(false);
314   xcom_set_comms_status(XCOM_COMM_STATUS_UNDEFINED);
315 }
316 
xcom_get_ssl_mode(const char * mode)317 int Gcs_xcom_proxy_impl::xcom_get_ssl_mode(const char* mode)
318 {
319    return ::xcom_get_ssl_mode(mode);
320 }
321 
322 
xcom_set_ssl_mode(int mode)323 int Gcs_xcom_proxy_impl::xcom_set_ssl_mode(int mode)
324 {
325    return ::xcom_set_ssl_mode(mode);
326 }
327 
328 
xcom_init_ssl()329 int Gcs_xcom_proxy_impl::xcom_init_ssl()
330 {
331   return ::xcom_init_ssl(m_server_key_file, m_server_cert_file,
332                          m_client_key_file, m_client_cert_file, m_ca_file,
333                          m_ca_path, m_crl_file, m_crl_path, m_cipher,
334                          m_tls_version);
335 }
336 
337 
xcom_destroy_ssl()338 void Gcs_xcom_proxy_impl::xcom_destroy_ssl()
339 {
340   ::xcom_destroy_ssl();
341 }
342 
343 
xcom_use_ssl()344 int Gcs_xcom_proxy_impl::xcom_use_ssl()
345 {
346   return ::xcom_use_ssl();
347 }
348 
349 
xcom_set_ssl_parameters(const char * server_key_file,const char * server_cert_file,const char * client_key_file,const char * client_cert_file,const char * ca_file,const char * ca_path,const char * crl_file,const char * crl_path,const char * cipher,const char * tls_version)350 void Gcs_xcom_proxy_impl::xcom_set_ssl_parameters(
351                    const char *server_key_file, const char *server_cert_file,
352                    const char *client_key_file, const char *client_cert_file,
353                    const char *ca_file, const char *ca_path,
354                    const char *crl_file, const char *crl_path,
355                    const char *cipher, const char *tls_version)
356 {
357   m_server_key_file= server_key_file;
358   m_server_cert_file= server_cert_file;
359   m_client_key_file= client_key_file;
360   m_client_cert_file= client_cert_file;
361   m_ca_file= ca_file;
362   m_ca_path= ca_path;
363   m_crl_file= crl_file;
364   m_crl_path= crl_path;
365   m_cipher= cipher;
366   m_tls_version= tls_version;
367 }
368 
369 
xcom_open_handlers(std::string saddr,xcom_port port)370 bool Gcs_xcom_proxy_impl::xcom_open_handlers(std::string saddr, xcom_port port)
371 {
372   bool success= true;
373   char *addr=   (char *)saddr.c_str();
374   int n=        0;
375 
376   m_lock_xcom_cursor.lock();
377   if (m_xcom_handlers_cursor == -1 && addr != NULL)
378   {
379     for (int i= 0; i < m_xcom_handlers_size && success; i++)
380     {
381       connection_descriptor* con= NULL;
382 
383       while ((con= xcom_client_open_connection(addr, port)) == NULL &&
384              n < Gcs_xcom_proxy::connection_attempts)
385       {
386         My_xp_util::sleep_seconds(1);
387         n++;
388       }
389 
390       n= 0;
391 
392       if (con == NULL)
393       {
394         success= false;
395         break;
396       }
397       else
398       {
399         if(m_socket_util->disable_nagle_in_socket(con->fd) < 0)
400         {
401           success= false;
402         }
403 
404         // This is a hack. It forces a protocol negotiation in
405         // the current connection with the local xcom, so that
406         // it does not happen later on.
407         if ((xcom_client_enable_arbitrator(con) <= 0) ||
408             (xcom_client_disable_arbitrator(con) <= 0))
409           success= false;
410       }
411 
412       m_xcom_handlers[i]->set_fd(con);
413     }
414 
415     if (!success)
416     {
417       for (int i= 0; i < m_xcom_handlers_size; i++)
418       {
419         if (m_xcom_handlers[i]->get_fd() != NULL)
420         {
421           xcom_close_client_connection(m_xcom_handlers[i]->get_fd());
422           m_xcom_handlers[i]->set_fd(NULL);
423         }
424       }
425       m_xcom_handlers_cursor= -1;
426     }
427     else
428       m_xcom_handlers_cursor= 0;
429   }
430   else
431   {
432     success= false;
433   }
434   m_lock_xcom_cursor.unlock();
435 
436   return success ? false : true;
437 }
438 
439 
xcom_close_handlers()440 bool Gcs_xcom_proxy_impl::xcom_close_handlers()
441 {
442   m_lock_xcom_cursor.lock();
443   // Prevent that any other thread gets a new handler.
444   m_xcom_handlers_cursor= -1;
445   m_lock_xcom_cursor.unlock();
446 
447   /* Close the file descriptors */
448   for (int i= 0; i < m_xcom_handlers_size; i++)
449   {
450     Xcom_handler *handler= m_xcom_handlers[i];
451     if (handler && handler->get_fd() != NULL)
452     {
453       handler->lock();
454       xcom_close_client_connection(handler->get_fd());
455       handler->unlock();
456     }
457   }
458 
459 #ifdef XCOM_HAVE_OPENSSL
460   ::xcom_cleanup_ssl();
461 #endif
462 
463   return false;
464 }
465 
466 
xcom_release_handler(int index)467 void Gcs_xcom_proxy_impl::xcom_release_handler(int index)
468 {
469   if (index < m_xcom_handlers_size && index >= 0)
470     m_xcom_handlers[index]->unlock();
471 }
472 
473 
xcom_acquire_handler()474 int Gcs_xcom_proxy_impl::xcom_acquire_handler()
475 {
476   int res= -1;
477   m_lock_xcom_cursor.lock();
478 
479   if (m_xcom_handlers_cursor != -1)
480   {
481     res= m_xcom_handlers_cursor;
482     m_xcom_handlers[res]->lock();
483     m_xcom_handlers_cursor= (m_xcom_handlers_cursor+1) % m_xcom_handlers_size;
484   }
485   m_lock_xcom_cursor.unlock();
486 
487   return res;
488 }
489 
490 /* purecov: begin deadcode */
Gcs_xcom_proxy_impl()491 Gcs_xcom_proxy_impl::Gcs_xcom_proxy_impl()
492   :m_xcom_handlers_cursor(-1), m_lock_xcom_cursor(),
493    m_xcom_handlers_size(XCOM_MAX_HANDLERS),
494    m_wait_time(WAITING_TIME),
495    m_xcom_handlers(NULL), m_lock_xcom_ready(),
496    m_cond_xcom_ready(), m_is_xcom_ready(false),
497    m_lock_xcom_comms_status(), m_cond_xcom_comms_status(),
498    m_xcom_comms_status(XCOM_COMM_STATUS_UNDEFINED),
499    m_lock_xcom_exit(), m_cond_xcom_exit(),
500    m_is_xcom_exit(false),
501    m_socket_util(NULL),
502    m_server_key_file(),
503    m_server_cert_file(),
504    m_client_key_file(),
505    m_client_cert_file(),
506    m_ca_file(),
507    m_ca_path(),
508    m_crl_file(),
509    m_crl_path(),
510    m_cipher(),
511    m_tls_version()
512 {
513   m_xcom_handlers= new Xcom_handler *[m_xcom_handlers_size];
514 
515   for (int i= 0; i < m_xcom_handlers_size; i++)
516     m_xcom_handlers[i]= new Xcom_handler();
517 
518   m_lock_xcom_cursor.init(NULL);
519   m_lock_xcom_ready.init(NULL);
520   m_cond_xcom_ready.init();
521   m_lock_xcom_comms_status.init(NULL);
522   m_cond_xcom_comms_status.init();
523   m_lock_xcom_exit.init(NULL);
524   m_cond_xcom_exit.init();
525 
526   m_socket_util= new My_xp_socket_util_impl();
527 }
/* purecov: end */
529 
Gcs_xcom_proxy_impl(int wt)530 Gcs_xcom_proxy_impl::Gcs_xcom_proxy_impl(int wt)
531   :m_xcom_handlers_cursor(-1), m_lock_xcom_cursor(),
532    m_xcom_handlers_size(XCOM_MAX_HANDLERS),
533    m_wait_time(wt),
534    m_xcom_handlers(NULL), m_lock_xcom_ready(),
535    m_cond_xcom_ready(), m_is_xcom_ready(false),
536    m_lock_xcom_comms_status(), m_cond_xcom_comms_status(),
537    m_xcom_comms_status(XCOM_COMM_STATUS_UNDEFINED),
538    m_lock_xcom_exit(), m_cond_xcom_exit(),
539    m_is_xcom_exit(false),
540    m_socket_util(NULL),
541    m_server_key_file(),
542    m_server_cert_file(),
543    m_client_key_file(),
544    m_client_cert_file(),
545    m_ca_file(),
546    m_ca_path(),
547    m_crl_file(),
548    m_crl_path(),
549    m_cipher(),
550    m_tls_version()
551 {
552   m_xcom_handlers= new Xcom_handler *[m_xcom_handlers_size];
553 
554   for (int i= 0; i < m_xcom_handlers_size; i++)
555     m_xcom_handlers[i]= new Xcom_handler();
556 
557   m_lock_xcom_cursor.init(NULL);
558   m_lock_xcom_ready.init(NULL);
559   m_cond_xcom_ready.init();
560   m_lock_xcom_comms_status.init(NULL);
561   m_cond_xcom_comms_status.init();
562   m_lock_xcom_exit.init(NULL);
563   m_cond_xcom_exit.init();
564 
565   m_socket_util= new My_xp_socket_util_impl();
566 }
567 
568 
~Gcs_xcom_proxy_impl()569 Gcs_xcom_proxy_impl::~Gcs_xcom_proxy_impl()
570 {
571   for (int i= 0; i < m_xcom_handlers_size; i++)
572     delete m_xcom_handlers[i];
573 
574   delete [] m_xcom_handlers;
575   m_lock_xcom_cursor.destroy();
576   m_lock_xcom_ready.destroy();
577   m_cond_xcom_ready.destroy();
578   m_lock_xcom_comms_status.destroy();
579   m_cond_xcom_comms_status.destroy();
580   m_lock_xcom_exit.destroy();
581   m_cond_xcom_exit.destroy();
582 
583   delete m_socket_util;
584 }
585 
586 
find_site_def(synode_no synode)587 site_def const *Gcs_xcom_proxy_impl::find_site_def(synode_no synode)
588 {
589   return ::find_site_def(synode);
590 }
591 
592 
Xcom_handler()593 Gcs_xcom_proxy_impl::Xcom_handler::Xcom_handler()
594   :m_lock(), m_fd(NULL)
595 {
596   m_lock.init(NULL);
597 }
598 
599 
~Xcom_handler()600 Gcs_xcom_proxy_impl::Xcom_handler::~Xcom_handler()
601 {
602   m_lock.destroy();
603 }
604 
605 
new_node_address_uuid(unsigned int n,char * names[],blob uuids[])606 node_address *Gcs_xcom_proxy_impl::new_node_address_uuid(
607   unsigned int n, char *names[], blob uuids[])
608 {
609   return ::new_node_address_uuid(static_cast<int>(n), names, uuids);
610 }
611 
612 
xcom_wait_ready()613 enum_gcs_error Gcs_xcom_proxy_impl::xcom_wait_ready()
614 {
615   enum_gcs_error ret= GCS_OK;
616   struct timespec ts;
617   int res= 0;
618 
619   m_lock_xcom_ready.lock();
620 
621   if (!m_is_xcom_ready)
622   {
623     My_xp_util::set_timespec(&ts, m_wait_time);
624     res= m_cond_xcom_ready.timed_wait(
625       m_lock_xcom_ready.get_native_mutex(), &ts);
626   }
627 
628   if (res != 0)
629   {
630     ret= GCS_NOK;
631     // There was an error
632     if(res == ETIMEDOUT)
633     {
634       // timeout
635       MYSQL_GCS_LOG_ERROR("Timeout while waiting for the group" <<
636                           " communication engine to be ready!");
637     }
638     else if(res == EINVAL)
639     {
640       // invalid abstime or cond or mutex
641       MYSQL_GCS_LOG_ERROR("Invalid parameter received by the timed wait for" <<
642                           " the group communication engine to be ready.");
643     }
644     else if(res == EPERM)
645     {
646       // mutex isn't owned by the current thread at the time of the call
647       MYSQL_GCS_LOG_ERROR("Thread waiting for the group communication" <<
648                           " engine to be ready does not own the mutex at the" <<
649                           " time of the call!");
650     }
651     else
652       MYSQL_GCS_LOG_ERROR("Error while waiting for the group" <<
653                           "communication engine to be ready!");
654   }
655 
656   m_lock_xcom_ready.unlock();
657 
658   return ret;
659 }
660 
661 
662 bool
xcom_is_ready()663 Gcs_xcom_proxy_impl::xcom_is_ready()
664 {
665   bool retval;
666 
667   m_lock_xcom_ready.lock();
668   retval= m_is_xcom_ready;
669   m_lock_xcom_ready.unlock();
670 
671   return retval;
672 }
673 
674 
675 void
xcom_set_ready(bool value)676 Gcs_xcom_proxy_impl::xcom_set_ready(bool value)
677 {
678   m_lock_xcom_ready.lock();
679   m_is_xcom_ready= value;
680   m_lock_xcom_ready.unlock();
681 }
682 
683 
684 void
xcom_signal_ready()685 Gcs_xcom_proxy_impl::xcom_signal_ready()
686 {
687   m_lock_xcom_ready.lock();
688   m_is_xcom_ready= true;
689   m_cond_xcom_ready.broadcast();
690   m_lock_xcom_ready.unlock();
691 }
692 
693 
xcom_wait_exit()694 enum_gcs_error Gcs_xcom_proxy_impl::xcom_wait_exit()
695 {
696   enum_gcs_error ret= GCS_OK;
697   struct timespec ts;
698   int res= 0;
699 
700   m_lock_xcom_exit.lock();
701 
702   if (!m_is_xcom_exit)
703   {
704     My_xp_util::set_timespec(&ts, m_wait_time);
705     res= m_cond_xcom_exit.timed_wait(
706       m_lock_xcom_exit.get_native_mutex(), &ts);
707   }
708 
709   if (res != 0)
710   {
711     ret= GCS_NOK;
712     // There was an error
713     if(res == ETIMEDOUT)
714     {
715       // timeout
716       MYSQL_GCS_LOG_ERROR(
717         "Timeout while waiting for the group communication engine to exit!"
718       )
719     }
720     else if(res == EINVAL)
721     {
722       // invalid abstime or cond or mutex
723       MYSQL_GCS_LOG_ERROR(
724         "Timed wait for group communication engine to exit received an "
725         "invalid parameter!"
726       )
727     }
728     else if(res == EPERM)
729     {
730       // mutex isn't owned by the current thread at the time of the call
731       MYSQL_GCS_LOG_ERROR(
732         "Timed wait for group communication engine to exit using mutex that "
733         "isn't owned by the current thread at the time of the call!"
734       )
735     }
736     else
737       MYSQL_GCS_LOG_ERROR(
738         "Error while waiting for group communication to exit!"
739       )
740   }
741 
742   m_lock_xcom_exit.unlock();
743 
744   return ret;
745 }
746 
747 
748 bool
xcom_is_exit()749 Gcs_xcom_proxy_impl::xcom_is_exit()
750 {
751   bool retval;
752 
753   m_lock_xcom_exit.lock();
754   retval= m_is_xcom_exit;
755   m_lock_xcom_exit.unlock();
756 
757   return retval;
758 }
759 
760 
761 void
xcom_set_exit(bool value)762 Gcs_xcom_proxy_impl::xcom_set_exit(bool value)
763 {
764   m_lock_xcom_exit.lock();
765   m_is_xcom_exit= value;
766   m_lock_xcom_exit.unlock();
767 }
768 
769 
770 void
xcom_signal_exit()771 Gcs_xcom_proxy_impl::xcom_signal_exit()
772 {
773   m_lock_xcom_exit.lock();
774   m_is_xcom_exit= true;
775   m_cond_xcom_exit.broadcast();
776   m_lock_xcom_exit.unlock();
777 }
778 
779 
780 void
xcom_wait_for_xcom_comms_status_change(int & status)781 Gcs_xcom_proxy_impl::xcom_wait_for_xcom_comms_status_change(int& status)
782 {
783   struct timespec ts;
784   int res= 0;
785 
786   m_lock_xcom_comms_status.lock();
787 
788   if (m_xcom_comms_status == XCOM_COMM_STATUS_UNDEFINED)
789   {
790     My_xp_util::set_timespec(&ts, m_wait_time);
791     res= m_cond_xcom_comms_status.timed_wait(
792       m_lock_xcom_comms_status.get_native_mutex(), &ts);
793   }
794 
795   if (res != 0)
796   {
797     // There was an error
798     status= XCOM_COMMS_OTHER;
799 
800     if(res == ETIMEDOUT)
801     {
802       // timeout
803       MYSQL_GCS_LOG_ERROR("Timeout while waiting for the group communication" <<
804                           " engine's communications status to change!");
805     }
806     else if(res == EINVAL)
807     {
808       // invalid abstime or cond or mutex
809       MYSQL_GCS_LOG_ERROR("Invalid parameter received by the timed wait for" <<
810                           " the group communication engine's communications" <<
811                           " status to change.");
812     }
813     else if(res == EPERM)
814     {
815       // mutex isn't owned by the current thread at the time of the call
816       MYSQL_GCS_LOG_ERROR("Thread waiting for the group communication" <<
817                           " engine's communications status to change does" <<
818                           " not own the mutex at the time of the call!");
819     }
820     else
821       MYSQL_GCS_LOG_ERROR("Error while waiting for the group communication" <<
822                           " engine's communications status to change!");
823   }
824   else
825     status= m_xcom_comms_status;
826 
827   m_lock_xcom_comms_status.unlock();
828 }
829 
830 bool
xcom_has_comms_status_changed()831 Gcs_xcom_proxy_impl::xcom_has_comms_status_changed()
832 {
833   bool retval;
834 
835   m_lock_xcom_comms_status.lock();
836   retval= (m_xcom_comms_status != XCOM_COMM_STATUS_UNDEFINED);
837   m_lock_xcom_comms_status.unlock();
838 
839   return retval;
840 }
841 
842 void
xcom_set_comms_status(int value)843 Gcs_xcom_proxy_impl::xcom_set_comms_status(int value)
844 {
845   m_lock_xcom_comms_status.lock();
846   m_xcom_comms_status= value;
847   m_lock_xcom_comms_status.unlock();
848 }
849 
850 void
xcom_signal_comms_status_changed(int status)851 Gcs_xcom_proxy_impl::xcom_signal_comms_status_changed(int status)
852 {
853   m_lock_xcom_comms_status.lock();
854   m_xcom_comms_status= status;
855   m_cond_xcom_comms_status.broadcast();
856   m_lock_xcom_comms_status.unlock();
857 }
858 
init()859 void Gcs_xcom_app_cfg::init()
860 {
861   ::init_cfg_app_xcom();
862 }
863 
deinit()864 void Gcs_xcom_app_cfg::deinit()
865 {
866   ::deinit_cfg_app_xcom();
867 }
868 
set_poll_spin_loops(unsigned int loops)869 void Gcs_xcom_app_cfg::set_poll_spin_loops(unsigned int loops)
870 {
871   if (the_app_xcom_cfg)
872     the_app_xcom_cfg->m_poll_spin_loops= loops;
873 }
874 
875 int
xcom_client_force_config(connection_descriptor * fd,node_list * nl,uint32_t group_id)876 Gcs_xcom_proxy_impl::xcom_client_force_config(connection_descriptor *fd,
877                                               node_list *nl,
878                                               uint32_t group_id)
879 {
880   return ::xcom_client_force_config(fd, nl, group_id);
881 }
882 
883 int
xcom_client_force_config(node_list * nl,uint32_t group_id)884 Gcs_xcom_proxy_impl::xcom_client_force_config(node_list *nl,
885                                               uint32_t group_id)
886 {
887   int index= xcom_acquire_handler();
888   int res=   true;
889 
890   if (index != -1)
891   {
892     connection_descriptor* fd= m_xcom_handlers[index]->get_fd();
893 
894     if (fd != NULL)
895       res= this->xcom_client_force_config(fd, nl, group_id);
896   }
897   xcom_release_handler(index);
898   return res;
899 }
900 
Gcs_xcom_nodes(const site_def * site,node_set & nodes)901 Gcs_xcom_nodes::Gcs_xcom_nodes(const site_def *site, node_set &nodes)
902   : m_node_no(site->nodeno), m_addresses(), m_uuids(), m_statuses(),
903     m_size(nodes.node_set_len)
904 {
905   Gcs_uuid uuid;
906   for (unsigned int i= 0; i < nodes.node_set_len; ++i)
907   {
908     /* Get member address and save it. */
909     std::string address(site->nodes.node_list_val[i].address);
910     m_addresses.push_back(address);
911 
912     /* Get member uuid and save it. */
913     uuid.decode(
914       reinterpret_cast<uchar *>(site->nodes.node_list_val[i].uuid.data.data_val),
915       site->nodes.node_list_val[i].uuid.data.data_len
916     );
917     m_uuids.push_back(uuid);
918 
919     /* Get member status and save it */
920     m_statuses.push_back(nodes.node_set_val[i] ? true: false);
921   }
922   assert(m_size == m_addresses.size());
923   assert(m_size == m_statuses.size());
924 }
925 
926 
Gcs_xcom_nodes()927 Gcs_xcom_nodes::Gcs_xcom_nodes()
928   : m_node_no(0), m_addresses(), m_uuids(), m_statuses(),
929     m_size(0)
930 {
931 }
932 
933 
get_node_no() const934 unsigned int Gcs_xcom_nodes::get_node_no() const
935 {
936    return m_node_no;
937 }
938 
get_addresses() const939 const std::vector<std::string> &Gcs_xcom_nodes::get_addresses() const
940 {
941   return m_addresses;
942 }
943 
get_uuids() const944 const std::vector<Gcs_uuid> &Gcs_xcom_nodes::get_uuids() const
945 {
946   return m_uuids;
947 }
948 
949 
get_uuid(const std::string & address) const950 const Gcs_uuid *Gcs_xcom_nodes::get_uuid(const std::string &address) const
951 {
952   for (size_t index= 0; index < m_size; index++)
953   {
954     if (!m_addresses[index].compare(address))
955     {
956       return &m_uuids[index];
957     }
958   }
959   return NULL;
960 }
961 
962 
get_statuses() const963 const std::vector<bool> &Gcs_xcom_nodes::get_statuses() const
964 {
965   return m_statuses;
966 }
967 
get_size() const968 unsigned int Gcs_xcom_nodes::get_size() const
969 {
970   return m_size;
971 }
972 
/**
  Checks that a string has the form "host:port", that the host part can be
  resolved and that the port part is a number no greater than USHRT_MAX.

  The text after the last ':' is taken as the port and everything before
  it as the host.

  @param server_and_port string to validate
  @return true if the string is valid; false otherwise
*/
bool
is_valid_hostname(const std::string &server_and_port)
{
  const std::string::size_type delim_pos= server_and_port.find_last_of(":");
  if (delim_pos == std::string::npos)
    return false;

  const std::string hostname= server_and_port.substr(0, delim_pos);
  const std::string s_port= server_and_port.substr(delim_pos + 1);

  /* handle hostname */
  struct addrinfo *addr= NULL;
  const bool resolved=
    (checked_getaddrinfo(hostname.c_str(), 0, NULL, &addr) == 0);
  if (addr)
    freeaddrinfo(addr);
  if (!resolved)
    return false;

  /* handle port */
  if (!is_number(s_port))
    return false;

  /*
    strtol() is used instead of atoi() because it has a defined result on
    overflow: it returns LONG_MAX, which the range check below rejects.
  */
  const long port= strtol(s_port.c_str(), NULL, 10);
  return port >= 0 && port <= USHRT_MAX;
}
1004 
1005 void
fix_parameters_syntax(Gcs_interface_parameters & interface_params)1006 fix_parameters_syntax(Gcs_interface_parameters &interface_params)
1007 {
1008   std::string *compression_str= const_cast<std::string *>(
1009     interface_params.get_parameter("compression"));
1010   std::string *compression_threshold_str= const_cast<std::string *>(
1011     interface_params.get_parameter("compression_threshold"));
1012   std::string *wait_time_str= const_cast<std::string *>(
1013     interface_params.get_parameter("wait_time"));
1014   std::string *ip_whitelist_str= const_cast<std::string *>(
1015     interface_params.get_parameter("ip_whitelist"));
1016   std::string *join_attempts_str= const_cast<std::string *>(
1017     interface_params.get_parameter("join_attempts"));
1018   std::string *join_sleep_time_str= const_cast<std::string *>(
1019     interface_params.get_parameter("join_sleep_time"));
1020 
1021   // sets the default value for compression (ON by default)
1022   if (!compression_str)
1023   {
1024     interface_params.add_parameter("compression", "on");
1025   }
1026 
1027   // sets the default threshold if no threshold has been set
1028   if (!compression_threshold_str)
1029   {
1030     std::stringstream ss;
1031     ss << Gcs_message_stage_lz4::DEFAULT_THRESHOLD;
1032     interface_params.add_parameter("compression_threshold", ss.str());
1033   }
1034 
1035   // sets the default waiting time for timed_waits
1036   if (!wait_time_str)
1037   {
1038     std::stringstream ss;
1039     ss << WAITING_TIME;
1040     interface_params.add_parameter("wait_time", ss.str());
1041   }
1042 
1043   // sets the default ip whitelist
1044   if (!ip_whitelist_str)
1045   {
1046     std::stringstream ss;
1047     std::string iplist;
1048     std::map<std::string, int> out;
1049 
1050     // add local private networks that one has an IP on by default
1051     get_ipv4_local_private_addresses(out);
1052 
1053     if (out.empty())
1054       ss << "127.0.0.1/32,::1/128,";
1055     else
1056     {
1057       std::map<std::string, int>::iterator it;
1058       for (it= out.begin(); it != out.end(); it++)
1059       {
1060         ss << (*it).first << "/" << (*it).second << ",";
1061       }
1062     }
1063 
1064     iplist= ss.str();
1065     iplist.erase(iplist.end() - 1); // remove trailing comma
1066 
1067     MYSQL_GCS_LOG_INFO("Added automatically IP ranges " << iplist <<
1068                        " to the whitelist");
1069 
1070     interface_params.add_parameter("ip_whitelist", iplist);
1071   }
1072 
1073   // sets the default join attempts
1074   if (!join_attempts_str)
1075   {
1076     std::stringstream ss;
1077     ss << JOIN_ATTEMPTS;
1078     interface_params.add_parameter("join_attempts", ss.str());
1079   }
1080 
1081   // sets the default sleep time between join attempts
1082   if (!join_sleep_time_str)
1083   {
1084     std::stringstream ss;
1085     ss << JOIN_SLEEP_TIME;
1086     interface_params.add_parameter("join_sleep_time", ss.str());
1087   }
1088 }
1089 
1090 static enum_gcs_error
is_valid_flag(const std::string param,std::string & flag)1091 is_valid_flag(const std::string param, std::string &flag)
1092 {
1093   enum_gcs_error error= GCS_OK;
1094 
1095   // transform to lower case
1096   std::transform (flag.begin(), flag.end(), flag.begin(), ::tolower);
1097 
1098   if (flag.compare("on") && flag.compare("off") &&
1099       flag.compare("true") && flag.compare("false"))
1100   {
1101     std::stringstream ss;
1102     ss << "Invalid parameter set to " << param << ". ";
1103     ss << "Valid values are either \"on\" or \"off\".";
1104     MYSQL_GCS_LOG_ERROR(ss.str());
1105     error= GCS_NOK;
1106   }
1107   return error;
1108 }
1109 
1110 bool
is_parameters_syntax_correct(const Gcs_interface_parameters & interface_params)1111 is_parameters_syntax_correct(const Gcs_interface_parameters &interface_params)
1112 {
1113   enum_gcs_error error= GCS_OK;
1114 
1115   // get the parameters
1116   const std::string *group_name_str=
1117     interface_params.get_parameter("group_name");
1118   const std::string *local_node_str=
1119     interface_params.get_parameter("local_node");
1120   const std::string *peer_nodes_str=
1121     interface_params.get_parameter("peer_nodes");
1122   const std::string *bootstrap_group_str=
1123     interface_params.get_parameter("bootstrap_group");
1124   const std::string *poll_spin_loops_str=
1125     interface_params.get_parameter("poll_spin_loops");
1126   const std::string *compression_threshold_str=
1127     interface_params.get_parameter("compression_threshold");
1128   const std::string *compression_str=
1129     interface_params.get_parameter("compression");
1130   const std::string *wait_time_str=
1131     interface_params.get_parameter("wait_time");
1132   const std::string *join_attempts_str=
1133     interface_params.get_parameter("join_attempts");
1134   const std::string *join_sleep_time_str=
1135     interface_params.get_parameter("join_sleep_time");
1136 
1137   /*
1138     -----------------------------------------------------
1139     Checks
1140     -----------------------------------------------------
1141    */
1142   // validate group name
1143   if (group_name_str != NULL &&
1144       group_name_str->size() == 0)
1145   {
1146     MYSQL_GCS_LOG_ERROR("The group_name parameter (" << group_name_str << ")" <<
1147                         " is not valid.")
1148     error= GCS_NOK;
1149     goto end;
1150   }
1151 
1152   // validate bootstrap string
1153   // accepted values: true, false, on, off
1154   if (bootstrap_group_str != NULL)
1155   {
1156     std::string &flag= const_cast<std::string &>(*bootstrap_group_str);
1157     error= is_valid_flag("bootstrap_group", flag);
1158     if (error == GCS_NOK)
1159       goto end;
1160   }
1161 
1162   // validate peer addresses addresses
1163   if (peer_nodes_str != NULL)
1164   {
1165     /*
1166      Parse and validate hostname and ports.
1167      */
1168     std::vector<std::string> hostnames_and_ports;
1169     std::vector<std::string> invalid_hostnames_and_ports;
1170     Gcs_xcom_utils::process_peer_nodes(peer_nodes_str, hostnames_and_ports);
1171     Gcs_xcom_utils::validate_peer_nodes(hostnames_and_ports,
1172                                         invalid_hostnames_and_ports);
1173 
1174     if(!invalid_hostnames_and_ports.empty())
1175     {
1176       std::vector<std::string>::iterator invalid_hostnames_and_ports_it;
1177       for(invalid_hostnames_and_ports_it= invalid_hostnames_and_ports.begin();
1178           invalid_hostnames_and_ports_it != invalid_hostnames_and_ports.end();
1179           ++invalid_hostnames_and_ports_it)
1180       {
1181         MYSQL_GCS_LOG_WARN("Peer address \"" <<
1182                            (*invalid_hostnames_and_ports_it).c_str()
1183                            << "\" is not valid.");
1184       }
1185     }
1186 
1187     /*
1188      This means that none of the provided hosts is valid and that
1189      hostnames_and_ports had some sort of value
1190      */
1191     if(!invalid_hostnames_and_ports.empty() && hostnames_and_ports.empty())
1192     {
1193       MYSQL_GCS_LOG_ERROR("None of the provided peer address is valid.");
1194       error= GCS_NOK;
1195       goto end;
1196     }
1197   }
1198 
1199   // local peer address
1200   if (local_node_str != NULL)
1201   {
1202     bool matches_local_ip= false;
1203     std::map<std::string, int> ips;
1204     std::map<std::string, int>::iterator it;
1205 
1206     std::string::size_type delim_pos= (*local_node_str).find_last_of(":");
1207     std::string host= (*local_node_str).substr(0, delim_pos);
1208     std::string ip;
1209 
1210     // first validate hostname
1211     if (!is_valid_hostname(*local_node_str))
1212     {
1213       MYSQL_GCS_LOG_ERROR("Invalid hostname or IP address (" <<
1214                           *local_node_str << ") assigned to the parameter " <<
1215                           "local_node!");
1216 
1217       error= GCS_NOK;
1218       goto end;
1219     }
1220 
1221     // hostname was validated already, lets find the IP
1222     if (resolve_ip_addr_from_hostname(host, ip))
1223     {
1224       MYSQL_GCS_LOG_ERROR("Unable to translate hostname " << host <<
1225                           " to IP address!");
1226       error= GCS_NOK;
1227       goto end;
1228     }
1229 
1230     if (ip.compare(host) != 0)
1231       MYSQL_GCS_LOG_INFO("Translated '" << host << "' to " << ip);
1232 
1233     // second check that this host has that IP assigned
1234     if (get_ipv4_local_addresses(ips, true))
1235     {
1236       MYSQL_GCS_LOG_ERROR("Unable to get the list of local IP addresses for "
1237                           "the server!");
1238       error= GCS_NOK;
1239       goto end;
1240     }
1241 
1242     // see if any IP matches
1243     for (it= ips.begin(); it != ips.end() && !matches_local_ip; it++)
1244       matches_local_ip= (*it).first.compare(ip) == 0;
1245     if(!matches_local_ip)
1246     {
1247       MYSQL_GCS_LOG_ERROR("There is no local IP address matching the one "
1248                           "configured for the local node (" <<
1249                           *local_node_str << ").");
1250       error= GCS_NOK;
1251       goto end;
1252     }
1253   }
1254 
1255   // poll spin loops
1256   if(poll_spin_loops_str &&
1257      (poll_spin_loops_str->size() == 0 ||
1258       !is_number(*poll_spin_loops_str)))
1259   {
1260     MYSQL_GCS_LOG_ERROR("The poll_spin_loops parameter ("
1261                         << poll_spin_loops_str << ") is not valid.")
1262     error= GCS_NOK;
1263     goto end;
1264   }
1265 
1266   // validate compression
1267   if (compression_str != NULL)
1268   {
1269     std::string &flag= const_cast<std::string &>(*compression_str);
1270     error= is_valid_flag("compression", flag);
1271     if (error == GCS_NOK)
1272       goto end;
1273   }
1274 
1275   if (compression_threshold_str &&
1276       (compression_threshold_str->size() == 0 ||
1277        !is_number(*compression_threshold_str)))
1278   {
1279     MYSQL_GCS_LOG_ERROR("The compression_threshold parameter (" <<
1280                         compression_threshold_str << ") is not valid.")
1281     error= GCS_NOK;
1282     goto end;
1283   }
1284 
1285   if (wait_time_str &&
1286       (wait_time_str->size() == 0 ||
1287        !is_number(*wait_time_str)))
1288   {
1289     MYSQL_GCS_LOG_ERROR("The wait_time parameter (" << wait_time_str <<
1290                         ") is not valid.")
1291     error= GCS_NOK;
1292     goto end;
1293   }
1294 
1295   if(join_attempts_str &&
1296      (join_attempts_str->size() == 0 ||
1297       !is_number(*join_attempts_str)))
1298   {
1299     MYSQL_GCS_LOG_ERROR("The join_attempts parameter ("
1300                         << join_attempts_str << ") is not valid.")
1301     error= GCS_NOK;
1302     goto end;
1303   }
1304 
1305   if(join_sleep_time_str &&
1306      (join_sleep_time_str->size() == 0 ||
1307       !is_number(*join_sleep_time_str)))
1308   {
1309     MYSQL_GCS_LOG_ERROR("The join_sleep_time parameter ("
1310                         << join_sleep_time_str << ") is not valid.")
1311     error= GCS_NOK;
1312     goto end;
1313   }
1314 
1315 end:
1316   return error == GCS_NOK ? false : true;
1317 }
1318