1 /*
2    Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "storage/ndb/plugin/ha_ndbcluster_connection.h"
26 
27 #include <mysql/psi/mysql_thread.h>
28 
29 #include "my_dbug.h"
30 #include "mysql/plugin.h"
31 #include "mysqld_error.h"
32 #include "sql/auth/auth_acls.h"
33 #include "sql/mysqld.h"  // server_id, connection_events_loop_aborted
34 #include "sql/sql_class.h"
35 #include "sql/sql_lex.h"
36 #include "storage/ndb/include/kernel/ndb_limits.h"
37 #include "storage/ndb/include/ndbapi/NdbApi.hpp"
38 #include "storage/ndb/include/portlib/NdbTick.h"
39 #include "storage/ndb/include/util/BaseString.hpp"
40 #include "storage/ndb/include/util/Vector.hpp"
41 #ifndef _WIN32
42 #include <netdb.h>  // getservbyname
43 #endif
44 
45 #include "sql/table.h"
46 #include "storage/ndb/plugin/ndb_log.h"
47 #include "storage/ndb/plugin/ndb_sleep.h"
48 
49 Ndb *g_ndb = NULL;
50 Ndb_cluster_connection *g_ndb_cluster_connection = NULL;
51 static Ndb_cluster_connection **g_pool = NULL;
52 static uint g_pool_alloc = 0;
53 static uint g_pool_pos = 0;
54 static mysql_mutex_t g_pool_mutex;
55 
56 /**
57    @brief Parse the --ndb-cluster-connection-pool-nodeids=nodeid[,nodeidN]
58           comma separated list of nodeids to use for the pool
59 
60    @param opt_str      string containing list of nodeids to parse.
61    @param pool_size    size used for the connection pool
62    @param force_nodeid nodeid requested with --ndb-nodeid
63    @param nodeids      the parsed list of nodeids
64    @return             true or false when option parsing failed. Error message
65                        describing the problem has been printed to error log.
66  */
parse_pool_nodeids(const char * opt_str,uint pool_size,uint force_nodeid,Vector<uint> & nodeids)67 static bool parse_pool_nodeids(const char *opt_str, uint pool_size,
68                                uint force_nodeid, Vector<uint> &nodeids) {
69   if (!opt_str) {
70     // The option was not specified.
71     return true;
72   }
73 
74   BaseString tmp(opt_str);
75   Vector<BaseString> list(pool_size);
76   tmp.split(list, ",");
77 
78   for (unsigned i = 0; i < list.size(); i++) {
79     list[i].trim();
80 
81     // Don't allow empty string
82     if (list[i].empty()) {
83       ndb_log_error(
84           "Found empty nodeid specified in "
85           "--ndb-cluster-connection-pool-nodeids='%s'.",
86           opt_str);
87       return false;
88     }
89 
90     // Convert string to number
91     uint nodeid = 0;
92     if (sscanf(list[i].c_str(), "%u", &nodeid) != 1) {
93       ndb_log_error(
94           "Could not parse '%s' in "
95           "--ndb-cluster-connection-pool-nodeids='%s'.",
96           list[i].c_str(), opt_str);
97       return false;
98     }
99 
100     // Check that number is a valid nodeid
101     if (nodeid <= 0 || nodeid > MAX_NODES_ID) {
102       ndb_log_error(
103           "Invalid nodeid %d in "
104           "--ndb-cluster-connection-pool-nodeids='%s'.",
105           nodeid, opt_str);
106       return false;
107     }
108 
109     // Check that nodeid is unique(not already in the list)
110     for (unsigned j = 0; j < nodeids.size(); j++) {
111       if (nodeid == nodeids[j]) {
112         ndb_log_error(
113             "Found duplicate nodeid %d in "
114             "--ndb-cluster-connection-pool-nodeids='%s'.",
115             nodeid, opt_str);
116         return false;
117       }
118     }
119 
120     nodeids.push_back(nodeid);
121   }
122 
123   // Check that size of nodeids match the pool size
124   if (nodeids.size() != pool_size) {
125     ndb_log_error(
126         "The size of the cluster connection pool must be "
127         "equal to the number of nodeids in "
128         "--ndb-cluster-connection-pool-nodeids='%s'.",
129         opt_str);
130     return false;
131   }
132 
133   // Check that --ndb-nodeid(if given) is first in the list
134   if (force_nodeid != 0 && force_nodeid != nodeids[0]) {
135     ndb_log_error(
136         "The nodeid specified by --ndb-nodeid must be equal "
137         "to the first nodeid in "
138         "--ndb-cluster-connection-pool-nodeids='%s'.",
139         opt_str);
140     return false;
141   }
142 
143   return true;
144 }
145 
146 /* Get the port number, hostname, and socket path for processinfo.
147 
148    opt_disable_networking, mysqld_port, my_bind_addr_str, report_port,
149    report_host, and mysqld_unix_port are all server global variables.
150 */
151 extern uint report_port;
152 extern char *report_host;
153 extern char *my_bind_addr_str;
154 
get_processinfo_port()155 static int get_processinfo_port() {
156   int port = 0;
157 
158   if (!opt_disable_networking) {
159     port = report_port ? report_port : mysqld_port;
160     DBUG_ASSERT(port);
161   }
162   return port;
163 }
164 
get_processinfo_host()165 static const char *get_processinfo_host() {
166   const char *host = report_host;
167   if (!host) {
168     host = my_bind_addr_str;
169     if (!(strcmp(host, "*") &&        // If bind_address matches any of
170           strcmp(host, "0.0.0.0") &&  // these strings, let ProcessInfo
171           strcmp(host, "::")))        // use the NDB transporter address.
172     {
173       host = nullptr;
174     }
175   }
176   return host;
177 }
178 
get_processinfo_path()179 static const char *get_processinfo_path() { return mysqld_unix_port; }
180 
181 /*
182   Global flag in ndbapi to specify if api should wait to connect
183   until dict cache is clean.
184 
185   Set to 1 below to not wait, as ndb handler makes sure that no
186   old ndb objects are used.
187 */
188 extern int global_flag_skip_waiting_for_clean_cache;
189 
ndbcluster_connect(int (* connect_callback)(void),ulong wait_connected,uint connection_pool_size,const char * connection_pool_nodeids_str,bool optimized_node_select,const char * connect_string,uint force_nodeid,uint recv_thread_activation_threshold,uint data_node_neighbour)190 int ndbcluster_connect(int (*connect_callback)(void),
191                        ulong wait_connected,  // Timeout in seconds
192                        uint connection_pool_size,
193                        const char *connection_pool_nodeids_str,
194                        bool optimized_node_select, const char *connect_string,
195                        uint force_nodeid, uint recv_thread_activation_threshold,
196                        uint data_node_neighbour) {
197   const char mysqld_name[] = "mysqld";
198   int res;
199   DBUG_TRACE;
200   DBUG_PRINT("enter", ("connect_string: %s, force_nodeid: %d", connect_string,
201                        force_nodeid));
202 
203   /* For Service URI in ndbinfo */
204   const int processinfo_port = get_processinfo_port();
205   const char *processinfo_host = get_processinfo_host();
206   const char *processinfo_path = processinfo_port ? "" : get_processinfo_path();
207   char server_id_string[64];
208   if (server_id > 0)
209     snprintf(server_id_string, sizeof(server_id_string), "?server-id=%lu",
210              server_id);
211   else
212     server_id_string[0] = '\0';
213 
214   // Parse the --ndb-cluster-connection-pool-nodeids=nodeid[,nodeidN]
215   // comma separated list of nodeids to use for the pool
216   Vector<uint> nodeids;
217   if (!parse_pool_nodeids(connection_pool_nodeids_str, connection_pool_size,
218                           force_nodeid, nodeids)) {
219     // Error message already printed
220     return -1;
221   }
222 
223   // Find specified nodeid for first connection and let it override
224   // force_nodeid(if both has been specified they are equal).
225   if (nodeids.size()) {
226     assert(force_nodeid == 0 || force_nodeid == nodeids[0]);
227     force_nodeid = nodeids[0];
228     ndb_log_info("using nodeid %u", force_nodeid);
229   }
230 
231   global_flag_skip_waiting_for_clean_cache = 1;
232 
233   g_ndb_cluster_connection =
234       new (std::nothrow) Ndb_cluster_connection(connect_string, force_nodeid);
235   if (g_ndb_cluster_connection == nullptr) {
236     ndb_log_error("failed to allocate global ndb cluster connection");
237     DBUG_PRINT("error", ("Ndb_cluster_connection(%s)", connect_string));
238     return -1;
239   }
240   {
241     char buf[128];
242     snprintf(buf, sizeof(buf), "%s --server-id=%lu", mysqld_name, server_id);
243     g_ndb_cluster_connection->set_name(buf);
244     snprintf(buf, sizeof(buf), "%s%s", processinfo_path, server_id_string);
245     g_ndb_cluster_connection->set_service_uri("mysql", processinfo_host,
246                                               processinfo_port, buf);
247   }
248   g_ndb_cluster_connection->set_optimized_node_selection(optimized_node_select);
249   g_ndb_cluster_connection->set_recv_thread_activation_threshold(
250       recv_thread_activation_threshold);
251   g_ndb_cluster_connection->set_data_node_neighbour(data_node_neighbour);
252 
253   // Create a Ndb object to open the connection  to NDB
254   g_ndb = new (std::nothrow) Ndb(g_ndb_cluster_connection, "sys");
255   if (g_ndb == nullptr) {
256     ndb_log_error("failed to allocate global ndb object");
257     DBUG_PRINT("error", ("failed to create global ndb object"));
258     return -1;
259   }
260   if (g_ndb->init() != 0) {
261     DBUG_PRINT("error", ("%d  message: %s", g_ndb->getNdbError().code,
262                          g_ndb->getNdbError().message));
263     return -1;
264   }
265 
266   /* Connect to management server */
267 
268   const NDB_TICKS start = NdbTick_getCurrentTicks();
269 
270   while ((res = g_ndb_cluster_connection->connect(0, 0, 0)) == 1) {
271     const NDB_TICKS now = NdbTick_getCurrentTicks();
272     if (NdbTick_Elapsed(start, now).seconds() > wait_connected) break;
273     ndb_retry_sleep(100);
274     if (connection_events_loop_aborted()) return -1;
275   }
276 
277   {
278     g_pool_alloc = connection_pool_size;
279     g_pool = (Ndb_cluster_connection **)my_malloc(
280         PSI_INSTRUMENT_ME, g_pool_alloc * sizeof(Ndb_cluster_connection *),
281         MYF(MY_WME | MY_ZEROFILL));
282     mysql_mutex_init(PSI_INSTRUMENT_ME, &g_pool_mutex, MY_MUTEX_INIT_FAST);
283     g_pool[0] = g_ndb_cluster_connection;
284     for (uint i = 1; i < g_pool_alloc; i++) {
285       // Find specified nodeid for this connection or use default zero
286       uint nodeid = 0;
287       if (i < nodeids.size()) {
288         nodeid = nodeids[i];
289         ndb_log_info("connection[%u], using nodeid %u", i, nodeid);
290       }
291 
292       g_pool[i] = new (std::nothrow) Ndb_cluster_connection(
293           connect_string, g_ndb_cluster_connection, nodeid);
294       if (g_pool[i] == nullptr) {
295         ndb_log_error("connection[%u], failed to allocate connect object", i);
296         DBUG_PRINT("error",
297                    ("Ndb_cluster_connection[%u](%s)", i, connect_string));
298         return -1;
299       }
300       {
301         char buf[128];
302         snprintf(buf, sizeof(buf), "%s --server-id=%lu (connection %u)",
303                  mysqld_name, server_id, i + 1);
304         g_pool[i]->set_name(buf);
305         const char *uri_sep = server_id ? ";" : "?";
306         snprintf(buf, sizeof(buf), "%s%s%sconnection=%u", processinfo_path,
307                  server_id_string, uri_sep, i + 1);
308         g_pool[i]->set_service_uri("mysql", processinfo_host, processinfo_port,
309                                    buf);
310       }
311       g_pool[i]->set_optimized_node_selection(optimized_node_select);
312       g_pool[i]->set_recv_thread_activation_threshold(
313           recv_thread_activation_threshold);
314       g_pool[i]->set_data_node_neighbour(data_node_neighbour);
315     }
316   }
317 
318   if (res == 0) {
319     connect_callback();
320     for (uint i = 0; i < g_pool_alloc; i++) {
321       int node_id = g_pool[i]->node_id();
322       if (node_id == 0) {
323         // not connected to mgmd yet, try again
324         g_pool[i]->connect(0, 0, 0);
325         if (g_pool[i]->node_id() == 0) {
326           ndb_log_info("connection[%u], starting connect thread", i);
327           g_pool[i]->start_connect_thread();
328           continue;
329         }
330         node_id = g_pool[i]->node_id();
331       }
332       DBUG_PRINT("info", ("NDBCLUSTER storage engine (%u) at %s on port %d", i,
333                           g_pool[i]->get_connected_host(),
334                           g_pool[i]->get_connected_port()));
335 
336       Uint64 waited;
337       do {
338         res = g_pool[i]->wait_until_ready(1, 1);
339         const NDB_TICKS now = NdbTick_getCurrentTicks();
340         waited = NdbTick_Elapsed(start, now).seconds();
341       } while (res != 0 && waited < wait_connected);
342 
343       const char *msg = 0;
344       if (res == 0) {
345         msg = "all storage nodes connected";
346       } else if (res > 0) {
347         msg = "some storage nodes connected";
348       } else if (res < 0) {
349         msg = "no storage nodes connected (timed out)";
350       }
351       ndb_log_info("connection[%u], NodeID: %d, %s", i, node_id, msg);
352     }
353   } else if (res == 1) {
354     for (uint i = 0; i < g_pool_alloc; i++) {
355       if (g_pool[i]->start_connect_thread(i == 0 ? connect_callback : NULL)) {
356         ndb_log_error("connection[%u], failed to start connect thread", i);
357         DBUG_PRINT("error",
358                    ("g_ndb_cluster_connection->start_connect_thread()"));
359         return -1;
360       }
361     }
362 #ifndef DBUG_OFF
363     {
364       char buf[1024];
365       DBUG_PRINT("info", ("NDBCLUSTER storage engine not started, "
366                           "will connect using %s",
367                           g_ndb_cluster_connection->get_connectstring(
368                               buf, sizeof(buf))));
369     }
370 #endif
371   } else {
372     DBUG_ASSERT(res == -1);
373     DBUG_PRINT("error", ("permanent error"));
374     ndb_log_error("error (%u) %s", g_ndb_cluster_connection->get_latest_error(),
375                   g_ndb_cluster_connection->get_latest_error_msg());
376     return -1;
377   }
378   return 0;
379 }
380 
ndbcluster_disconnect(void)381 void ndbcluster_disconnect(void) {
382   DBUG_TRACE;
383   if (g_ndb) delete g_ndb;
384   g_ndb = NULL;
385   {
386     if (g_pool) {
387       /* first in pool is the main one, wait with release */
388       for (uint i = 1; i < g_pool_alloc; i++) {
389         if (g_pool[i]) delete g_pool[i];
390       }
391       my_free(g_pool);
392       mysql_mutex_destroy(&g_pool_mutex);
393       g_pool = 0;
394     }
395     g_pool_alloc = 0;
396     g_pool_pos = 0;
397   }
398   if (g_ndb_cluster_connection) delete g_ndb_cluster_connection;
399   g_ndb_cluster_connection = NULL;
400 }
401 
ndb_get_cluster_connection()402 Ndb_cluster_connection *ndb_get_cluster_connection() {
403   mysql_mutex_lock(&g_pool_mutex);
404   Ndb_cluster_connection *connection = g_pool[g_pool_pos];
405   g_pool_pos++;
406   if (g_pool_pos == g_pool_alloc) g_pool_pos = 0;
407   mysql_mutex_unlock(&g_pool_mutex);
408   return connection;
409 }
410 
ndb_get_latest_trans_gci()411 ulonglong ndb_get_latest_trans_gci() {
412   ulonglong val = *g_ndb_cluster_connection->get_latest_trans_gci();
413   for (uint i = 1; i < g_pool_alloc; i++) {
414     ulonglong tmp = *g_pool[i]->get_latest_trans_gci();
415     if (tmp > val) val = tmp;
416   }
417   return val;
418 }
419 
ndb_set_latest_trans_gci(ulonglong val)420 void ndb_set_latest_trans_gci(ulonglong val) {
421   for (uint i = 0; i < g_pool_alloc; i++) {
422     *g_pool[i]->get_latest_trans_gci() = val;
423   }
424 }
425 
ndb_has_node_id(uint id)426 int ndb_has_node_id(uint id) {
427   for (uint i = 0; i < g_pool_alloc; i++) {
428     if (id == g_pool[i]->node_id()) return 1;
429   }
430   return 0;
431 }
432 
ndb_set_recv_thread_activation_threshold(Uint32 threshold)433 int ndb_set_recv_thread_activation_threshold(Uint32 threshold) {
434   for (uint i = 0; i < g_pool_alloc; i++) {
435     g_pool[i]->set_recv_thread_activation_threshold(threshold);
436   }
437   return 0;
438 }
439 
ndb_set_recv_thread_cpu(Uint16 * cpuid_array,Uint32 cpuid_array_size)440 int ndb_set_recv_thread_cpu(Uint16 *cpuid_array, Uint32 cpuid_array_size) {
441   int ret_code = 0;
442   Uint32 num_cpu_needed = g_pool_alloc;
443 
444   if (cpuid_array_size == 0) {
445     for (Uint32 i = 0; i < g_pool_alloc; i++) {
446       ret_code = g_pool[i]->unset_recv_thread_cpu(0);
447     }
448     return ret_code;
449   }
450 
451   if (cpuid_array_size < num_cpu_needed) {
452     /* Ignore cpu masks that is too short */
453     ndb_log_info(
454         "Ignored receive thread CPU mask, mask too short,"
455         " %u CPUs needed in mask, only %u CPUs provided",
456         num_cpu_needed, cpuid_array_size);
457     return 1;
458   }
459   for (Uint32 i = 0; i < g_pool_alloc; i++) {
460     ret_code = g_pool[i]->set_recv_thread_cpu(&cpuid_array[i], (Uint32)1, 0);
461   }
462   return ret_code;
463 }
464 
ndb_set_data_node_neighbour(ulong data_node_neighbour)465 void ndb_set_data_node_neighbour(ulong data_node_neighbour) {
466   for (uint i = 0; i < g_pool_alloc; i++)
467     g_pool[i]->set_data_node_neighbour(data_node_neighbour);
468 }
469 
ndb_get_connection_stats(Uint64 * statsArr)470 void ndb_get_connection_stats(Uint64 *statsArr) {
471   Uint64 connectionStats[Ndb::NumClientStatistics];
472   memset(statsArr, 0, sizeof(connectionStats));
473 
474   for (uint i = 0; i < g_pool_alloc; i++) {
475     g_pool[i]->collect_client_stats(connectionStats, Ndb::NumClientStatistics);
476 
477     for (Uint32 s = 0; s < Ndb::NumClientStatistics; s++)
478       statsArr[s] += connectionStats[s];
479   }
480 }
481 
482 static ST_FIELD_INFO ndb_transid_mysql_connection_map_fields_info[] = {
483     {"mysql_connection_id", MY_INT64_NUM_DECIMAL_DIGITS, MYSQL_TYPE_LONGLONG, 0,
484      MY_I_S_UNSIGNED, "", 0},
485 
486     {"node_id", MY_INT64_NUM_DECIMAL_DIGITS, MYSQL_TYPE_LONG, 0,
487      MY_I_S_UNSIGNED, "", 0},
488     {"ndb_transid", MY_INT64_NUM_DECIMAL_DIGITS, MYSQL_TYPE_LONGLONG, 0,
489      MY_I_S_UNSIGNED, "", 0},
490 
491     {0, 0, MYSQL_TYPE_NULL, 0, 0, "", 0}};
492 
ndb_transid_mysql_connection_map_fill_table(THD * thd,TABLE_LIST * tables,Item *)493 static int ndb_transid_mysql_connection_map_fill_table(THD *thd,
494                                                        TABLE_LIST *tables,
495                                                        Item *) {
496   DBUG_TRACE;
497 
498   const bool all = (check_global_access(thd, PROCESS_ACL) == 0);
499   const ulonglong self = thd_get_thread_id(thd);
500 
501   TABLE *table = tables->table;
502   for (uint i = 0; i < g_pool_alloc; i++) {
503     if (g_pool[i]) {
504       g_pool[i]->lock_ndb_objects();
505       const Ndb *p = g_pool[i]->get_next_ndb_object(0);
506       while (p) {
507         Uint64 connection_id = p->getCustomData64();
508         if ((connection_id == self) || all) {
509           table->field[0]->set_notnull();
510           table->field[0]->store(p->getCustomData64(), true);
511           table->field[1]->set_notnull();
512           table->field[1]->store(g_pool[i]->node_id());
513           table->field[2]->set_notnull();
514           table->field[2]->store(p->getNextTransactionId(), true);
515           schema_table_store_record(thd, table);
516         }
517         p = g_pool[i]->get_next_ndb_object(p);
518       }
519       g_pool[i]->unlock_ndb_objects();
520     }
521   }
522 
523   return 0;
524 }
525 
ndb_transid_mysql_connection_map_init(void * p)526 static int ndb_transid_mysql_connection_map_init(void *p) {
527   DBUG_TRACE;
528   ST_SCHEMA_TABLE *schema = reinterpret_cast<ST_SCHEMA_TABLE *>(p);
529   schema->fields_info = ndb_transid_mysql_connection_map_fields_info;
530   schema->fill_table = ndb_transid_mysql_connection_map_fill_table;
531   return 0;
532 }
533 
534 static struct st_mysql_information_schema i_s_info = {
535     MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION};
536 
537 /*
538   information_schema table plugin providing a list of MySQL
539   connection ID's and their corresponding NDB transaction ID
540 */
541 struct st_mysql_plugin ndb_transid_mysql_connection_map_table = {
542     MYSQL_INFORMATION_SCHEMA_PLUGIN,
543     &i_s_info,
544     "ndb_transid_mysql_connection_map",
545     "Oracle Corporation",
546     "Map between MySQL connection ID and NDB transaction ID",
547     PLUGIN_LICENSE_GPL,
548     ndb_transid_mysql_connection_map_init,
549     NULL,
550     NULL,
551     0x0001,
552     NULL,
553     NULL,
554     NULL,
555     0};
556