1 /*
2    Copyright (C) 2009 Sun Microsystems Inc.
3    All rights reserved. Use is subject to license terms.
4 
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License, version 2.0,
7    as published by the Free Software Foundation.
8 
9    This program is also distributed with certain software (including
10    but not limited to OpenSSL) that is licensed under separate terms,
11    as designated in a particular file or component or in included license
12    documentation.  The authors of MySQL hereby grant you an additional
13    permission to link the program and your derivative works with the
14    separately licensed software that they have included with MySQL.
15 
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19    GNU General Public License, version 2.0, for more details.
20 
21    You should have received a copy of the GNU General Public License
22    along with this program; if not, write to the Free Software
23    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
24 */
25 
26 #include "ha_ndbcluster_glue.h"
27 
28 #ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
29 #include "ha_ndbinfo.h"
30 #include "../storage/ndb/src/ndbapi/NdbInfo.hpp"
31 
32 
33 static MYSQL_THDVAR_UINT(
34   max_rows,                          /* name */
35   PLUGIN_VAR_RQCMDARG,
36   "Specify max number of rows to fetch per roundtrip to cluster",
37   NULL,                              /* check func. */
38   NULL,                              /* update func. */
39   10,                                /* default */
40   1,                                 /* min */
41   256,                               /* max */
42   0                                  /* block */
43 );
44 
45 static MYSQL_THDVAR_UINT(
46   max_bytes,                         /* name */
47   PLUGIN_VAR_RQCMDARG,
48   "Specify approx. max number of bytes to fetch per roundtrip to cluster",
49   NULL,                              /* check func. */
50   NULL,                              /* update func. */
51   0,                                 /* default */
52   0,                                 /* min */
53   65535,                             /* max */
54   0                                  /* block */
55 );
56 
57 static MYSQL_THDVAR_BOOL(
58   show_hidden,                       /* name */
59   PLUGIN_VAR_RQCMDARG,
60   "Control if tables should be visible or not",
61   NULL,                              /* check func. */
62   NULL,                              /* update func. */
63   FALSE                              /* default */
64 );
65 
66 static char* opt_ndbinfo_dbname = (char*)"ndbinfo";
67 static MYSQL_SYSVAR_STR(
68   database,                         /* name */
69   opt_ndbinfo_dbname,               /* var */
70   PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
71   "Name of the database used by ndbinfo",
72   NULL,                             /* check func. */
73   NULL,                             /* update func. */
74   NULL                              /* default */
75 );
76 
77 static char* opt_ndbinfo_table_prefix = (char*)"ndb$";
78 static MYSQL_SYSVAR_STR(
79   table_prefix,                     /* name */
80   opt_ndbinfo_table_prefix,         /* var */
81   PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
82   "Prefix to use for all virtual tables loaded from NDB",
83   NULL,                             /* check func. */
84   NULL,                             /* update func. */
85   NULL                              /* default */
86 );
87 
88 static Uint32 opt_ndbinfo_version = NDB_VERSION_D;
89 static MYSQL_SYSVAR_UINT(
90   version,                          /* name */
91   opt_ndbinfo_version,              /* var */
92   PLUGIN_VAR_NOCMDOPT | PLUGIN_VAR_READONLY,
93   "Compile version for ndbinfo",
94   NULL,                             /* check func. */
95   NULL,                             /* update func. */
96   0,                                /* default */
97   0,                                /* min */
98   0,                                /* max */
99   0                                 /* block */
100 );
101 
102 static my_bool opt_ndbinfo_offline;
103 
104 static
105 void
offline_update(THD * thd,struct st_mysql_sys_var * var,void * var_ptr,const void * save)106 offline_update(THD* thd, struct st_mysql_sys_var* var,
107                void* var_ptr, const void* save)
108 {
109   DBUG_ENTER("offline_update");
110 
111   const my_bool new_offline =
112     (*(static_cast<const my_bool*>(save)) != 0);
113   if (new_offline == opt_ndbinfo_offline)
114   {
115     // No change
116     DBUG_VOID_RETURN;
117   }
118 
119   // Set offline mode, any tables opened from here on will
120   // be opened in the new mode
121   opt_ndbinfo_offline = new_offline;
122 
123   // Close any open tables which may be in the old mode
124   (void)close_cached_tables(thd, NULL, false, true, false);
125 
126   DBUG_VOID_RETURN;
127 }
128 
129 static MYSQL_SYSVAR_BOOL(
130   offline,                          /* name */
131   opt_ndbinfo_offline,              /* var */
132   PLUGIN_VAR_NOCMDOPT,
133   "Set ndbinfo in offline mode, tables and views can "
134   "be opened even if they don't exist or have different "
135   "definition in NDB. No rows will be returned.",
136   NULL,                             /* check func. */
137   offline_update,                   /* update func. */
138   0                                 /* default */
139 );
140 
141 
142 static NdbInfo* g_ndbinfo;
143 
144 extern Ndb_cluster_connection* g_ndb_cluster_connection;
145 
146 static bool
ndbcluster_is_disabled(void)147 ndbcluster_is_disabled(void)
148 {
149   /*
150     ndbinfo uses the same connection as ndbcluster
151     to avoid using up another nodeid, this also means that
152     if ndbcluster is not enabled, ndbinfo won't start
153   */
154   if (g_ndb_cluster_connection)
155     return false;
156   assert(g_ndbinfo == NULL);
157   return true;
158 }
159 
160 static handler*
create_handler(handlerton * hton,TABLE_SHARE * table,MEM_ROOT * mem_root)161 create_handler(handlerton *hton, TABLE_SHARE *table, MEM_ROOT *mem_root)
162 {
163   return new (mem_root) ha_ndbinfo(hton, table);
164 }
165 
166 struct ha_ndbinfo_impl
167 {
168   const NdbInfo::Table* m_table;
169   NdbInfoScanOperation* m_scan_op;
170   Vector<const NdbInfoRecAttr *> m_columns;
171   bool m_first_use;
172 
173   // Indicates if table has been opened in offline mode
174   // can only be reset by closing the table
175   bool m_offline;
176 
ha_ndbinfo_implha_ndbinfo_impl177   ha_ndbinfo_impl() :
178     m_table(NULL),
179     m_scan_op(NULL),
180     m_first_use(true),
181     m_offline(false)
182   {
183   }
184 };
185 
ha_ndbinfo(handlerton * hton,TABLE_SHARE * table_arg)186 ha_ndbinfo::ha_ndbinfo(handlerton *hton, TABLE_SHARE *table_arg)
187 : handler(hton, table_arg), m_impl(*new ha_ndbinfo_impl)
188 {
189 }
190 
~ha_ndbinfo()191 ha_ndbinfo::~ha_ndbinfo()
192 {
193   delete &m_impl;
194 }
195 
196 enum ndbinfo_error_codes {
197   ERR_INCOMPAT_TABLE_DEF = 40001
198 };
199 
200 struct error_message {
201   int error;
202   const char* message;
203 } error_messages[] = {
204   { ERR_INCOMPAT_TABLE_DEF, "Incompatible table definitions" },
205   { HA_ERR_NO_CONNECTION, "Connection to NDB failed" },
206 
207   { 0, 0 }
208 };
209 
210 static
find_error_message(int error)211 const char* find_error_message(int error)
212 {
213   struct error_message* err = error_messages;
214   while (err->error && err->message)
215   {
216     if (err->error == error)
217     {
218       assert(err->message);
219       return err->message;
220     }
221     err++;
222   }
223   return NULL;
224 }
225 
err2mysql(int error)226 static int err2mysql(int error)
227 {
228   DBUG_ENTER("err2mysql");
229   DBUG_PRINT("enter", ("error: %d", error));
230   assert(error != 0);
231   switch(error)
232   {
233   case NdbInfo::ERR_ClusterFailure:
234     DBUG_RETURN(HA_ERR_NO_CONNECTION);
235     break;
236   case NdbInfo::ERR_OutOfMemory:
237     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
238     break;
239   default:
240     break;
241   }
242   push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
243                       ER_GET_ERRNO, ER(ER_GET_ERRNO), error);
244   DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
245 }
246 
get_error_message(int error,String * buf)247 bool ha_ndbinfo::get_error_message(int error, String *buf)
248 {
249   DBUG_ENTER("ha_ndbinfo::get_error_message");
250   DBUG_PRINT("enter", ("error: %d", error));
251 
252   const char* message = find_error_message(error);
253   if (!message)
254     DBUG_RETURN(false);
255 
256   buf->set(message, strlen(message), &my_charset_bin);
257   DBUG_PRINT("exit", ("message: %s", buf->ptr()));
258   DBUG_RETURN(false);
259 }
260 
261 static void
generate_sql(const NdbInfo::Table * ndb_tab,BaseString & sql)262 generate_sql(const NdbInfo::Table* ndb_tab, BaseString& sql)
263 {
264   sql.appfmt("'CREATE TABLE `%s`.`%s%s` (",
265              opt_ndbinfo_dbname, opt_ndbinfo_table_prefix, ndb_tab->getName());
266 
267   const char* separator = "";
268   for (unsigned i = 0; i < ndb_tab->columns(); i++)
269   {
270     const NdbInfo::Column* col = ndb_tab->getColumn(i);
271 
272     sql.appfmt("%s", separator);
273     separator = ", ";
274 
275     sql.appfmt("`%s` ", col->m_name.c_str());
276 
277     switch(col->m_type)
278     {
279     case NdbInfo::Column::Number:
280       sql.appfmt("INT UNSIGNED");
281       break;
282     case NdbInfo::Column::Number64:
283       sql.appfmt("BIGINT UNSIGNED");
284       break;
285     case NdbInfo::Column::String:
286       sql.appfmt("VARCHAR(512)");
287       break;
288     default:
289       sql.appfmt("UNKNOWN");
290       assert(false);
291       break;
292     }
293   }
294   sql.appfmt(") ENGINE=NDBINFO'");
295 }
296 
297 /*
298   Push a warning with explanation of the problem as well as the
299   proper SQL so the user can regenerate the table definition
300 */
301 
302 static void
warn_incompatible(const NdbInfo::Table * ndb_tab,bool fatal,const char * format,...)303 warn_incompatible(const NdbInfo::Table* ndb_tab, bool fatal,
304              const char* format, ...)
305 {
306   BaseString msg;
307   DBUG_ENTER("warn_incompatible");
308   DBUG_PRINT("enter",("table_name: %s, fatal: %d", ndb_tab->getName(), fatal));
309   DBUG_ASSERT(format != NULL);
310 
311   va_list args;
312   char explanation[128];
313   va_start(args,format);
314   my_vsnprintf(explanation, sizeof(explanation), format, args);
315   va_end(args);
316 
317   msg.assfmt("Table '%s%s' is defined differently in NDB, %s. The "
318              "SQL to regenerate is: ",
319              opt_ndbinfo_table_prefix, ndb_tab->getName(), explanation);
320   generate_sql(ndb_tab, msg);
321 
322   const Sql_condition::enum_warning_level level =
323     (fatal ? Sql_condition::WARN_LEVEL_WARN : Sql_condition::WARN_LEVEL_NOTE);
324   push_warning(current_thd, level, ERR_INCOMPAT_TABLE_DEF, msg.c_str());
325 
326   DBUG_VOID_RETURN;
327 }
328 
create(const char * name,TABLE * form,HA_CREATE_INFO * create_info)329 int ha_ndbinfo::create(const char *name, TABLE *form,
330                        HA_CREATE_INFO *create_info)
331 {
332   DBUG_ENTER("ha_ndbinfo::create");
333   DBUG_PRINT("enter", ("name: %s", name));
334 
335   DBUG_RETURN(0);
336 }
337 
is_open(void) const338 bool ha_ndbinfo::is_open(void) const
339 {
340   return m_impl.m_table != NULL;
341 }
342 
is_offline(void) const343 bool ha_ndbinfo::is_offline(void) const
344 {
345   return m_impl.m_offline;
346 }
347 
open(const char * name,int mode,uint test_if_locked)348 int ha_ndbinfo::open(const char *name, int mode, uint test_if_locked)
349 {
350   DBUG_ENTER("ha_ndbinfo::open");
351   DBUG_PRINT("enter", ("name: %s, mode: %d", name, mode));
352 
353   assert(is_closed());
354   assert(!is_offline()); // Closed table can not be offline
355 
356   if (mode == O_RDWR)
357   {
358     if (table->db_stat & HA_TRY_READ_ONLY)
359     {
360       DBUG_PRINT("info", ("Telling server to use readonly mode"));
361       DBUG_RETURN(EROFS); // Read only fs
362     }
363     // Find any commands that does not allow open readonly
364     DBUG_ASSERT(false);
365   }
366 
367   if (opt_ndbinfo_offline ||
368       ndbcluster_is_disabled())
369   {
370     // Mark table as being offline and allow it to be opened
371     m_impl.m_offline = true;
372     DBUG_RETURN(0);
373   }
374 
375   int err = g_ndbinfo->openTable(name, &m_impl.m_table);
376   if (err)
377   {
378     assert(m_impl.m_table == 0);
379     if (err == NdbInfo::ERR_NoSuchTable)
380       DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);
381     DBUG_RETURN(err2mysql(err));
382   }
383 
384   /*
385     Check table def. to detect incompatible differences which should
386     return an error. Differences which only generate a warning
387     is checked on first use
388   */
389   DBUG_PRINT("info", ("Comparing MySQL's table def against NDB"));
390   const NdbInfo::Table* ndb_tab = m_impl.m_table;
391   for (uint i = 0; i < table->s->fields; i++)
392   {
393     const Field* field = table->field[i];
394 
395     // Check if field is NULLable
396     if (const_cast<Field*>(field)->real_maybe_null() == false)
397     {
398       // Only NULLable fields supported
399       warn_incompatible(ndb_tab, true,
400                         "column '%s' is NOT NULL",
401                         field->field_name);
402       delete m_impl.m_table; m_impl.m_table= 0;
403       DBUG_RETURN(ERR_INCOMPAT_TABLE_DEF);
404     }
405 
406     // Check if column exist in NDB
407     const NdbInfo::Column* col = ndb_tab->getColumn(field->field_name);
408     if (!col)
409     {
410       // The column didn't exist
411       continue;
412     }
413 
414     // Check compatible field and column type
415     bool compatible = false;
416     switch(col->m_type)
417     {
418     case NdbInfo::Column::Number:
419       if (field->type() == MYSQL_TYPE_LONG)
420         compatible = true;
421       break;
422     case NdbInfo::Column::Number64:
423       if (field->type() == MYSQL_TYPE_LONGLONG)
424         compatible = true;
425       break;
426     case NdbInfo::Column::String:
427       if (field->type() == MYSQL_TYPE_VARCHAR)
428         compatible = true;
429       break;
430     default:
431       assert(false);
432       break;
433     }
434     if (!compatible)
435     {
436       // The column type is not compatible
437       warn_incompatible(ndb_tab, true,
438                         "column '%s' is not compatible",
439                         field->field_name);
440       delete m_impl.m_table; m_impl.m_table= 0;
441       DBUG_RETURN(ERR_INCOMPAT_TABLE_DEF);
442     }
443   }
444 
445   /* Increase "ref_length" to allow a whole row to be stored in "ref" */
446   ref_length = 0;
447   for (uint i = 0; i < table->s->fields; i++)
448     ref_length += table->field[i]->pack_length();
449   DBUG_PRINT("info", ("ref_length: %u", ref_length));
450 
451   DBUG_RETURN(0);
452 }
453 
close(void)454 int ha_ndbinfo::close(void)
455 {
456   DBUG_ENTER("ha_ndbinfo::close");
457 
458   if (is_offline())
459     DBUG_RETURN(0);
460 
461   assert(is_open());
462   if (m_impl.m_table)
463   {
464     g_ndbinfo->closeTable(m_impl.m_table);
465     m_impl.m_table = NULL;
466   }
467   DBUG_RETURN(0);
468 }
469 
rnd_init(bool scan)470 int ha_ndbinfo::rnd_init(bool scan)
471 {
472   DBUG_ENTER("ha_ndbinfo::rnd_init");
473   DBUG_PRINT("info", ("scan: %d", scan));
474 
475   if (is_offline())
476   {
477     push_warning(current_thd, Sql_condition::WARN_LEVEL_NOTE, 1,
478                  "'NDBINFO' has been started in offline mode "
479                  "since the 'NDBCLUSTER' engine is disabled "
480                  "or @@global.ndbinfo_offline is turned on "
481                  "- no rows can be returned");
482     DBUG_RETURN(0);
483   }
484 
485   assert(is_open());
486   assert(m_impl.m_scan_op == NULL); // No scan already ongoing
487 
488   if (m_impl.m_first_use)
489   {
490     m_impl.m_first_use = false;
491 
492     /*
493       Check table def. and generate warnings for incompatibilites
494       which is allowed but should generate a warning.
495       (Done this late due to different code paths in MySQL Server for
496       prepared statement protocol, where warnings from 'handler::open'
497       are lost).
498     */
499     uint fields_found_in_ndb = 0;
500     const NdbInfo::Table* ndb_tab = m_impl.m_table;
501     for (uint i = 0; i < table->s->fields; i++)
502     {
503       const Field* field = table->field[i];
504       const NdbInfo::Column* col = ndb_tab->getColumn(field->field_name);
505       if (!col)
506       {
507         // The column didn't exist
508         warn_incompatible(ndb_tab, true,
509                           "column '%s' does not exist",
510                           field->field_name);
511         continue;
512       }
513       fields_found_in_ndb++;
514     }
515 
516     if (fields_found_in_ndb < ndb_tab->columns())
517     {
518       // There are more columns available in NDB
519       warn_incompatible(ndb_tab, false,
520                         "there are more columns available");
521     }
522   }
523 
524   if (!scan)
525   {
526     // Just an init to read using 'rnd_pos'
527     DBUG_PRINT("info", ("not scan"));
528     DBUG_RETURN(0);
529   }
530 
531   THD* thd = current_thd;
532   int err;
533   NdbInfoScanOperation* scan_op = NULL;
534   if ((err = g_ndbinfo->createScanOperation(m_impl.m_table,
535                                             &scan_op,
536                                             THDVAR(thd, max_rows),
537                                             THDVAR(thd, max_bytes))) != 0)
538     DBUG_RETURN(err2mysql(err));
539 
540   if ((err = scan_op->readTuples()) != 0)
541     DBUG_RETURN(err2mysql(err));
542 
543   /* Read all columns specified in read_set */
544   for (uint i = 0; i < table->s->fields; i++)
545   {
546     Field *field = table->field[i];
547     if (bitmap_is_set(table->read_set, i))
548       m_impl.m_columns.push_back(scan_op->getValue(field->field_name));
549     else
550       m_impl.m_columns.push_back(NULL);
551   }
552 
553   if ((err = scan_op->execute()) != 0)
554     DBUG_RETURN(err2mysql(err));
555 
556   m_impl.m_scan_op = scan_op;
557   DBUG_RETURN(0);
558 }
559 
rnd_end()560 int ha_ndbinfo::rnd_end()
561 {
562   DBUG_ENTER("ha_ndbinfo::rnd_end");
563 
564   if (is_offline())
565     DBUG_RETURN(0);
566 
567   assert(is_open());
568 
569   if (m_impl.m_scan_op)
570   {
571     g_ndbinfo->releaseScanOperation(m_impl.m_scan_op);
572     m_impl.m_scan_op = NULL;
573   }
574   m_impl.m_columns.clear();
575 
576   DBUG_RETURN(0);
577 }
578 
rnd_next(uchar * buf)579 int ha_ndbinfo::rnd_next(uchar *buf)
580 {
581   int err;
582   DBUG_ENTER("ha_ndbinfo::rnd_next");
583 
584   if (is_offline())
585     DBUG_RETURN(HA_ERR_END_OF_FILE);
586 
587   assert(is_open());
588   assert(m_impl.m_scan_op);
589 
590   if ((err = m_impl.m_scan_op->nextResult()) == 0)
591     DBUG_RETURN(HA_ERR_END_OF_FILE);
592 
593   if (err != 1)
594     DBUG_RETURN(err2mysql(err));
595 
596   unpack_record(buf);
597 
598   DBUG_RETURN(0);
599 }
600 
rnd_pos(uchar * buf,uchar * pos)601 int ha_ndbinfo::rnd_pos(uchar *buf, uchar *pos)
602 {
603   DBUG_ENTER("ha_ndbinfo::rnd_pos");
604   assert(is_open());
605   assert(m_impl.m_scan_op == NULL); // No scan started
606 
607   /* Copy the saved row into "buf" and set all fields to not null */
608   memcpy(buf, pos, ref_length);
609   for (uint i = 0; i < table->s->fields; i++)
610     table->field[i]->set_notnull();
611 
612   DBUG_RETURN(0);
613 }
614 
position(const uchar * record)615 void ha_ndbinfo::position(const uchar *record)
616 {
617   DBUG_ENTER("ha_ndbinfo::position");
618   assert(is_open());
619   assert(m_impl.m_scan_op);
620 
621   /* Save away the whole row in "ref" */
622   memcpy(ref, record, ref_length);
623 
624   DBUG_VOID_RETURN;
625 }
626 
info(uint flag)627 int ha_ndbinfo::info(uint flag)
628 {
629   DBUG_ENTER("ha_ndbinfo::info");
630   DBUG_PRINT("enter", ("flag: %d", flag));
631   DBUG_RETURN(0);
632 }
633 
634 void
unpack_record(uchar * dst_row)635 ha_ndbinfo::unpack_record(uchar *dst_row)
636 {
637   DBUG_ENTER("ha_ndbinfo::unpack_record");
638   my_ptrdiff_t dst_offset = dst_row - table->record[0];
639 
640   for (uint i = 0; i < table->s->fields; i++)
641   {
642     Field *field = table->field[i];
643     const NdbInfoRecAttr* record = m_impl.m_columns[i];
644     if (record && !record->isNULL())
645     {
646       field->set_notnull();
647       field->move_field_offset(dst_offset);
648       switch (field->type()) {
649 
650       case (MYSQL_TYPE_VARCHAR):
651       {
652         DBUG_PRINT("info", ("str: %s", record->c_str()));
653         Field_varstring* vfield = (Field_varstring *) field;
654         /* Field_bit in DBUG requires the bit set in write_set for store(). */
655         my_bitmap_map *old_map =
656           dbug_tmp_use_all_columns(table, table->write_set);
657         (void)vfield->store(record->c_str(),
658                             MIN(record->length(), field->field_length)-1,
659                             field->charset());
660         dbug_tmp_restore_column_map(table->write_set, old_map);
661         break;
662       }
663 
664       case (MYSQL_TYPE_LONG):
665       {
666         memcpy(field->ptr, record->ptr(), sizeof(Uint32));
667         break;
668       }
669 
670       case (MYSQL_TYPE_LONGLONG):
671       {
672         memcpy(field->ptr, record->ptr(), sizeof(Uint64));
673         break;
674       }
675 
676       default:
677         sql_print_error("Found unexpected field type %u", field->type());
678         break;
679       }
680 
681       field->move_field_offset(-dst_offset);
682     }
683     else
684     {
685       field->set_null();
686     }
687   }
688   DBUG_VOID_RETURN;
689 }
690 
691 
692 static int
ndbinfo_find_files(handlerton * hton,THD * thd,const char * db,const char * path,const char * wild,bool dir,List<LEX_STRING> * files)693 ndbinfo_find_files(handlerton *hton, THD *thd,
694                    const char *db, const char *path,
695                    const char *wild, bool dir, List<LEX_STRING> *files)
696 {
697   DBUG_ENTER("ndbinfo_find_files");
698   DBUG_PRINT("enter", ("db: '%s', dir: %d, path: '%s'", db, dir, path));
699 
700   const bool show_hidden = THDVAR(thd, show_hidden);
701 
702   if(show_hidden)
703     DBUG_RETURN(0); // Don't filter out anything
704 
705   if (dir)
706   {
707     if (!ndbcluster_is_disabled())
708       DBUG_RETURN(0);
709 
710     // Hide our database when ndbcluster is disabled
711     LEX_STRING *dir_name;
712     List_iterator<LEX_STRING> it(*files);
713     while ((dir_name=it++))
714     {
715       if (strcmp(dir_name->str, opt_ndbinfo_dbname))
716         continue;
717 
718       DBUG_PRINT("info", ("Hiding own databse '%s'", dir_name->str));
719       it.remove();
720     }
721 
722     DBUG_RETURN(0);
723   }
724 
725   DBUG_ASSERT(db);
726   if (strcmp(db, opt_ndbinfo_dbname))
727     DBUG_RETURN(0); // Only hide files in "our" db
728 
729   /* Hide all files that start with "our" prefix */
730   LEX_STRING *file_name;
731   List_iterator<LEX_STRING> it(*files);
732   while ((file_name=it++))
733   {
734     if (is_prefix(file_name->str, opt_ndbinfo_table_prefix))
735     {
736       DBUG_PRINT("info", ("Hiding '%s'", file_name->str));
737       it.remove();
738     }
739   }
740 
741   DBUG_RETURN(0);
742 }
743 
744 
745 handlerton* ndbinfo_hton;
746 
ndbinfo_init(void * plugin)747 int ndbinfo_init(void *plugin)
748 {
749   DBUG_ENTER("ndbinfo_init");
750 
751   handlerton *hton = (handlerton *) plugin;
752   hton->create = create_handler;
753   hton->flags =
754     HTON_TEMPORARY_NOT_SUPPORTED |
755     HTON_ALTER_NOT_SUPPORTED;
756   hton->find_files = ndbinfo_find_files;
757 
758   ndbinfo_hton = hton;
759 
760   if (ndbcluster_is_disabled())
761   {
762     // Starting in limited mode since ndbcluster is disabled
763      DBUG_RETURN(0);
764   }
765 
766   char prefix[FN_REFLEN];
767   build_table_filename(prefix, sizeof(prefix) - 1,
768                        opt_ndbinfo_dbname, opt_ndbinfo_table_prefix, "", 0);
769   DBUG_PRINT("info", ("prefix: '%s'", prefix));
770   assert(g_ndb_cluster_connection);
771   g_ndbinfo = new NdbInfo(g_ndb_cluster_connection, prefix,
772                           opt_ndbinfo_dbname, opt_ndbinfo_table_prefix);
773   if (!g_ndbinfo)
774   {
775     sql_print_error("Failed to create NdbInfo");
776     DBUG_RETURN(1);
777   }
778 
779   if (!g_ndbinfo->init())
780   {
781     sql_print_error("Failed to init NdbInfo");
782 
783     delete g_ndbinfo;
784     g_ndbinfo = NULL;
785 
786     DBUG_RETURN(1);
787   }
788 
789   DBUG_RETURN(0);
790 }
791 
ndbinfo_deinit(void * plugin)792 int ndbinfo_deinit(void *plugin)
793 {
794   DBUG_ENTER("ndbinfo_deinit");
795 
796   if (g_ndbinfo)
797   {
798     delete g_ndbinfo;
799     g_ndbinfo = NULL;
800   }
801 
802   DBUG_RETURN(0);
803 }
804 
805 struct st_mysql_sys_var* ndbinfo_system_variables[]= {
806   MYSQL_SYSVAR(max_rows),
807   MYSQL_SYSVAR(max_bytes),
808   MYSQL_SYSVAR(show_hidden),
809   MYSQL_SYSVAR(database),
810   MYSQL_SYSVAR(table_prefix),
811   MYSQL_SYSVAR(version),
812   MYSQL_SYSVAR(offline),
813 
814   NULL
815 };
816 
817 template class Vector<const NdbInfoRecAttr*>;
818 
819 #endif
820