1 /*
2 Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "storage/ndb/plugin/ha_ndbcluster_connection.h"
26
27 #include <mysql/psi/mysql_thread.h>
28
29 #include "my_dbug.h"
30 #include "mysql/plugin.h"
31 #include "mysqld_error.h"
32 #include "sql/auth/auth_acls.h"
33 #include "sql/mysqld.h" // server_id, connection_events_loop_aborted
34 #include "sql/sql_class.h"
35 #include "sql/sql_lex.h"
36 #include "storage/ndb/include/kernel/ndb_limits.h"
37 #include "storage/ndb/include/ndbapi/NdbApi.hpp"
38 #include "storage/ndb/include/portlib/NdbTick.h"
39 #include "storage/ndb/include/util/BaseString.hpp"
40 #include "storage/ndb/include/util/Vector.hpp"
41 #ifndef _WIN32
42 #include <netdb.h> // getservbyname
43 #endif
44
45 #include "sql/table.h"
46 #include "storage/ndb/plugin/ndb_log.h"
47 #include "storage/ndb/plugin/ndb_sleep.h"
48
49 Ndb *g_ndb = NULL;
50 Ndb_cluster_connection *g_ndb_cluster_connection = NULL;
51 static Ndb_cluster_connection **g_pool = NULL;
52 static uint g_pool_alloc = 0;
53 static uint g_pool_pos = 0;
54 static mysql_mutex_t g_pool_mutex;
55
56 /**
57 @brief Parse the --ndb-cluster-connection-pool-nodeids=nodeid[,nodeidN]
58 comma separated list of nodeids to use for the pool
59
60 @param opt_str string containing list of nodeids to parse.
61 @param pool_size size used for the connection pool
62 @param force_nodeid nodeid requested with --ndb-nodeid
63 @param nodeids the parsed list of nodeids
64 @return true or false when option parsing failed. Error message
65 describing the problem has been printed to error log.
66 */
parse_pool_nodeids(const char * opt_str,uint pool_size,uint force_nodeid,Vector<uint> & nodeids)67 static bool parse_pool_nodeids(const char *opt_str, uint pool_size,
68 uint force_nodeid, Vector<uint> &nodeids) {
69 if (!opt_str) {
70 // The option was not specified.
71 return true;
72 }
73
74 BaseString tmp(opt_str);
75 Vector<BaseString> list(pool_size);
76 tmp.split(list, ",");
77
78 for (unsigned i = 0; i < list.size(); i++) {
79 list[i].trim();
80
81 // Don't allow empty string
82 if (list[i].empty()) {
83 ndb_log_error(
84 "Found empty nodeid specified in "
85 "--ndb-cluster-connection-pool-nodeids='%s'.",
86 opt_str);
87 return false;
88 }
89
90 // Convert string to number
91 uint nodeid = 0;
92 if (sscanf(list[i].c_str(), "%u", &nodeid) != 1) {
93 ndb_log_error(
94 "Could not parse '%s' in "
95 "--ndb-cluster-connection-pool-nodeids='%s'.",
96 list[i].c_str(), opt_str);
97 return false;
98 }
99
100 // Check that number is a valid nodeid
101 if (nodeid <= 0 || nodeid > MAX_NODES_ID) {
102 ndb_log_error(
103 "Invalid nodeid %d in "
104 "--ndb-cluster-connection-pool-nodeids='%s'.",
105 nodeid, opt_str);
106 return false;
107 }
108
109 // Check that nodeid is unique(not already in the list)
110 for (unsigned j = 0; j < nodeids.size(); j++) {
111 if (nodeid == nodeids[j]) {
112 ndb_log_error(
113 "Found duplicate nodeid %d in "
114 "--ndb-cluster-connection-pool-nodeids='%s'.",
115 nodeid, opt_str);
116 return false;
117 }
118 }
119
120 nodeids.push_back(nodeid);
121 }
122
123 // Check that size of nodeids match the pool size
124 if (nodeids.size() != pool_size) {
125 ndb_log_error(
126 "The size of the cluster connection pool must be "
127 "equal to the number of nodeids in "
128 "--ndb-cluster-connection-pool-nodeids='%s'.",
129 opt_str);
130 return false;
131 }
132
133 // Check that --ndb-nodeid(if given) is first in the list
134 if (force_nodeid != 0 && force_nodeid != nodeids[0]) {
135 ndb_log_error(
136 "The nodeid specified by --ndb-nodeid must be equal "
137 "to the first nodeid in "
138 "--ndb-cluster-connection-pool-nodeids='%s'.",
139 opt_str);
140 return false;
141 }
142
143 return true;
144 }
145
146 /* Get the port number, hostname, and socket path for processinfo.
147
148 opt_disable_networking, mysqld_port, my_bind_addr_str, report_port,
149 report_host, and mysqld_unix_port are all server global variables.
150 */
151 extern uint report_port;
152 extern char *report_host;
153 extern char *my_bind_addr_str;
154
get_processinfo_port()155 static int get_processinfo_port() {
156 int port = 0;
157
158 if (!opt_disable_networking) {
159 port = report_port ? report_port : mysqld_port;
160 DBUG_ASSERT(port);
161 }
162 return port;
163 }
164
get_processinfo_host()165 static const char *get_processinfo_host() {
166 const char *host = report_host;
167 if (!host) {
168 host = my_bind_addr_str;
169 if (!(strcmp(host, "*") && // If bind_address matches any of
170 strcmp(host, "0.0.0.0") && // these strings, let ProcessInfo
171 strcmp(host, "::"))) // use the NDB transporter address.
172 {
173 host = nullptr;
174 }
175 }
176 return host;
177 }
178
get_processinfo_path()179 static const char *get_processinfo_path() { return mysqld_unix_port; }
180
181 /*
182 Global flag in ndbapi to specify if api should wait to connect
183 until dict cache is clean.
184
185 Set to 1 below to not wait, as ndb handler makes sure that no
186 old ndb objects are used.
187 */
188 extern int global_flag_skip_waiting_for_clean_cache;
189
ndbcluster_connect(int (* connect_callback)(void),ulong wait_connected,uint connection_pool_size,const char * connection_pool_nodeids_str,bool optimized_node_select,const char * connect_string,uint force_nodeid,uint recv_thread_activation_threshold,uint data_node_neighbour)190 int ndbcluster_connect(int (*connect_callback)(void),
191 ulong wait_connected, // Timeout in seconds
192 uint connection_pool_size,
193 const char *connection_pool_nodeids_str,
194 bool optimized_node_select, const char *connect_string,
195 uint force_nodeid, uint recv_thread_activation_threshold,
196 uint data_node_neighbour) {
197 const char mysqld_name[] = "mysqld";
198 int res;
199 DBUG_TRACE;
200 DBUG_PRINT("enter", ("connect_string: %s, force_nodeid: %d", connect_string,
201 force_nodeid));
202
203 /* For Service URI in ndbinfo */
204 const int processinfo_port = get_processinfo_port();
205 const char *processinfo_host = get_processinfo_host();
206 const char *processinfo_path = processinfo_port ? "" : get_processinfo_path();
207 char server_id_string[64];
208 if (server_id > 0)
209 snprintf(server_id_string, sizeof(server_id_string), "?server-id=%lu",
210 server_id);
211 else
212 server_id_string[0] = '\0';
213
214 // Parse the --ndb-cluster-connection-pool-nodeids=nodeid[,nodeidN]
215 // comma separated list of nodeids to use for the pool
216 Vector<uint> nodeids;
217 if (!parse_pool_nodeids(connection_pool_nodeids_str, connection_pool_size,
218 force_nodeid, nodeids)) {
219 // Error message already printed
220 return -1;
221 }
222
223 // Find specified nodeid for first connection and let it override
224 // force_nodeid(if both has been specified they are equal).
225 if (nodeids.size()) {
226 assert(force_nodeid == 0 || force_nodeid == nodeids[0]);
227 force_nodeid = nodeids[0];
228 ndb_log_info("using nodeid %u", force_nodeid);
229 }
230
231 global_flag_skip_waiting_for_clean_cache = 1;
232
233 g_ndb_cluster_connection =
234 new (std::nothrow) Ndb_cluster_connection(connect_string, force_nodeid);
235 if (g_ndb_cluster_connection == nullptr) {
236 ndb_log_error("failed to allocate global ndb cluster connection");
237 DBUG_PRINT("error", ("Ndb_cluster_connection(%s)", connect_string));
238 return -1;
239 }
240 {
241 char buf[128];
242 snprintf(buf, sizeof(buf), "%s --server-id=%lu", mysqld_name, server_id);
243 g_ndb_cluster_connection->set_name(buf);
244 snprintf(buf, sizeof(buf), "%s%s", processinfo_path, server_id_string);
245 g_ndb_cluster_connection->set_service_uri("mysql", processinfo_host,
246 processinfo_port, buf);
247 }
248 g_ndb_cluster_connection->set_optimized_node_selection(optimized_node_select);
249 g_ndb_cluster_connection->set_recv_thread_activation_threshold(
250 recv_thread_activation_threshold);
251 g_ndb_cluster_connection->set_data_node_neighbour(data_node_neighbour);
252
253 // Create a Ndb object to open the connection to NDB
254 g_ndb = new (std::nothrow) Ndb(g_ndb_cluster_connection, "sys");
255 if (g_ndb == nullptr) {
256 ndb_log_error("failed to allocate global ndb object");
257 DBUG_PRINT("error", ("failed to create global ndb object"));
258 return -1;
259 }
260 if (g_ndb->init() != 0) {
261 DBUG_PRINT("error", ("%d message: %s", g_ndb->getNdbError().code,
262 g_ndb->getNdbError().message));
263 return -1;
264 }
265
266 /* Connect to management server */
267
268 const NDB_TICKS start = NdbTick_getCurrentTicks();
269
270 while ((res = g_ndb_cluster_connection->connect(0, 0, 0)) == 1) {
271 const NDB_TICKS now = NdbTick_getCurrentTicks();
272 if (NdbTick_Elapsed(start, now).seconds() > wait_connected) break;
273 ndb_retry_sleep(100);
274 if (connection_events_loop_aborted()) return -1;
275 }
276
277 {
278 g_pool_alloc = connection_pool_size;
279 g_pool = (Ndb_cluster_connection **)my_malloc(
280 PSI_INSTRUMENT_ME, g_pool_alloc * sizeof(Ndb_cluster_connection *),
281 MYF(MY_WME | MY_ZEROFILL));
282 mysql_mutex_init(PSI_INSTRUMENT_ME, &g_pool_mutex, MY_MUTEX_INIT_FAST);
283 g_pool[0] = g_ndb_cluster_connection;
284 for (uint i = 1; i < g_pool_alloc; i++) {
285 // Find specified nodeid for this connection or use default zero
286 uint nodeid = 0;
287 if (i < nodeids.size()) {
288 nodeid = nodeids[i];
289 ndb_log_info("connection[%u], using nodeid %u", i, nodeid);
290 }
291
292 g_pool[i] = new (std::nothrow) Ndb_cluster_connection(
293 connect_string, g_ndb_cluster_connection, nodeid);
294 if (g_pool[i] == nullptr) {
295 ndb_log_error("connection[%u], failed to allocate connect object", i);
296 DBUG_PRINT("error",
297 ("Ndb_cluster_connection[%u](%s)", i, connect_string));
298 return -1;
299 }
300 {
301 char buf[128];
302 snprintf(buf, sizeof(buf), "%s --server-id=%lu (connection %u)",
303 mysqld_name, server_id, i + 1);
304 g_pool[i]->set_name(buf);
305 const char *uri_sep = server_id ? ";" : "?";
306 snprintf(buf, sizeof(buf), "%s%s%sconnection=%u", processinfo_path,
307 server_id_string, uri_sep, i + 1);
308 g_pool[i]->set_service_uri("mysql", processinfo_host, processinfo_port,
309 buf);
310 }
311 g_pool[i]->set_optimized_node_selection(optimized_node_select);
312 g_pool[i]->set_recv_thread_activation_threshold(
313 recv_thread_activation_threshold);
314 g_pool[i]->set_data_node_neighbour(data_node_neighbour);
315 }
316 }
317
318 if (res == 0) {
319 connect_callback();
320 for (uint i = 0; i < g_pool_alloc; i++) {
321 int node_id = g_pool[i]->node_id();
322 if (node_id == 0) {
323 // not connected to mgmd yet, try again
324 g_pool[i]->connect(0, 0, 0);
325 if (g_pool[i]->node_id() == 0) {
326 ndb_log_info("connection[%u], starting connect thread", i);
327 g_pool[i]->start_connect_thread();
328 continue;
329 }
330 node_id = g_pool[i]->node_id();
331 }
332 DBUG_PRINT("info", ("NDBCLUSTER storage engine (%u) at %s on port %d", i,
333 g_pool[i]->get_connected_host(),
334 g_pool[i]->get_connected_port()));
335
336 Uint64 waited;
337 do {
338 res = g_pool[i]->wait_until_ready(1, 1);
339 const NDB_TICKS now = NdbTick_getCurrentTicks();
340 waited = NdbTick_Elapsed(start, now).seconds();
341 } while (res != 0 && waited < wait_connected);
342
343 const char *msg = 0;
344 if (res == 0) {
345 msg = "all storage nodes connected";
346 } else if (res > 0) {
347 msg = "some storage nodes connected";
348 } else if (res < 0) {
349 msg = "no storage nodes connected (timed out)";
350 }
351 ndb_log_info("connection[%u], NodeID: %d, %s", i, node_id, msg);
352 }
353 } else if (res == 1) {
354 for (uint i = 0; i < g_pool_alloc; i++) {
355 if (g_pool[i]->start_connect_thread(i == 0 ? connect_callback : NULL)) {
356 ndb_log_error("connection[%u], failed to start connect thread", i);
357 DBUG_PRINT("error",
358 ("g_ndb_cluster_connection->start_connect_thread()"));
359 return -1;
360 }
361 }
362 #ifndef DBUG_OFF
363 {
364 char buf[1024];
365 DBUG_PRINT("info", ("NDBCLUSTER storage engine not started, "
366 "will connect using %s",
367 g_ndb_cluster_connection->get_connectstring(
368 buf, sizeof(buf))));
369 }
370 #endif
371 } else {
372 DBUG_ASSERT(res == -1);
373 DBUG_PRINT("error", ("permanent error"));
374 ndb_log_error("error (%u) %s", g_ndb_cluster_connection->get_latest_error(),
375 g_ndb_cluster_connection->get_latest_error_msg());
376 return -1;
377 }
378 return 0;
379 }
380
ndbcluster_disconnect(void)381 void ndbcluster_disconnect(void) {
382 DBUG_TRACE;
383 if (g_ndb) delete g_ndb;
384 g_ndb = NULL;
385 {
386 if (g_pool) {
387 /* first in pool is the main one, wait with release */
388 for (uint i = 1; i < g_pool_alloc; i++) {
389 if (g_pool[i]) delete g_pool[i];
390 }
391 my_free(g_pool);
392 mysql_mutex_destroy(&g_pool_mutex);
393 g_pool = 0;
394 }
395 g_pool_alloc = 0;
396 g_pool_pos = 0;
397 }
398 if (g_ndb_cluster_connection) delete g_ndb_cluster_connection;
399 g_ndb_cluster_connection = NULL;
400 }
401
ndb_get_cluster_connection()402 Ndb_cluster_connection *ndb_get_cluster_connection() {
403 mysql_mutex_lock(&g_pool_mutex);
404 Ndb_cluster_connection *connection = g_pool[g_pool_pos];
405 g_pool_pos++;
406 if (g_pool_pos == g_pool_alloc) g_pool_pos = 0;
407 mysql_mutex_unlock(&g_pool_mutex);
408 return connection;
409 }
410
ndb_get_latest_trans_gci()411 ulonglong ndb_get_latest_trans_gci() {
412 ulonglong val = *g_ndb_cluster_connection->get_latest_trans_gci();
413 for (uint i = 1; i < g_pool_alloc; i++) {
414 ulonglong tmp = *g_pool[i]->get_latest_trans_gci();
415 if (tmp > val) val = tmp;
416 }
417 return val;
418 }
419
ndb_set_latest_trans_gci(ulonglong val)420 void ndb_set_latest_trans_gci(ulonglong val) {
421 for (uint i = 0; i < g_pool_alloc; i++) {
422 *g_pool[i]->get_latest_trans_gci() = val;
423 }
424 }
425
ndb_has_node_id(uint id)426 int ndb_has_node_id(uint id) {
427 for (uint i = 0; i < g_pool_alloc; i++) {
428 if (id == g_pool[i]->node_id()) return 1;
429 }
430 return 0;
431 }
432
ndb_set_recv_thread_activation_threshold(Uint32 threshold)433 int ndb_set_recv_thread_activation_threshold(Uint32 threshold) {
434 for (uint i = 0; i < g_pool_alloc; i++) {
435 g_pool[i]->set_recv_thread_activation_threshold(threshold);
436 }
437 return 0;
438 }
439
ndb_set_recv_thread_cpu(Uint16 * cpuid_array,Uint32 cpuid_array_size)440 int ndb_set_recv_thread_cpu(Uint16 *cpuid_array, Uint32 cpuid_array_size) {
441 int ret_code = 0;
442 Uint32 num_cpu_needed = g_pool_alloc;
443
444 if (cpuid_array_size == 0) {
445 for (Uint32 i = 0; i < g_pool_alloc; i++) {
446 ret_code = g_pool[i]->unset_recv_thread_cpu(0);
447 }
448 return ret_code;
449 }
450
451 if (cpuid_array_size < num_cpu_needed) {
452 /* Ignore cpu masks that is too short */
453 ndb_log_info(
454 "Ignored receive thread CPU mask, mask too short,"
455 " %u CPUs needed in mask, only %u CPUs provided",
456 num_cpu_needed, cpuid_array_size);
457 return 1;
458 }
459 for (Uint32 i = 0; i < g_pool_alloc; i++) {
460 ret_code = g_pool[i]->set_recv_thread_cpu(&cpuid_array[i], (Uint32)1, 0);
461 }
462 return ret_code;
463 }
464
ndb_set_data_node_neighbour(ulong data_node_neighbour)465 void ndb_set_data_node_neighbour(ulong data_node_neighbour) {
466 for (uint i = 0; i < g_pool_alloc; i++)
467 g_pool[i]->set_data_node_neighbour(data_node_neighbour);
468 }
469
ndb_get_connection_stats(Uint64 * statsArr)470 void ndb_get_connection_stats(Uint64 *statsArr) {
471 Uint64 connectionStats[Ndb::NumClientStatistics];
472 memset(statsArr, 0, sizeof(connectionStats));
473
474 for (uint i = 0; i < g_pool_alloc; i++) {
475 g_pool[i]->collect_client_stats(connectionStats, Ndb::NumClientStatistics);
476
477 for (Uint32 s = 0; s < Ndb::NumClientStatistics; s++)
478 statsArr[s] += connectionStats[s];
479 }
480 }
481
482 static ST_FIELD_INFO ndb_transid_mysql_connection_map_fields_info[] = {
483 {"mysql_connection_id", MY_INT64_NUM_DECIMAL_DIGITS, MYSQL_TYPE_LONGLONG, 0,
484 MY_I_S_UNSIGNED, "", 0},
485
486 {"node_id", MY_INT64_NUM_DECIMAL_DIGITS, MYSQL_TYPE_LONG, 0,
487 MY_I_S_UNSIGNED, "", 0},
488 {"ndb_transid", MY_INT64_NUM_DECIMAL_DIGITS, MYSQL_TYPE_LONGLONG, 0,
489 MY_I_S_UNSIGNED, "", 0},
490
491 {0, 0, MYSQL_TYPE_NULL, 0, 0, "", 0}};
492
ndb_transid_mysql_connection_map_fill_table(THD * thd,TABLE_LIST * tables,Item *)493 static int ndb_transid_mysql_connection_map_fill_table(THD *thd,
494 TABLE_LIST *tables,
495 Item *) {
496 DBUG_TRACE;
497
498 const bool all = (check_global_access(thd, PROCESS_ACL) == 0);
499 const ulonglong self = thd_get_thread_id(thd);
500
501 TABLE *table = tables->table;
502 for (uint i = 0; i < g_pool_alloc; i++) {
503 if (g_pool[i]) {
504 g_pool[i]->lock_ndb_objects();
505 const Ndb *p = g_pool[i]->get_next_ndb_object(0);
506 while (p) {
507 Uint64 connection_id = p->getCustomData64();
508 if ((connection_id == self) || all) {
509 table->field[0]->set_notnull();
510 table->field[0]->store(p->getCustomData64(), true);
511 table->field[1]->set_notnull();
512 table->field[1]->store(g_pool[i]->node_id());
513 table->field[2]->set_notnull();
514 table->field[2]->store(p->getNextTransactionId(), true);
515 schema_table_store_record(thd, table);
516 }
517 p = g_pool[i]->get_next_ndb_object(p);
518 }
519 g_pool[i]->unlock_ndb_objects();
520 }
521 }
522
523 return 0;
524 }
525
ndb_transid_mysql_connection_map_init(void * p)526 static int ndb_transid_mysql_connection_map_init(void *p) {
527 DBUG_TRACE;
528 ST_SCHEMA_TABLE *schema = reinterpret_cast<ST_SCHEMA_TABLE *>(p);
529 schema->fields_info = ndb_transid_mysql_connection_map_fields_info;
530 schema->fill_table = ndb_transid_mysql_connection_map_fill_table;
531 return 0;
532 }
533
534 static struct st_mysql_information_schema i_s_info = {
535 MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION};
536
537 /*
538 information_schema table plugin providing a list of MySQL
539 connection ID's and their corresponding NDB transaction ID
540 */
541 struct st_mysql_plugin ndb_transid_mysql_connection_map_table = {
542 MYSQL_INFORMATION_SCHEMA_PLUGIN,
543 &i_s_info,
544 "ndb_transid_mysql_connection_map",
545 "Oracle Corporation",
546 "Map between MySQL connection ID and NDB transaction ID",
547 PLUGIN_LICENSE_GPL,
548 ndb_transid_mysql_connection_map_init,
549 NULL,
550 NULL,
551 0x0001,
552 NULL,
553 NULL,
554 NULL,
555 0};
556