1 /*
2 Copyright (c) 2000, 2015, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "ha_ndbcluster_glue.h"
26 #include <ndbapi/NdbApi.hpp>
27 #include <portlib/NdbTick.h>
28 #include "ha_ndbcluster_connection.h"
29
30 Ndb* g_ndb= NULL;
31 Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
32 static Ndb_cluster_connection **g_pool= NULL;
33 static uint g_pool_alloc= 0;
34 static uint g_pool_pos= 0;
35 static native_mutex_t g_pool_mutex;
36
37 /*
38 Global flag in ndbapi to specify if api should wait to connect
39 until dict cache is clean.
40
41 Set to 1 below to not wait, as ndb handler makes sure that no
42 old ndb objects are used.
43 */
44 extern int global_flag_skip_waiting_for_clean_cache;
45
46 int
ndbcluster_connect(int (* connect_callback)(void),ulong wait_connected,uint connection_pool_size,bool optimized_node_select,const char * connect_string,uint force_nodeid,uint recv_thread_activation_threshold)47 ndbcluster_connect(int (*connect_callback)(void),
48 ulong wait_connected, // Timeout in seconds
49 uint connection_pool_size,
50 bool optimized_node_select,
51 const char* connect_string,
52 uint force_nodeid,
53 uint recv_thread_activation_threshold)
54 {
55 #ifndef EMBEDDED_LIBRARY
56 const char mysqld_name[]= "mysqld";
57 #else
58 const char mysqld_name[]= "libmysqld";
59 #endif
60 int res;
61 DBUG_ENTER("ndbcluster_connect");
62 DBUG_PRINT("enter", ("connect_string: %s, force_nodeid: %d",
63 connect_string, force_nodeid));
64
65 global_flag_skip_waiting_for_clean_cache= 1;
66
67 g_ndb_cluster_connection=
68 new Ndb_cluster_connection(connect_string, force_nodeid);
69 if (!g_ndb_cluster_connection)
70 {
71 sql_print_error("NDB: failed to allocate global ndb cluster connection");
72 DBUG_PRINT("error", ("Ndb_cluster_connection(%s)", connect_string));
73 set_my_errno(HA_ERR_OUT_OF_MEM);
74 DBUG_RETURN(-1);
75 }
76 {
77 char buf[128];
78 my_snprintf(buf, sizeof(buf), "%s --server-id=%lu",
79 mysqld_name, server_id);
80 g_ndb_cluster_connection->set_name(buf);
81 }
82 g_ndb_cluster_connection->set_optimized_node_selection(optimized_node_select);
83 g_ndb_cluster_connection->set_recv_thread_activation_threshold(
84 recv_thread_activation_threshold);
85
86 // Create a Ndb object to open the connection to NDB
87 if ( (g_ndb= new Ndb(g_ndb_cluster_connection, "sys")) == 0 )
88 {
89 sql_print_error("NDB: failed to allocate global ndb object");
90 DBUG_PRINT("error", ("failed to create global ndb object"));
91 set_my_errno(HA_ERR_OUT_OF_MEM);
92 DBUG_RETURN(-1);
93 }
94 if (g_ndb->init() != 0)
95 {
96 DBUG_PRINT("error", ("%d message: %s",
97 g_ndb->getNdbError().code,
98 g_ndb->getNdbError().message));
99 DBUG_RETURN(-1);
100 }
101
102 /* Connect to management server */
103
104 const NDB_TICKS start= NdbTick_getCurrentTicks();
105
106 while ((res= g_ndb_cluster_connection->connect(0,0,0)) == 1)
107 {
108 const NDB_TICKS now = NdbTick_getCurrentTicks();
109 if (NdbTick_Elapsed(start,now).seconds() > wait_connected)
110 break;
111 do_retry_sleep(100);
112 if (abort_loop)
113 DBUG_RETURN(-1);
114 }
115
116 {
117 g_pool_alloc= connection_pool_size;
118 g_pool= (Ndb_cluster_connection**)
119 my_malloc(PSI_INSTRUMENT_ME,
120 g_pool_alloc * sizeof(Ndb_cluster_connection*),
121 MYF(MY_WME | MY_ZEROFILL));
122 native_mutex_init(&g_pool_mutex,
123 MY_MUTEX_INIT_FAST);
124 g_pool[0]= g_ndb_cluster_connection;
125 for (uint i= 1; i < g_pool_alloc; i++)
126 {
127 if ((g_pool[i]=
128 new Ndb_cluster_connection(connect_string,
129 g_ndb_cluster_connection)) == 0)
130 {
131 sql_print_error("NDB[%u]: failed to allocate cluster connect object",
132 i);
133 DBUG_PRINT("error",("Ndb_cluster_connection[%u](%s)",
134 i, connect_string));
135 DBUG_RETURN(-1);
136 }
137 {
138 char buf[128];
139 my_snprintf(buf, sizeof(buf), "%s --server-id=%lu (connection %u)",
140 mysqld_name, server_id, i+1);
141 g_pool[i]->set_name(buf);
142 }
143 g_pool[i]->set_optimized_node_selection(optimized_node_select);
144 g_pool[i]->set_recv_thread_activation_threshold(recv_thread_activation_threshold);
145 }
146 }
147
148 if (res == 0)
149 {
150 connect_callback();
151 for (uint i= 0; i < g_pool_alloc; i++)
152 {
153 int node_id= g_pool[i]->node_id();
154 if (node_id == 0)
155 {
156 // not connected to mgmd yet, try again
157 g_pool[i]->connect(0,0,0);
158 if (g_pool[i]->node_id() == 0)
159 {
160 sql_print_warning("NDB[%u]: starting connect thread", i);
161 g_pool[i]->start_connect_thread();
162 continue;
163 }
164 node_id= g_pool[i]->node_id();
165 }
166 DBUG_PRINT("info",
167 ("NDBCLUSTER storage engine (%u) at %s on port %d", i,
168 g_pool[i]->get_connected_host(),
169 g_pool[i]->get_connected_port()));
170
171 Uint64 waited;
172 do
173 {
174 res= g_pool[i]->wait_until_ready(1, 1);
175 const NDB_TICKS now = NdbTick_getCurrentTicks();
176 waited = NdbTick_Elapsed(start,now).seconds();
177 } while (res != 0 && waited < wait_connected);
178
179 const char *msg= 0;
180 if (res == 0)
181 {
182 msg= "all storage nodes connected";
183 }
184 else if (res > 0)
185 {
186 msg= "some storage nodes connected";
187 }
188 else if (res < 0)
189 {
190 msg= "no storage nodes connected (timed out)";
191 }
192 sql_print_information("NDB[%u]: NodeID: %d, %s",
193 i, node_id, msg);
194 }
195 }
196 else if (res == 1)
197 {
198 for (uint i= 0; i < g_pool_alloc; i++)
199 {
200 if (g_pool[i]->
201 start_connect_thread(i == 0 ? connect_callback : NULL))
202 {
203 sql_print_error("NDB[%u]: failed to start connect thread", i);
204 DBUG_PRINT("error", ("g_ndb_cluster_connection->start_connect_thread()"));
205 DBUG_RETURN(-1);
206 }
207 }
208 #ifndef DBUG_OFF
209 {
210 char buf[1024];
211 DBUG_PRINT("info",
212 ("NDBCLUSTER storage engine not started, "
213 "will connect using %s",
214 g_ndb_cluster_connection->
215 get_connectstring(buf,sizeof(buf))));
216 }
217 #endif
218 }
219 else
220 {
221 DBUG_ASSERT(res == -1);
222 DBUG_PRINT("error", ("permanent error"));
223 sql_print_error("NDB: error (%u) %s",
224 g_ndb_cluster_connection->get_latest_error(),
225 g_ndb_cluster_connection->get_latest_error_msg());
226 DBUG_RETURN(-1);
227 }
228 DBUG_RETURN(0);
229 }
230
ndbcluster_disconnect(void)231 void ndbcluster_disconnect(void)
232 {
233 DBUG_ENTER("ndbcluster_disconnect");
234 if (g_ndb)
235 delete g_ndb;
236 g_ndb= NULL;
237 {
238 if (g_pool)
239 {
240 /* first in pool is the main one, wait with release */
241 for (uint i= 1; i < g_pool_alloc; i++)
242 {
243 if (g_pool[i])
244 delete g_pool[i];
245 }
246 my_free((uchar*) g_pool, MYF(MY_ALLOW_ZERO_PTR));
247 native_mutex_destroy(&g_pool_mutex);
248 g_pool= 0;
249 }
250 g_pool_alloc= 0;
251 g_pool_pos= 0;
252 }
253 if (g_ndb_cluster_connection)
254 delete g_ndb_cluster_connection;
255 g_ndb_cluster_connection= NULL;
256 DBUG_VOID_RETURN;
257 }
258
ndb_get_cluster_connection()259 Ndb_cluster_connection *ndb_get_cluster_connection()
260 {
261 native_mutex_lock(&g_pool_mutex);
262 Ndb_cluster_connection *connection= g_pool[g_pool_pos];
263 g_pool_pos++;
264 if (g_pool_pos == g_pool_alloc)
265 g_pool_pos= 0;
266 native_mutex_unlock(&g_pool_mutex);
267 return connection;
268 }
269
ndb_get_latest_trans_gci()270 ulonglong ndb_get_latest_trans_gci()
271 {
272 ulonglong val= *g_ndb_cluster_connection->get_latest_trans_gci();
273 for (uint i= 1; i < g_pool_alloc; i++)
274 {
275 ulonglong tmp= *g_pool[i]->get_latest_trans_gci();
276 if (tmp > val)
277 val= tmp;
278 }
279 return val;
280 }
281
ndb_set_latest_trans_gci(ulonglong val)282 void ndb_set_latest_trans_gci(ulonglong val)
283 {
284 for (uint i= 0; i < g_pool_alloc; i++)
285 {
286 *g_pool[i]->get_latest_trans_gci()= val;
287 }
288 }
289
ndb_has_node_id(uint id)290 int ndb_has_node_id(uint id)
291 {
292 for (uint i= 0; i < g_pool_alloc; i++)
293 {
294 if (id == g_pool[i]->node_id())
295 return 1;
296 }
297 return 0;
298 }
299
ndb_set_recv_thread_activation_threshold(Uint32 threshold)300 int ndb_set_recv_thread_activation_threshold(Uint32 threshold)
301 {
302 for (uint i= 0; i < g_pool_alloc; i++)
303 {
304 g_pool[i]->set_recv_thread_activation_threshold(threshold);
305 }
306 return 0;
307 }
308
309 int
ndb_set_recv_thread_cpu(Uint16 * cpuid_array,Uint32 cpuid_array_size)310 ndb_set_recv_thread_cpu(Uint16 *cpuid_array,
311 Uint32 cpuid_array_size)
312 {
313 Uint32 num_cpu_needed = g_pool_alloc;
314
315 if (cpuid_array_size == 0)
316 {
317 for (Uint32 i = 0; i < g_pool_alloc; i++)
318 {
319 g_pool[i]->unset_recv_thread_cpu(0);
320 }
321 return 0;
322 }
323
324 if (cpuid_array_size < num_cpu_needed)
325 {
326 /* Ignore cpu masks that is too short */
327 sql_print_information(
328 "Ignored receive thread CPU mask, mask too short,"
329 " %u CPUs needed in mask, only %u CPUs provided",
330 num_cpu_needed, cpuid_array_size);
331 return 0;
332 }
333 for (Uint32 i = 0; i < g_pool_alloc; i++)
334 {
335 g_pool[i]->set_recv_thread_cpu(&cpuid_array[i],
336 (Uint32)1,
337 0);
338 }
339 return 0;
340 }
341
ndb_get_connection_stats(Uint64 * statsArr)342 void ndb_get_connection_stats(Uint64* statsArr)
343 {
344 Uint64 connectionStats[ Ndb::NumClientStatistics ];
345 memset(statsArr, 0, sizeof(connectionStats));
346
347 for (uint i=0; i < g_pool_alloc; i++)
348 {
349 g_pool[i]->collect_client_stats(connectionStats, Ndb::NumClientStatistics);
350
351 for (Uint32 s=0; s < Ndb::NumClientStatistics; s++)
352 statsArr[s]+= connectionStats[s];
353 }
354 }
355
356 static ST_FIELD_INFO ndb_transid_mysql_connection_map_fields_info[] =
357 {
358 {
359 "mysql_connection_id",
360 MY_INT64_NUM_DECIMAL_DIGITS,
361 MYSQL_TYPE_LONGLONG,
362 0,
363 MY_I_S_UNSIGNED,
364 "",
365 SKIP_OPEN_TABLE
366 },
367
368 {
369 "node_id",
370 MY_INT64_NUM_DECIMAL_DIGITS,
371 MYSQL_TYPE_LONG,
372 0,
373 MY_I_S_UNSIGNED,
374 "",
375 SKIP_OPEN_TABLE
376 },
377 {
378 "ndb_transid",
379 MY_INT64_NUM_DECIMAL_DIGITS,
380 MYSQL_TYPE_LONGLONG,
381 0,
382 MY_I_S_UNSIGNED,
383 "",
384 SKIP_OPEN_TABLE
385 },
386
387 { 0, 0, MYSQL_TYPE_NULL, 0, 0, "", SKIP_OPEN_TABLE }
388 };
389
390 #include <mysql/innodb_priv.h>
391
392 static
393 int
ndb_transid_mysql_connection_map_fill_table(THD * thd,TABLE_LIST * tables,Item *)394 ndb_transid_mysql_connection_map_fill_table(THD* thd, TABLE_LIST* tables,
395 Item*)
396 {
397 DBUG_ENTER("ndb_transid_mysql_connection_map_fill_table");
398
399 const bool all = (check_global_access(thd, PROCESS_ACL) == 0);
400 const ulonglong self = thd_get_thread_id(thd);
401
402 TABLE* table= tables->table;
403 for (uint i = 0; i<g_pool_alloc; i++)
404 {
405 if (g_pool[i])
406 {
407 g_pool[i]->lock_ndb_objects();
408 const Ndb * p = g_pool[i]->get_next_ndb_object(0);
409 while (p)
410 {
411 Uint64 connection_id = p->getCustomData64();
412 if ((connection_id == self) || all)
413 {
414 table->field[0]->set_notnull();
415 table->field[0]->store(p->getCustomData64(), true);
416 table->field[1]->set_notnull();
417 table->field[1]->store(g_pool[i]->node_id());
418 table->field[2]->set_notnull();
419 table->field[2]->store(p->getNextTransactionId(), true);
420 schema_table_store_record(thd, table);
421 }
422 p = g_pool[i]->get_next_ndb_object(p);
423 }
424 g_pool[i]->unlock_ndb_objects();
425 }
426 }
427
428 DBUG_RETURN(0);
429 }
430
431 static
432 int
ndb_transid_mysql_connection_map_init(void * p)433 ndb_transid_mysql_connection_map_init(void *p)
434 {
435 DBUG_ENTER("ndb_transid_mysql_connection_map_init");
436 ST_SCHEMA_TABLE* schema = reinterpret_cast<ST_SCHEMA_TABLE*>(p);
437 schema->fields_info = ndb_transid_mysql_connection_map_fields_info;
438 schema->fill_table = ndb_transid_mysql_connection_map_fill_table;
439 DBUG_RETURN(0);
440 }
441
442 static
443 int
ndb_transid_mysql_connection_map_deinit(void * p)444 ndb_transid_mysql_connection_map_deinit(void *p)
445 {
446 DBUG_ENTER("ndb_transid_mysql_connection_map_deinit");
447 DBUG_RETURN(0);
448 }
449
450 #include <mysql/plugin.h>
451 static struct st_mysql_information_schema i_s_info =
452 {
453 MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION
454 };
455
456 struct st_mysql_plugin i_s_ndb_transid_mysql_connection_map_plugin =
457 {
458 MYSQL_INFORMATION_SCHEMA_PLUGIN,
459 &i_s_info,
460 "ndb_transid_mysql_connection_map",
461 "Oracle Corporation",
462 "Map between mysql connection id and ndb transaction id",
463 PLUGIN_LICENSE_GPL,
464 ndb_transid_mysql_connection_map_init,
465 ndb_transid_mysql_connection_map_deinit,
466 0x0001,
467 NULL,
468 NULL,
469 NULL,
470 0
471 };
472