1 /*
2    Copyright (c) 2009, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "ha_ndbcluster_glue.h"
26 #include "ha_ndbinfo.h"
27 #include "ndb_tdc.h"
28 #include "../storage/ndb/src/ndbapi/NdbInfo.hpp"
29 
30 
31 static MYSQL_THDVAR_UINT(
32   max_rows,                          /* name */
33   PLUGIN_VAR_RQCMDARG,
34   "Specify max number of rows to fetch per roundtrip to cluster",
35   NULL,                              /* check func. */
36   NULL,                              /* update func. */
37   10,                                /* default */
38   1,                                 /* min */
39   256,                               /* max */
40   0                                  /* block */
41 );
42 
43 static MYSQL_THDVAR_UINT(
44   max_bytes,                         /* name */
45   PLUGIN_VAR_RQCMDARG,
46   "Specify approx. max number of bytes to fetch per roundtrip to cluster",
47   NULL,                              /* check func. */
48   NULL,                              /* update func. */
49   0,                                 /* default */
50   0,                                 /* min */
51   65535,                             /* max */
52   0                                  /* block */
53 );
54 
55 static MYSQL_THDVAR_BOOL(
56   show_hidden,                       /* name */
57   PLUGIN_VAR_RQCMDARG,
58   "Control if tables should be visible or not",
59   NULL,                              /* check func. */
60   NULL,                              /* update func. */
61   FALSE                              /* default */
62 );
63 
64 static char* opt_ndbinfo_dbname = (char*)"ndbinfo";
65 static MYSQL_SYSVAR_STR(
66   database,                         /* name */
67   opt_ndbinfo_dbname,               /* var */
68   PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
69   "Name of the database used by ndbinfo",
70   NULL,                             /* check func. */
71   NULL,                             /* update func. */
72   NULL                              /* default */
73 );
74 
75 static char* opt_ndbinfo_table_prefix = (char*)"ndb$";
76 static MYSQL_SYSVAR_STR(
77   table_prefix,                     /* name */
78   opt_ndbinfo_table_prefix,         /* var */
79   PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
80   "Prefix to use for all virtual tables loaded from NDB",
81   NULL,                             /* check func. */
82   NULL,                             /* update func. */
83   NULL                              /* default */
84 );
85 
86 static Uint32 opt_ndbinfo_version = NDB_VERSION_D;
87 static MYSQL_SYSVAR_UINT(
88   version,                          /* name */
89   opt_ndbinfo_version,              /* var */
90   PLUGIN_VAR_NOCMDOPT | PLUGIN_VAR_READONLY,
91   "Compile version for ndbinfo",
92   NULL,                             /* check func. */
93   NULL,                             /* update func. */
94   0,                                /* default */
95   0,                                /* min */
96   0,                                /* max */
97   0                                 /* block */
98 );
99 
100 static my_bool opt_ndbinfo_offline;
101 
102 static
103 void
offline_update(THD * thd,struct st_mysql_sys_var * var,void * var_ptr,const void * save)104 offline_update(THD* thd, struct st_mysql_sys_var* var,
105                void* var_ptr, const void* save)
106 {
107   DBUG_ENTER("offline_update");
108 
109   const my_bool new_offline =
110     (*(static_cast<const my_bool*>(save)) != 0);
111   if (new_offline == opt_ndbinfo_offline)
112   {
113     // No change
114     DBUG_VOID_RETURN;
115   }
116 
117   // Set offline mode, any tables opened from here on will
118   // be opened in the new mode
119   opt_ndbinfo_offline = new_offline;
120 
121   // Close any open tables which may be in the old mode
122   (void)ndb_tdc_close_cached_tables();
123 
124   DBUG_VOID_RETURN;
125 }
126 
127 static MYSQL_SYSVAR_BOOL(
128   offline,                          /* name */
129   opt_ndbinfo_offline,              /* var */
130   PLUGIN_VAR_NOCMDOPT,
131   "Set ndbinfo in offline mode, tables and views can "
132   "be opened even if they don't exist or have different "
133   "definition in NDB. No rows will be returned.",
134   NULL,                             /* check func. */
135   offline_update,                   /* update func. */
136   0                                 /* default */
137 );
138 
139 
140 static NdbInfo* g_ndbinfo;
141 
142 extern Ndb_cluster_connection* g_ndb_cluster_connection;
143 
144 static bool
ndbcluster_is_disabled(void)145 ndbcluster_is_disabled(void)
146 {
147   /*
148     ndbinfo uses the same connection as ndbcluster
149     to avoid using up another nodeid, this also means that
150     if ndbcluster is not enabled, ndbinfo won't start
151   */
152   if (g_ndb_cluster_connection)
153     return false;
154   assert(g_ndbinfo == NULL);
155   return true;
156 }
157 
158 static handler*
create_handler(handlerton * hton,TABLE_SHARE * table,MEM_ROOT * mem_root)159 create_handler(handlerton *hton, TABLE_SHARE *table, MEM_ROOT *mem_root)
160 {
161   return new (mem_root) ha_ndbinfo(hton, table);
162 }
163 
164 struct ha_ndbinfo_impl
165 {
166   const NdbInfo::Table* m_table;
167   NdbInfoScanOperation* m_scan_op;
168   Vector<const NdbInfoRecAttr *> m_columns;
169   bool m_first_use;
170 
171   // Indicates if table has been opened in offline mode
172   // can only be reset by closing the table
173   bool m_offline;
174 
ha_ndbinfo_implha_ndbinfo_impl175   ha_ndbinfo_impl() :
176     m_table(NULL),
177     m_scan_op(NULL),
178     m_first_use(true),
179     m_offline(false)
180   {
181   }
182 };
183 
ha_ndbinfo(handlerton * hton,TABLE_SHARE * table_arg)184 ha_ndbinfo::ha_ndbinfo(handlerton *hton, TABLE_SHARE *table_arg)
185 : handler(hton, table_arg), m_impl(*new ha_ndbinfo_impl)
186 {
187 }
188 
~ha_ndbinfo()189 ha_ndbinfo::~ha_ndbinfo()
190 {
191   delete &m_impl;
192 }
193 
194 enum ndbinfo_error_codes {
195   ERR_INCOMPAT_TABLE_DEF = 40001
196 };
197 
198 static
199 struct error_message {
200   int error;
201   const char* message;
202 } error_messages[] = {
203   { ERR_INCOMPAT_TABLE_DEF, "Incompatible table definitions" },
204   { HA_ERR_NO_CONNECTION, "Connection to NDB failed" },
205 
206   { 0, 0 }
207 };
208 
209 static
find_error_message(int error)210 const char* find_error_message(int error)
211 {
212   struct error_message* err = error_messages;
213   while (err->error && err->message)
214   {
215     if (err->error == error)
216     {
217       assert(err->message);
218       return err->message;
219     }
220     err++;
221   }
222   return NULL;
223 }
224 
err2mysql(int error)225 static int err2mysql(int error)
226 {
227   DBUG_ENTER("err2mysql");
228   DBUG_PRINT("enter", ("error: %d", error));
229   assert(error != 0);
230   switch(error)
231   {
232   case NdbInfo::ERR_ClusterFailure:
233     DBUG_RETURN(HA_ERR_NO_CONNECTION);
234     break;
235   case NdbInfo::ERR_OutOfMemory:
236     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
237     break;
238   default:
239     break;
240   }
241   push_warning_printf(current_thd, Sql_condition::SL_WARNING,
242                       ER_GET_ERRNO, ER(ER_GET_ERRNO), error);
243   DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
244 }
245 
get_error_message(int error,String * buf)246 bool ha_ndbinfo::get_error_message(int error, String *buf)
247 {
248   DBUG_ENTER("ha_ndbinfo::get_error_message");
249   DBUG_PRINT("enter", ("error: %d", error));
250 
251   const char* message = find_error_message(error);
252   if (!message)
253     DBUG_RETURN(false);
254 
255   buf->set(message, (uint32)strlen(message), &my_charset_bin);
256   DBUG_PRINT("exit", ("message: %s", buf->ptr()));
257   DBUG_RETURN(false);
258 }
259 
260 static void
generate_sql(const NdbInfo::Table * ndb_tab,BaseString & sql)261 generate_sql(const NdbInfo::Table* ndb_tab, BaseString& sql)
262 {
263   sql.appfmt("'CREATE TABLE `%s`.`%s%s` (",
264              opt_ndbinfo_dbname, opt_ndbinfo_table_prefix, ndb_tab->getName());
265 
266   const char* separator = "";
267   for (unsigned i = 0; i < ndb_tab->columns(); i++)
268   {
269     const NdbInfo::Column* col = ndb_tab->getColumn(i);
270 
271     sql.appfmt("%s", separator);
272     separator = ", ";
273 
274     sql.appfmt("`%s` ", col->m_name.c_str());
275 
276     switch(col->m_type)
277     {
278     case NdbInfo::Column::Number:
279       sql.appfmt("INT UNSIGNED");
280       break;
281     case NdbInfo::Column::Number64:
282       sql.appfmt("BIGINT UNSIGNED");
283       break;
284     case NdbInfo::Column::String:
285       sql.appfmt("VARCHAR(512)");
286       break;
287     default:
288       sql.appfmt("UNKNOWN");
289       assert(false);
290       break;
291     }
292   }
293   sql.appfmt(") ENGINE=NDBINFO'");
294 }
295 
296 /*
297   Push a warning with explanation of the problem as well as the
298   proper SQL so the user can regenerate the table definition
299 */
300 
301 static void
warn_incompatible(const NdbInfo::Table * ndb_tab,bool fatal,const char * format,...)302 warn_incompatible(const NdbInfo::Table* ndb_tab, bool fatal,
303              const char* format, ...)
304 {
305   BaseString msg;
306   DBUG_ENTER("warn_incompatible");
307   DBUG_PRINT("enter",("table_name: %s, fatal: %d", ndb_tab->getName(), fatal));
308   assert(format != NULL);
309 
310   va_list args;
311   char explanation[128];
312   va_start(args,format);
313   my_vsnprintf(explanation, sizeof(explanation), format, args);
314   va_end(args);
315 
316   msg.assfmt("Table '%s%s' is defined differently in NDB, %s. The "
317              "SQL to regenerate is: ",
318              opt_ndbinfo_table_prefix, ndb_tab->getName(), explanation);
319   generate_sql(ndb_tab, msg);
320 
321   const Sql_condition::enum_severity_level level =
322     (fatal ? Sql_condition::SL_WARNING : Sql_condition::SL_NOTE);
323   push_warning(current_thd, level, ERR_INCOMPAT_TABLE_DEF, msg.c_str());
324 
325   DBUG_VOID_RETURN;
326 }
327 
create(const char * name,TABLE * form,HA_CREATE_INFO * create_info)328 int ha_ndbinfo::create(const char *name, TABLE *form,
329                        HA_CREATE_INFO *create_info)
330 {
331   DBUG_ENTER("ha_ndbinfo::create");
332   DBUG_PRINT("enter", ("name: %s", name));
333 
334   DBUG_RETURN(0);
335 }
336 
is_open(void) const337 bool ha_ndbinfo::is_open(void) const
338 {
339   return m_impl.m_table != NULL;
340 }
341 
is_offline(void) const342 bool ha_ndbinfo::is_offline(void) const
343 {
344   return m_impl.m_offline;
345 }
346 
open(const char * name,int mode,uint test_if_locked)347 int ha_ndbinfo::open(const char *name, int mode, uint test_if_locked)
348 {
349   DBUG_ENTER("ha_ndbinfo::open");
350   DBUG_PRINT("enter", ("name: %s, mode: %d", name, mode));
351 
352   assert(is_closed());
353   assert(!is_offline()); // Closed table can not be offline
354 
355   if (mode == O_RDWR)
356   {
357     if (table->db_stat & HA_TRY_READ_ONLY)
358     {
359       DBUG_PRINT("info", ("Telling server to use readonly mode"));
360       DBUG_RETURN(EROFS); // Read only fs
361     }
362     // Find any commands that does not allow open readonly
363     assert(false);
364   }
365 
366   if (opt_ndbinfo_offline ||
367       ndbcluster_is_disabled())
368   {
369     // Mark table as being offline and allow it to be opened
370     m_impl.m_offline = true;
371     DBUG_RETURN(0);
372   }
373 
374   int err = g_ndbinfo->openTable(name, &m_impl.m_table);
375   if (err)
376   {
377     assert(m_impl.m_table == 0);
378     if (err == NdbInfo::ERR_NoSuchTable)
379       DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);
380     DBUG_RETURN(err2mysql(err));
381   }
382 
383   /*
384     Check table def. to detect incompatible differences which should
385     return an error. Differences which only generate a warning
386     is checked on first use
387   */
388   DBUG_PRINT("info", ("Comparing MySQL's table def against NDB"));
389   const NdbInfo::Table* ndb_tab = m_impl.m_table;
390   for (uint i = 0; i < table->s->fields; i++)
391   {
392     const Field* field = table->field[i];
393 
394     // Check if field is NULLable
395     if (const_cast<Field*>(field)->real_maybe_null() == false)
396     {
397       // Only NULLable fields supported
398       warn_incompatible(ndb_tab, true,
399                         "column '%s' is NOT NULL",
400                         field->field_name);
401       delete m_impl.m_table; m_impl.m_table= 0;
402       DBUG_RETURN(ERR_INCOMPAT_TABLE_DEF);
403     }
404 
405     // Check if column exist in NDB
406     const NdbInfo::Column* col = ndb_tab->getColumn(field->field_name);
407     if (!col)
408     {
409       // The column didn't exist
410       continue;
411     }
412 
413     // Check compatible field and column type
414     bool compatible = false;
415     switch(col->m_type)
416     {
417     case NdbInfo::Column::Number:
418       if (field->type() == MYSQL_TYPE_LONG)
419         compatible = true;
420       break;
421     case NdbInfo::Column::Number64:
422       if (field->type() == MYSQL_TYPE_LONGLONG)
423         compatible = true;
424       break;
425     case NdbInfo::Column::String:
426       if (field->type() == MYSQL_TYPE_VARCHAR)
427         compatible = true;
428       break;
429     default:
430       assert(false);
431       break;
432     }
433     if (!compatible)
434     {
435       // The column type is not compatible
436       warn_incompatible(ndb_tab, true,
437                         "column '%s' is not compatible",
438                         field->field_name);
439       delete m_impl.m_table; m_impl.m_table= 0;
440       DBUG_RETURN(ERR_INCOMPAT_TABLE_DEF);
441     }
442   }
443 
444   /* Increase "ref_length" to allow a whole row to be stored in "ref" */
445   ref_length = 0;
446   for (uint i = 0; i < table->s->fields; i++)
447     ref_length += table->field[i]->pack_length();
448   DBUG_PRINT("info", ("ref_length: %u", ref_length));
449 
450   DBUG_RETURN(0);
451 }
452 
close(void)453 int ha_ndbinfo::close(void)
454 {
455   DBUG_ENTER("ha_ndbinfo::close");
456 
457   if (is_offline())
458     DBUG_RETURN(0);
459 
460   assert(is_open());
461   if (m_impl.m_table)
462   {
463     g_ndbinfo->closeTable(m_impl.m_table);
464     m_impl.m_table = NULL;
465   }
466   DBUG_RETURN(0);
467 }
468 
rnd_init(bool scan)469 int ha_ndbinfo::rnd_init(bool scan)
470 {
471   DBUG_ENTER("ha_ndbinfo::rnd_init");
472   DBUG_PRINT("info", ("scan: %d", scan));
473 
474   if (is_offline())
475   {
476     push_warning(current_thd, Sql_condition::SL_NOTE, 1,
477                  "'NDBINFO' has been started in offline mode "
478                  "since the 'NDBCLUSTER' engine is disabled "
479                  "or @@global.ndbinfo_offline is turned on "
480                  "- no rows can be returned");
481     DBUG_RETURN(0);
482   }
483 
484   assert(is_open());
485 
486   if (m_impl.m_scan_op)
487   {
488     /*
489       It should be impossible to come here with an already open
490       scan, assumption is that rnd_end() would be called to indicate
491       that the previous scan should be closed or perhaps like it says
492       in decsription of rnd_init() that it "may be called two times". Once
493       to open the cursor and once to position te cursor at first row.
494 
495       Unfortunately the assumption and description of rnd_init() is not
496       correct. The rnd_init function is used on an open scan to reposition
497       it back to first row. For ha_ndbinfo this means closing
498       the scan and letting it be reopened.
499     */
500     assert(scan); // "only makes sense if scan=1" (from rnd_init() description)
501 
502     DBUG_PRINT("info", ("Closing scan to position it back to first row"));
503 
504     // Release the scan operation
505     g_ndbinfo->releaseScanOperation(m_impl.m_scan_op);
506     m_impl.m_scan_op = NULL;
507 
508     // Release pointers to the columns
509     m_impl.m_columns.clear();
510   }
511 
512   assert(m_impl.m_scan_op == NULL); // No scan already ongoing
513 
514   if (m_impl.m_first_use)
515   {
516     m_impl.m_first_use = false;
517 
518     /*
519       Check table def. and generate warnings for incompatibilites
520       which is allowed but should generate a warning.
521       (Done this late due to different code paths in MySQL Server for
522       prepared statement protocol, where warnings from 'handler::open'
523       are lost).
524     */
525     uint fields_found_in_ndb = 0;
526     const NdbInfo::Table* ndb_tab = m_impl.m_table;
527     for (uint i = 0; i < table->s->fields; i++)
528     {
529       const Field* field = table->field[i];
530       const NdbInfo::Column* col = ndb_tab->getColumn(field->field_name);
531       if (!col)
532       {
533         // The column didn't exist
534         warn_incompatible(ndb_tab, true,
535                           "column '%s' does not exist",
536                           field->field_name);
537         continue;
538       }
539       fields_found_in_ndb++;
540     }
541 
542     if (fields_found_in_ndb < ndb_tab->columns())
543     {
544       // There are more columns available in NDB
545       warn_incompatible(ndb_tab, false,
546                         "there are more columns available");
547     }
548   }
549 
550   if (!scan)
551   {
552     // Just an init to read using 'rnd_pos'
553     DBUG_PRINT("info", ("not scan"));
554     DBUG_RETURN(0);
555   }
556 
557   THD* thd = current_thd;
558   int err;
559   NdbInfoScanOperation* scan_op = NULL;
560   if ((err = g_ndbinfo->createScanOperation(m_impl.m_table,
561                                             &scan_op,
562                                             THDVAR(thd, max_rows),
563                                             THDVAR(thd, max_bytes))) != 0)
564     DBUG_RETURN(err2mysql(err));
565 
566   if ((err = scan_op->readTuples()) != 0)
567   {
568     // Release the scan operation
569     g_ndbinfo->releaseScanOperation(scan_op);
570     DBUG_RETURN(err2mysql(err));
571   }
572 
573   /* Read all columns specified in read_set */
574   for (uint i = 0; i < table->s->fields; i++)
575   {
576     Field *field = table->field[i];
577     if (bitmap_is_set(table->read_set, i))
578       m_impl.m_columns.push_back(scan_op->getValue(field->field_name));
579     else
580       m_impl.m_columns.push_back(NULL);
581   }
582 
583   if ((err = scan_op->execute()) != 0)
584   {
585     // Release pointers to the columns
586     m_impl.m_columns.clear();
587     // Release the scan operation
588     g_ndbinfo->releaseScanOperation(scan_op);
589     DBUG_RETURN(err2mysql(err));
590   }
591 
592   m_impl.m_scan_op = scan_op;
593   DBUG_RETURN(0);
594 }
595 
rnd_end()596 int ha_ndbinfo::rnd_end()
597 {
598   DBUG_ENTER("ha_ndbinfo::rnd_end");
599 
600   if (is_offline())
601     DBUG_RETURN(0);
602 
603   assert(is_open());
604 
605   if (m_impl.m_scan_op)
606   {
607     g_ndbinfo->releaseScanOperation(m_impl.m_scan_op);
608     m_impl.m_scan_op = NULL;
609   }
610   m_impl.m_columns.clear();
611 
612   DBUG_RETURN(0);
613 }
614 
rnd_next(uchar * buf)615 int ha_ndbinfo::rnd_next(uchar *buf)
616 {
617   int err;
618   DBUG_ENTER("ha_ndbinfo::rnd_next");
619 
620   if (is_offline())
621     DBUG_RETURN(HA_ERR_END_OF_FILE);
622 
623   assert(is_open());
624 
625   if (!m_impl.m_scan_op)
626   {
627     /*
628      It should be impossible to come here without a scan operation.
629      But apparently it's not safe to assume that rnd_next() isn't
630      called even though rnd_init() returned an error. Thus double check
631      that the scan operation exists and bail out in case it doesn't.
632     */
633     DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
634   }
635 
636   if ((err = m_impl.m_scan_op->nextResult()) == 0)
637     DBUG_RETURN(HA_ERR_END_OF_FILE);
638 
639   if (err != 1)
640     DBUG_RETURN(err2mysql(err));
641 
642   unpack_record(buf);
643 
644   DBUG_RETURN(0);
645 }
646 
rnd_pos(uchar * buf,uchar * pos)647 int ha_ndbinfo::rnd_pos(uchar *buf, uchar *pos)
648 {
649   DBUG_ENTER("ha_ndbinfo::rnd_pos");
650   assert(is_open());
651   assert(m_impl.m_scan_op == NULL); // No scan started
652 
653   /* Copy the saved row into "buf" and set all fields to not null */
654   memcpy(buf, pos, ref_length);
655   for (uint i = 0; i < table->s->fields; i++)
656     table->field[i]->set_notnull();
657 
658   DBUG_RETURN(0);
659 }
660 
position(const uchar * record)661 void ha_ndbinfo::position(const uchar *record)
662 {
663   DBUG_ENTER("ha_ndbinfo::position");
664   assert(is_open());
665   assert(m_impl.m_scan_op);
666 
667   /* Save away the whole row in "ref" */
668   memcpy(ref, record, ref_length);
669 
670   DBUG_VOID_RETURN;
671 }
672 
info(uint flag)673 int ha_ndbinfo::info(uint flag)
674 {
675   DBUG_ENTER("ha_ndbinfo::info");
676   DBUG_PRINT("enter", ("flag: %d", flag));
677   DBUG_RETURN(0);
678 }
679 
680 void
unpack_record(uchar * dst_row)681 ha_ndbinfo::unpack_record(uchar *dst_row)
682 {
683   DBUG_ENTER("ha_ndbinfo::unpack_record");
684   my_ptrdiff_t dst_offset = dst_row - table->record[0];
685 
686   for (uint i = 0; i < table->s->fields; i++)
687   {
688     Field *field = table->field[i];
689     const NdbInfoRecAttr* record = m_impl.m_columns[i];
690     if (record && !record->isNULL())
691     {
692       field->set_notnull();
693       field->move_field_offset(dst_offset);
694       switch (field->type()) {
695 
696       case (MYSQL_TYPE_VARCHAR):
697       {
698         DBUG_PRINT("info", ("str: %s", record->c_str()));
699         Field_varstring* vfield = (Field_varstring *) field;
700         /* Field_bit in DBUG requires the bit set in write_set for store(). */
701         my_bitmap_map *old_map =
702           dbug_tmp_use_all_columns(table, table->write_set);
703         (void)vfield->store(record->c_str(),
704                             MIN(record->length(), field->field_length)-1,
705                             field->charset());
706         dbug_tmp_restore_column_map(table->write_set, old_map);
707         break;
708       }
709 
710       case (MYSQL_TYPE_LONG):
711       {
712         memcpy(field->ptr, record->ptr(), sizeof(Uint32));
713         break;
714       }
715 
716       case (MYSQL_TYPE_LONGLONG):
717       {
718         memcpy(field->ptr, record->ptr(), sizeof(Uint64));
719         break;
720       }
721 
722       default:
723         sql_print_error("Found unexpected field type %u", field->type());
724         break;
725       }
726 
727       field->move_field_offset(-dst_offset);
728     }
729     else
730     {
731       field->set_null();
732     }
733   }
734   DBUG_VOID_RETURN;
735 }
736 
737 
738 static int
ndbinfo_find_files(handlerton * hton,THD * thd,const char * db,const char * path,const char * wild,bool dir,List<LEX_STRING> * files)739 ndbinfo_find_files(handlerton *hton, THD *thd,
740                    const char *db, const char *path,
741                    const char *wild, bool dir, List<LEX_STRING> *files)
742 {
743   DBUG_ENTER("ndbinfo_find_files");
744   DBUG_PRINT("enter", ("db: '%s', dir: %d, path: '%s'", db, dir, path));
745 
746   const bool show_hidden = THDVAR(thd, show_hidden);
747 
748   if(show_hidden)
749     DBUG_RETURN(0); // Don't filter out anything
750 
751   if (dir)
752   {
753     if (!ndbcluster_is_disabled())
754       DBUG_RETURN(0);
755 
756     // Hide our database when ndbcluster is disabled
757     LEX_STRING *dir_name;
758     List_iterator<LEX_STRING> it(*files);
759     while ((dir_name=it++))
760     {
761       if (strcmp(dir_name->str, opt_ndbinfo_dbname))
762         continue;
763 
764       DBUG_PRINT("info", ("Hiding own databse '%s'", dir_name->str));
765       it.remove();
766     }
767 
768     DBUG_RETURN(0);
769   }
770 
771   assert(db);
772   if (strcmp(db, opt_ndbinfo_dbname))
773     DBUG_RETURN(0); // Only hide files in "our" db
774 
775   /* Hide all files that start with "our" prefix */
776   LEX_STRING *file_name;
777   List_iterator<LEX_STRING> it(*files);
778   while ((file_name=it++))
779   {
780     if (is_prefix(file_name->str, opt_ndbinfo_table_prefix))
781     {
782       DBUG_PRINT("info", ("Hiding '%s'", file_name->str));
783       it.remove();
784     }
785   }
786 
787   DBUG_RETURN(0);
788 }
789 
790 
791 handlerton* ndbinfo_hton;
792 
793 static
794 int
ndbinfo_init(void * plugin)795 ndbinfo_init(void *plugin)
796 {
797   DBUG_ENTER("ndbinfo_init");
798 
799   handlerton *hton = (handlerton *) plugin;
800   hton->create = create_handler;
801   hton->flags =
802     HTON_TEMPORARY_NOT_SUPPORTED |
803     HTON_ALTER_NOT_SUPPORTED;
804   hton->find_files = ndbinfo_find_files;
805 
806   ndbinfo_hton = hton;
807 
808   if (ndbcluster_is_disabled())
809   {
810     // Starting in limited mode since ndbcluster is disabled
811      DBUG_RETURN(0);
812   }
813 
814   char prefix[FN_REFLEN];
815   build_table_filename(prefix, sizeof(prefix) - 1,
816                        opt_ndbinfo_dbname, opt_ndbinfo_table_prefix, "", 0);
817   DBUG_PRINT("info", ("prefix: '%s'", prefix));
818   assert(g_ndb_cluster_connection);
819   g_ndbinfo = new NdbInfo(g_ndb_cluster_connection, prefix,
820                           opt_ndbinfo_dbname, opt_ndbinfo_table_prefix);
821   if (!g_ndbinfo)
822   {
823     sql_print_error("Failed to create NdbInfo");
824     DBUG_RETURN(1);
825   }
826 
827   if (!g_ndbinfo->init())
828   {
829     sql_print_error("Failed to init NdbInfo");
830 
831     delete g_ndbinfo;
832     g_ndbinfo = NULL;
833 
834     DBUG_RETURN(1);
835   }
836 
837   DBUG_RETURN(0);
838 }
839 
840 static
841 int
ndbinfo_deinit(void * plugin)842 ndbinfo_deinit(void *plugin)
843 {
844   DBUG_ENTER("ndbinfo_deinit");
845 
846   if (g_ndbinfo)
847   {
848     delete g_ndbinfo;
849     g_ndbinfo = NULL;
850   }
851 
852   DBUG_RETURN(0);
853 }
854 
855 struct st_mysql_sys_var* ndbinfo_system_variables[]= {
856   MYSQL_SYSVAR(max_rows),
857   MYSQL_SYSVAR(max_bytes),
858   MYSQL_SYSVAR(show_hidden),
859   MYSQL_SYSVAR(database),
860   MYSQL_SYSVAR(table_prefix),
861   MYSQL_SYSVAR(version),
862   MYSQL_SYSVAR(offline),
863 
864   NULL
865 };
866 
867 struct st_mysql_storage_engine ndbinfo_storage_engine=
868 {
869   MYSQL_HANDLERTON_INTERFACE_VERSION
870 };
871 
872 struct st_mysql_plugin ndbinfo_plugin =
873 {
874   MYSQL_STORAGE_ENGINE_PLUGIN,
875   &ndbinfo_storage_engine,
876   "ndbinfo",
877   "Sun Microsystems Inc.",
878   "MySQL Cluster system information storage engine",
879   PLUGIN_LICENSE_GPL,
880   ndbinfo_init,               /* plugin init */
881   ndbinfo_deinit,             /* plugin deinit */
882   0x0001,                     /* plugin version */
883   NULL,                       /* status variables */
884   ndbinfo_system_variables,   /* system variables */
885   NULL,                       /* config options */
886   0
887 };
888 
889 template class Vector<const NdbInfoRecAttr*>;
890