1 /*
2    Copyright (c) 2000, 2015, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "ha_ndbcluster_glue.h"
26 #include <ndbapi/NdbApi.hpp>
27 #include <portlib/NdbTick.h>
28 #include "ha_ndbcluster_connection.h"
29 
/* Global Ndb object, created on the main cluster connection by ndbcluster_connect() */
Ndb* g_ndb= NULL;
/* Main cluster connection; ndbcluster_connect() also stores it in g_pool[0] */
Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
/* Array of pooled cluster connections, allocated by ndbcluster_connect() */
static Ndb_cluster_connection **g_pool= NULL;
/* Number of slots in g_pool */
static uint g_pool_alloc= 0;
/* Index of the slot that ndb_get_cluster_connection() hands out next */
static uint g_pool_pos= 0;
/* Held by ndb_get_cluster_connection() while it reads and advances g_pool_pos */
static native_mutex_t g_pool_mutex;

/*
  Global flag in ndbapi to specify if api should wait to connect
  until dict cache is clean.

  Set to 1 in ndbcluster_connect() to not wait, as the ndb handler
  makes sure that no old ndb objects are used.
*/
extern int global_flag_skip_waiting_for_clean_cache;
45 
46 int
ndbcluster_connect(int (* connect_callback)(void),ulong wait_connected,uint connection_pool_size,bool optimized_node_select,const char * connect_string,uint force_nodeid,uint recv_thread_activation_threshold)47 ndbcluster_connect(int (*connect_callback)(void),
48                    ulong wait_connected, // Timeout in seconds
49                    uint connection_pool_size,
50                    bool optimized_node_select,
51                    const char* connect_string,
52                    uint force_nodeid,
53                    uint recv_thread_activation_threshold)
54 {
55 #ifndef EMBEDDED_LIBRARY
56   const char mysqld_name[]= "mysqld";
57 #else
58   const char mysqld_name[]= "libmysqld";
59 #endif
60   int res;
61   DBUG_ENTER("ndbcluster_connect");
62   DBUG_PRINT("enter", ("connect_string: %s, force_nodeid: %d",
63                        connect_string, force_nodeid));
64 
65   global_flag_skip_waiting_for_clean_cache= 1;
66 
67   g_ndb_cluster_connection=
68     new Ndb_cluster_connection(connect_string, force_nodeid);
69   if (!g_ndb_cluster_connection)
70   {
71     sql_print_error("NDB: failed to allocate global ndb cluster connection");
72     DBUG_PRINT("error", ("Ndb_cluster_connection(%s)", connect_string));
73     set_my_errno(HA_ERR_OUT_OF_MEM);
74     DBUG_RETURN(-1);
75   }
76   {
77     char buf[128];
78     my_snprintf(buf, sizeof(buf), "%s --server-id=%lu",
79                 mysqld_name, server_id);
80     g_ndb_cluster_connection->set_name(buf);
81   }
82   g_ndb_cluster_connection->set_optimized_node_selection(optimized_node_select);
83   g_ndb_cluster_connection->set_recv_thread_activation_threshold(
84                                       recv_thread_activation_threshold);
85 
86   // Create a Ndb object to open the connection  to NDB
87   if ( (g_ndb= new Ndb(g_ndb_cluster_connection, "sys")) == 0 )
88   {
89     sql_print_error("NDB: failed to allocate global ndb object");
90     DBUG_PRINT("error", ("failed to create global ndb object"));
91     set_my_errno(HA_ERR_OUT_OF_MEM);
92     DBUG_RETURN(-1);
93   }
94   if (g_ndb->init() != 0)
95   {
96     DBUG_PRINT("error", ("%d  message: %s",
97                          g_ndb->getNdbError().code,
98                          g_ndb->getNdbError().message));
99     DBUG_RETURN(-1);
100   }
101 
102   /* Connect to management server */
103 
104   const NDB_TICKS start= NdbTick_getCurrentTicks();
105 
106   while ((res= g_ndb_cluster_connection->connect(0,0,0)) == 1)
107   {
108     const NDB_TICKS now = NdbTick_getCurrentTicks();
109     if (NdbTick_Elapsed(start,now).seconds() > wait_connected)
110       break;
111     do_retry_sleep(100);
112     if (abort_loop)
113       DBUG_RETURN(-1);
114   }
115 
116   {
117     g_pool_alloc= connection_pool_size;
118     g_pool= (Ndb_cluster_connection**)
119       my_malloc(PSI_INSTRUMENT_ME,
120                 g_pool_alloc * sizeof(Ndb_cluster_connection*),
121                 MYF(MY_WME | MY_ZEROFILL));
122     native_mutex_init(&g_pool_mutex,
123                        MY_MUTEX_INIT_FAST);
124     g_pool[0]= g_ndb_cluster_connection;
125     for (uint i= 1; i < g_pool_alloc; i++)
126     {
127       if ((g_pool[i]=
128            new Ndb_cluster_connection(connect_string,
129                                       g_ndb_cluster_connection)) == 0)
130       {
131         sql_print_error("NDB[%u]: failed to allocate cluster connect object",
132                         i);
133         DBUG_PRINT("error",("Ndb_cluster_connection[%u](%s)",
134                             i, connect_string));
135         DBUG_RETURN(-1);
136       }
137       {
138         char buf[128];
139         my_snprintf(buf, sizeof(buf), "%s --server-id=%lu (connection %u)",
140                     mysqld_name, server_id, i+1);
141         g_pool[i]->set_name(buf);
142       }
143       g_pool[i]->set_optimized_node_selection(optimized_node_select);
144       g_pool[i]->set_recv_thread_activation_threshold(recv_thread_activation_threshold);
145     }
146   }
147 
148   if (res == 0)
149   {
150     connect_callback();
151     for (uint i= 0; i < g_pool_alloc; i++)
152     {
153       int node_id= g_pool[i]->node_id();
154       if (node_id == 0)
155       {
156         // not connected to mgmd yet, try again
157         g_pool[i]->connect(0,0,0);
158         if (g_pool[i]->node_id() == 0)
159         {
160           sql_print_warning("NDB[%u]: starting connect thread", i);
161           g_pool[i]->start_connect_thread();
162           continue;
163         }
164         node_id= g_pool[i]->node_id();
165       }
166       DBUG_PRINT("info",
167                  ("NDBCLUSTER storage engine (%u) at %s on port %d", i,
168                   g_pool[i]->get_connected_host(),
169                   g_pool[i]->get_connected_port()));
170 
171       Uint64 waited;
172       do
173       {
174         res= g_pool[i]->wait_until_ready(1, 1);
175         const NDB_TICKS now = NdbTick_getCurrentTicks();
176         waited = NdbTick_Elapsed(start,now).seconds();
177       } while (res != 0 && waited < wait_connected);
178 
179       const char *msg= 0;
180       if (res == 0)
181       {
182         msg= "all storage nodes connected";
183       }
184       else if (res > 0)
185       {
186         msg= "some storage nodes connected";
187       }
188       else if (res < 0)
189       {
190         msg= "no storage nodes connected (timed out)";
191       }
192       sql_print_information("NDB[%u]: NodeID: %d, %s",
193                             i, node_id, msg);
194     }
195   }
196   else if (res == 1)
197   {
198     for (uint i= 0; i < g_pool_alloc; i++)
199     {
200       if (g_pool[i]->
201           start_connect_thread(i == 0 ? connect_callback :  NULL))
202       {
203         sql_print_error("NDB[%u]: failed to start connect thread", i);
204         DBUG_PRINT("error", ("g_ndb_cluster_connection->start_connect_thread()"));
205         DBUG_RETURN(-1);
206       }
207     }
208 #ifndef DBUG_OFF
209     {
210       char buf[1024];
211       DBUG_PRINT("info",
212                  ("NDBCLUSTER storage engine not started, "
213                   "will connect using %s",
214                   g_ndb_cluster_connection->
215                   get_connectstring(buf,sizeof(buf))));
216     }
217 #endif
218   }
219   else
220   {
221     DBUG_ASSERT(res == -1);
222     DBUG_PRINT("error", ("permanent error"));
223     sql_print_error("NDB: error (%u) %s",
224                     g_ndb_cluster_connection->get_latest_error(),
225                     g_ndb_cluster_connection->get_latest_error_msg());
226     DBUG_RETURN(-1);
227   }
228   DBUG_RETURN(0);
229 }
230 
ndbcluster_disconnect(void)231 void ndbcluster_disconnect(void)
232 {
233   DBUG_ENTER("ndbcluster_disconnect");
234   if (g_ndb)
235     delete g_ndb;
236   g_ndb= NULL;
237   {
238     if (g_pool)
239     {
240       /* first in pool is the main one, wait with release */
241       for (uint i= 1; i < g_pool_alloc; i++)
242       {
243         if (g_pool[i])
244           delete g_pool[i];
245       }
246       my_free((uchar*) g_pool, MYF(MY_ALLOW_ZERO_PTR));
247       native_mutex_destroy(&g_pool_mutex);
248       g_pool= 0;
249     }
250     g_pool_alloc= 0;
251     g_pool_pos= 0;
252   }
253   if (g_ndb_cluster_connection)
254     delete g_ndb_cluster_connection;
255   g_ndb_cluster_connection= NULL;
256   DBUG_VOID_RETURN;
257 }
258 
ndb_get_cluster_connection()259 Ndb_cluster_connection *ndb_get_cluster_connection()
260 {
261   native_mutex_lock(&g_pool_mutex);
262   Ndb_cluster_connection *connection= g_pool[g_pool_pos];
263   g_pool_pos++;
264   if (g_pool_pos == g_pool_alloc)
265     g_pool_pos= 0;
266   native_mutex_unlock(&g_pool_mutex);
267   return connection;
268 }
269 
ndb_get_latest_trans_gci()270 ulonglong ndb_get_latest_trans_gci()
271 {
272   ulonglong val= *g_ndb_cluster_connection->get_latest_trans_gci();
273   for (uint i= 1; i < g_pool_alloc; i++)
274   {
275     ulonglong tmp= *g_pool[i]->get_latest_trans_gci();
276     if (tmp > val)
277       val= tmp;
278   }
279   return val;
280 }
281 
ndb_set_latest_trans_gci(ulonglong val)282 void ndb_set_latest_trans_gci(ulonglong val)
283 {
284   for (uint i= 0; i < g_pool_alloc; i++)
285   {
286     *g_pool[i]->get_latest_trans_gci()= val;
287   }
288 }
289 
ndb_has_node_id(uint id)290 int ndb_has_node_id(uint id)
291 {
292   for (uint i= 0; i < g_pool_alloc; i++)
293   {
294     if (id == g_pool[i]->node_id())
295       return 1;
296   }
297   return 0;
298 }
299 
ndb_set_recv_thread_activation_threshold(Uint32 threshold)300 int ndb_set_recv_thread_activation_threshold(Uint32 threshold)
301 {
302   for (uint i= 0; i < g_pool_alloc; i++)
303   {
304     g_pool[i]->set_recv_thread_activation_threshold(threshold);
305   }
306   return 0;
307 }
308 
309 int
ndb_set_recv_thread_cpu(Uint16 * cpuid_array,Uint32 cpuid_array_size)310 ndb_set_recv_thread_cpu(Uint16 *cpuid_array,
311                         Uint32 cpuid_array_size)
312 {
313   Uint32 num_cpu_needed = g_pool_alloc;
314 
315   if (cpuid_array_size == 0)
316   {
317     for (Uint32 i = 0; i < g_pool_alloc; i++)
318     {
319       g_pool[i]->unset_recv_thread_cpu(0);
320     }
321     return 0;
322   }
323 
324   if (cpuid_array_size < num_cpu_needed)
325   {
326     /* Ignore cpu masks that is too short */
327     sql_print_information(
328       "Ignored receive thread CPU mask, mask too short,"
329       " %u CPUs needed in mask, only %u CPUs provided",
330       num_cpu_needed, cpuid_array_size);
331     return 0;
332   }
333   for (Uint32 i = 0; i < g_pool_alloc; i++)
334   {
335     g_pool[i]->set_recv_thread_cpu(&cpuid_array[i],
336                                    (Uint32)1,
337                                    0);
338   }
339   return 0;
340 }
341 
ndb_get_connection_stats(Uint64 * statsArr)342 void ndb_get_connection_stats(Uint64* statsArr)
343 {
344   Uint64 connectionStats[ Ndb::NumClientStatistics ];
345   memset(statsArr, 0, sizeof(connectionStats));
346 
347   for (uint i=0; i < g_pool_alloc; i++)
348   {
349     g_pool[i]->collect_client_stats(connectionStats, Ndb::NumClientStatistics);
350 
351     for (Uint32 s=0; s < Ndb::NumClientStatistics; s++)
352       statsArr[s]+= connectionStats[s];
353   }
354 }
355 
/*
  Column definitions for the ndb_transid_mysql_connection_map
  information schema table. The entries appear in the same order as the
  fields that ndb_transid_mysql_connection_map_fill_table() stores into:
  field[0], field[1], field[2].
*/
static ST_FIELD_INFO ndb_transid_mysql_connection_map_fields_info[] =
{
  /* field[0]: custom 64-bit data of the Ndb object */
  {
    "mysql_connection_id",
    MY_INT64_NUM_DECIMAL_DIGITS,
    MYSQL_TYPE_LONGLONG,
    0,
    MY_I_S_UNSIGNED,
    "",
    SKIP_OPEN_TABLE
  },

  /* field[1]: node id of the cluster connection owning the Ndb object */
  {
    "node_id",
    MY_INT64_NUM_DECIMAL_DIGITS,
    MYSQL_TYPE_LONG,
    0,
    MY_I_S_UNSIGNED,
    "",
    SKIP_OPEN_TABLE
  },
  /* field[2]: next transaction id of the Ndb object */
  {
    "ndb_transid",
    MY_INT64_NUM_DECIMAL_DIGITS,
    MYSQL_TYPE_LONGLONG,
    0,
    MY_I_S_UNSIGNED,
    "",
    SKIP_OPEN_TABLE
  },

  /* Entry with no name; presumably the end-of-list marker -- confirm against ST_FIELD_INFO */
  { 0, 0, MYSQL_TYPE_NULL, 0, 0, "", SKIP_OPEN_TABLE }
};
389 
390 #include <mysql/innodb_priv.h>
391 
392 static
393 int
ndb_transid_mysql_connection_map_fill_table(THD * thd,TABLE_LIST * tables,Item *)394 ndb_transid_mysql_connection_map_fill_table(THD* thd, TABLE_LIST* tables,
395                                             Item*)
396 {
397   DBUG_ENTER("ndb_transid_mysql_connection_map_fill_table");
398 
399   const bool all = (check_global_access(thd, PROCESS_ACL) == 0);
400   const ulonglong self = thd_get_thread_id(thd);
401 
402   TABLE* table= tables->table;
403   for (uint i = 0; i<g_pool_alloc; i++)
404   {
405     if (g_pool[i])
406     {
407       g_pool[i]->lock_ndb_objects();
408       const Ndb * p = g_pool[i]->get_next_ndb_object(0);
409       while (p)
410       {
411         Uint64 connection_id = p->getCustomData64();
412         if ((connection_id == self) || all)
413         {
414           table->field[0]->set_notnull();
415           table->field[0]->store(p->getCustomData64(), true);
416           table->field[1]->set_notnull();
417           table->field[1]->store(g_pool[i]->node_id());
418           table->field[2]->set_notnull();
419           table->field[2]->store(p->getNextTransactionId(), true);
420           schema_table_store_record(thd, table);
421         }
422         p = g_pool[i]->get_next_ndb_object(p);
423       }
424       g_pool[i]->unlock_ndb_objects();
425     }
426   }
427 
428   DBUG_RETURN(0);
429 }
430 
431 static
432 int
ndb_transid_mysql_connection_map_init(void * p)433 ndb_transid_mysql_connection_map_init(void *p)
434 {
435   DBUG_ENTER("ndb_transid_mysql_connection_map_init");
436   ST_SCHEMA_TABLE* schema = reinterpret_cast<ST_SCHEMA_TABLE*>(p);
437   schema->fields_info = ndb_transid_mysql_connection_map_fields_info;
438   schema->fill_table = ndb_transid_mysql_connection_map_fill_table;
439   DBUG_RETURN(0);
440 }
441 
442 static
443 int
ndb_transid_mysql_connection_map_deinit(void * p)444 ndb_transid_mysql_connection_map_deinit(void *p)
445 {
446   DBUG_ENTER("ndb_transid_mysql_connection_map_deinit");
447   DBUG_RETURN(0);
448 }
449 
450 #include <mysql/plugin.h>
/*
  Information schema plugin info block, referenced by the plugin
  descriptor below. It carries only the interface version.
*/
static struct st_mysql_information_schema i_s_info =
{
  MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION
};
455 
/*
  Plugin descriptor for the ndb_transid_mysql_connection_map
  information schema table.

  NOTE(review): the roles of the trailing numeric and NULL members follow
  the st_mysql_plugin layout in <mysql/plugin.h>; confirm against that
  header.
*/
struct st_mysql_plugin i_s_ndb_transid_mysql_connection_map_plugin =
{
  MYSQL_INFORMATION_SCHEMA_PLUGIN,
  &i_s_info,
  "ndb_transid_mysql_connection_map",                       /* plugin name */
  "Oracle Corporation",                                     /* author */
  "Map between mysql connection id and ndb transaction id", /* description */
  PLUGIN_LICENSE_GPL,
  ndb_transid_mysql_connection_map_init,                    /* init hook */
  ndb_transid_mysql_connection_map_deinit,                  /* deinit hook */
  0x0001,
  NULL,
  NULL,
  NULL,
  0
};
472