1 /*
2 Copyright (C) 2009 Sun Microsystems Inc.
3 All rights reserved. Use is subject to license terms.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24 */
25
26 #include "ha_ndbcluster_glue.h"
27
28 #ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
29 #include "ha_ndbinfo.h"
30 #include "../storage/ndb/src/ndbapi/NdbInfo.hpp"
31
32
33 static MYSQL_THDVAR_UINT(
34 max_rows, /* name */
35 PLUGIN_VAR_RQCMDARG,
36 "Specify max number of rows to fetch per roundtrip to cluster",
37 NULL, /* check func. */
38 NULL, /* update func. */
39 10, /* default */
40 1, /* min */
41 256, /* max */
42 0 /* block */
43 );
44
45 static MYSQL_THDVAR_UINT(
46 max_bytes, /* name */
47 PLUGIN_VAR_RQCMDARG,
48 "Specify approx. max number of bytes to fetch per roundtrip to cluster",
49 NULL, /* check func. */
50 NULL, /* update func. */
51 0, /* default */
52 0, /* min */
53 65535, /* max */
54 0 /* block */
55 );
56
57 static MYSQL_THDVAR_BOOL(
58 show_hidden, /* name */
59 PLUGIN_VAR_RQCMDARG,
60 "Control if tables should be visible or not",
61 NULL, /* check func. */
62 NULL, /* update func. */
63 FALSE /* default */
64 );
65
66 static char* opt_ndbinfo_dbname = (char*)"ndbinfo";
67 static MYSQL_SYSVAR_STR(
68 database, /* name */
69 opt_ndbinfo_dbname, /* var */
70 PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
71 "Name of the database used by ndbinfo",
72 NULL, /* check func. */
73 NULL, /* update func. */
74 NULL /* default */
75 );
76
77 static char* opt_ndbinfo_table_prefix = (char*)"ndb$";
78 static MYSQL_SYSVAR_STR(
79 table_prefix, /* name */
80 opt_ndbinfo_table_prefix, /* var */
81 PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
82 "Prefix to use for all virtual tables loaded from NDB",
83 NULL, /* check func. */
84 NULL, /* update func. */
85 NULL /* default */
86 );
87
88 static Uint32 opt_ndbinfo_version = NDB_VERSION_D;
89 static MYSQL_SYSVAR_UINT(
90 version, /* name */
91 opt_ndbinfo_version, /* var */
92 PLUGIN_VAR_NOCMDOPT | PLUGIN_VAR_READONLY,
93 "Compile version for ndbinfo",
94 NULL, /* check func. */
95 NULL, /* update func. */
96 0, /* default */
97 0, /* min */
98 0, /* max */
99 0 /* block */
100 );
101
102 static my_bool opt_ndbinfo_offline;
103
104 static
105 void
offline_update(THD * thd,struct st_mysql_sys_var * var,void * var_ptr,const void * save)106 offline_update(THD* thd, struct st_mysql_sys_var* var,
107 void* var_ptr, const void* save)
108 {
109 DBUG_ENTER("offline_update");
110
111 const my_bool new_offline =
112 (*(static_cast<const my_bool*>(save)) != 0);
113 if (new_offline == opt_ndbinfo_offline)
114 {
115 // No change
116 DBUG_VOID_RETURN;
117 }
118
119 // Set offline mode, any tables opened from here on will
120 // be opened in the new mode
121 opt_ndbinfo_offline = new_offline;
122
123 // Close any open tables which may be in the old mode
124 (void)close_cached_tables(thd, NULL, false, true, false);
125
126 DBUG_VOID_RETURN;
127 }
128
129 static MYSQL_SYSVAR_BOOL(
130 offline, /* name */
131 opt_ndbinfo_offline, /* var */
132 PLUGIN_VAR_NOCMDOPT,
133 "Set ndbinfo in offline mode, tables and views can "
134 "be opened even if they don't exist or have different "
135 "definition in NDB. No rows will be returned.",
136 NULL, /* check func. */
137 offline_update, /* update func. */
138 0 /* default */
139 );
140
141
142 static NdbInfo* g_ndbinfo;
143
144 extern Ndb_cluster_connection* g_ndb_cluster_connection;
145
146 static bool
ndbcluster_is_disabled(void)147 ndbcluster_is_disabled(void)
148 {
149 /*
150 ndbinfo uses the same connection as ndbcluster
151 to avoid using up another nodeid, this also means that
152 if ndbcluster is not enabled, ndbinfo won't start
153 */
154 if (g_ndb_cluster_connection)
155 return false;
156 assert(g_ndbinfo == NULL);
157 return true;
158 }
159
160 static handler*
create_handler(handlerton * hton,TABLE_SHARE * table,MEM_ROOT * mem_root)161 create_handler(handlerton *hton, TABLE_SHARE *table, MEM_ROOT *mem_root)
162 {
163 return new (mem_root) ha_ndbinfo(hton, table);
164 }
165
166 struct ha_ndbinfo_impl
167 {
168 const NdbInfo::Table* m_table;
169 NdbInfoScanOperation* m_scan_op;
170 Vector<const NdbInfoRecAttr *> m_columns;
171 bool m_first_use;
172
173 // Indicates if table has been opened in offline mode
174 // can only be reset by closing the table
175 bool m_offline;
176
ha_ndbinfo_implha_ndbinfo_impl177 ha_ndbinfo_impl() :
178 m_table(NULL),
179 m_scan_op(NULL),
180 m_first_use(true),
181 m_offline(false)
182 {
183 }
184 };
185
ha_ndbinfo(handlerton * hton,TABLE_SHARE * table_arg)186 ha_ndbinfo::ha_ndbinfo(handlerton *hton, TABLE_SHARE *table_arg)
187 : handler(hton, table_arg), m_impl(*new ha_ndbinfo_impl)
188 {
189 }
190
~ha_ndbinfo()191 ha_ndbinfo::~ha_ndbinfo()
192 {
193 delete &m_impl;
194 }
195
196 enum ndbinfo_error_codes {
197 ERR_INCOMPAT_TABLE_DEF = 40001
198 };
199
200 struct error_message {
201 int error;
202 const char* message;
203 } error_messages[] = {
204 { ERR_INCOMPAT_TABLE_DEF, "Incompatible table definitions" },
205 { HA_ERR_NO_CONNECTION, "Connection to NDB failed" },
206
207 { 0, 0 }
208 };
209
210 static
find_error_message(int error)211 const char* find_error_message(int error)
212 {
213 struct error_message* err = error_messages;
214 while (err->error && err->message)
215 {
216 if (err->error == error)
217 {
218 assert(err->message);
219 return err->message;
220 }
221 err++;
222 }
223 return NULL;
224 }
225
err2mysql(int error)226 static int err2mysql(int error)
227 {
228 DBUG_ENTER("err2mysql");
229 DBUG_PRINT("enter", ("error: %d", error));
230 assert(error != 0);
231 switch(error)
232 {
233 case NdbInfo::ERR_ClusterFailure:
234 DBUG_RETURN(HA_ERR_NO_CONNECTION);
235 break;
236 case NdbInfo::ERR_OutOfMemory:
237 DBUG_RETURN(HA_ERR_OUT_OF_MEM);
238 break;
239 default:
240 break;
241 }
242 push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
243 ER_GET_ERRNO, ER(ER_GET_ERRNO), error);
244 DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
245 }
246
get_error_message(int error,String * buf)247 bool ha_ndbinfo::get_error_message(int error, String *buf)
248 {
249 DBUG_ENTER("ha_ndbinfo::get_error_message");
250 DBUG_PRINT("enter", ("error: %d", error));
251
252 const char* message = find_error_message(error);
253 if (!message)
254 DBUG_RETURN(false);
255
256 buf->set(message, strlen(message), &my_charset_bin);
257 DBUG_PRINT("exit", ("message: %s", buf->ptr()));
258 DBUG_RETURN(false);
259 }
260
261 static void
generate_sql(const NdbInfo::Table * ndb_tab,BaseString & sql)262 generate_sql(const NdbInfo::Table* ndb_tab, BaseString& sql)
263 {
264 sql.appfmt("'CREATE TABLE `%s`.`%s%s` (",
265 opt_ndbinfo_dbname, opt_ndbinfo_table_prefix, ndb_tab->getName());
266
267 const char* separator = "";
268 for (unsigned i = 0; i < ndb_tab->columns(); i++)
269 {
270 const NdbInfo::Column* col = ndb_tab->getColumn(i);
271
272 sql.appfmt("%s", separator);
273 separator = ", ";
274
275 sql.appfmt("`%s` ", col->m_name.c_str());
276
277 switch(col->m_type)
278 {
279 case NdbInfo::Column::Number:
280 sql.appfmt("INT UNSIGNED");
281 break;
282 case NdbInfo::Column::Number64:
283 sql.appfmt("BIGINT UNSIGNED");
284 break;
285 case NdbInfo::Column::String:
286 sql.appfmt("VARCHAR(512)");
287 break;
288 default:
289 sql.appfmt("UNKNOWN");
290 assert(false);
291 break;
292 }
293 }
294 sql.appfmt(") ENGINE=NDBINFO'");
295 }
296
297 /*
298 Push a warning with explanation of the problem as well as the
299 proper SQL so the user can regenerate the table definition
300 */
301
302 static void
warn_incompatible(const NdbInfo::Table * ndb_tab,bool fatal,const char * format,...)303 warn_incompatible(const NdbInfo::Table* ndb_tab, bool fatal,
304 const char* format, ...)
305 {
306 BaseString msg;
307 DBUG_ENTER("warn_incompatible");
308 DBUG_PRINT("enter",("table_name: %s, fatal: %d", ndb_tab->getName(), fatal));
309 DBUG_ASSERT(format != NULL);
310
311 va_list args;
312 char explanation[128];
313 va_start(args,format);
314 my_vsnprintf(explanation, sizeof(explanation), format, args);
315 va_end(args);
316
317 msg.assfmt("Table '%s%s' is defined differently in NDB, %s. The "
318 "SQL to regenerate is: ",
319 opt_ndbinfo_table_prefix, ndb_tab->getName(), explanation);
320 generate_sql(ndb_tab, msg);
321
322 const Sql_condition::enum_warning_level level =
323 (fatal ? Sql_condition::WARN_LEVEL_WARN : Sql_condition::WARN_LEVEL_NOTE);
324 push_warning(current_thd, level, ERR_INCOMPAT_TABLE_DEF, msg.c_str());
325
326 DBUG_VOID_RETURN;
327 }
328
create(const char * name,TABLE * form,HA_CREATE_INFO * create_info)329 int ha_ndbinfo::create(const char *name, TABLE *form,
330 HA_CREATE_INFO *create_info)
331 {
332 DBUG_ENTER("ha_ndbinfo::create");
333 DBUG_PRINT("enter", ("name: %s", name));
334
335 DBUG_RETURN(0);
336 }
337
is_open(void) const338 bool ha_ndbinfo::is_open(void) const
339 {
340 return m_impl.m_table != NULL;
341 }
342
is_offline(void) const343 bool ha_ndbinfo::is_offline(void) const
344 {
345 return m_impl.m_offline;
346 }
347
open(const char * name,int mode,uint test_if_locked)348 int ha_ndbinfo::open(const char *name, int mode, uint test_if_locked)
349 {
350 DBUG_ENTER("ha_ndbinfo::open");
351 DBUG_PRINT("enter", ("name: %s, mode: %d", name, mode));
352
353 assert(is_closed());
354 assert(!is_offline()); // Closed table can not be offline
355
356 if (mode == O_RDWR)
357 {
358 if (table->db_stat & HA_TRY_READ_ONLY)
359 {
360 DBUG_PRINT("info", ("Telling server to use readonly mode"));
361 DBUG_RETURN(EROFS); // Read only fs
362 }
363 // Find any commands that does not allow open readonly
364 DBUG_ASSERT(false);
365 }
366
367 if (opt_ndbinfo_offline ||
368 ndbcluster_is_disabled())
369 {
370 // Mark table as being offline and allow it to be opened
371 m_impl.m_offline = true;
372 DBUG_RETURN(0);
373 }
374
375 int err = g_ndbinfo->openTable(name, &m_impl.m_table);
376 if (err)
377 {
378 assert(m_impl.m_table == 0);
379 if (err == NdbInfo::ERR_NoSuchTable)
380 DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);
381 DBUG_RETURN(err2mysql(err));
382 }
383
384 /*
385 Check table def. to detect incompatible differences which should
386 return an error. Differences which only generate a warning
387 is checked on first use
388 */
389 DBUG_PRINT("info", ("Comparing MySQL's table def against NDB"));
390 const NdbInfo::Table* ndb_tab = m_impl.m_table;
391 for (uint i = 0; i < table->s->fields; i++)
392 {
393 const Field* field = table->field[i];
394
395 // Check if field is NULLable
396 if (const_cast<Field*>(field)->real_maybe_null() == false)
397 {
398 // Only NULLable fields supported
399 warn_incompatible(ndb_tab, true,
400 "column '%s' is NOT NULL",
401 field->field_name);
402 delete m_impl.m_table; m_impl.m_table= 0;
403 DBUG_RETURN(ERR_INCOMPAT_TABLE_DEF);
404 }
405
406 // Check if column exist in NDB
407 const NdbInfo::Column* col = ndb_tab->getColumn(field->field_name);
408 if (!col)
409 {
410 // The column didn't exist
411 continue;
412 }
413
414 // Check compatible field and column type
415 bool compatible = false;
416 switch(col->m_type)
417 {
418 case NdbInfo::Column::Number:
419 if (field->type() == MYSQL_TYPE_LONG)
420 compatible = true;
421 break;
422 case NdbInfo::Column::Number64:
423 if (field->type() == MYSQL_TYPE_LONGLONG)
424 compatible = true;
425 break;
426 case NdbInfo::Column::String:
427 if (field->type() == MYSQL_TYPE_VARCHAR)
428 compatible = true;
429 break;
430 default:
431 assert(false);
432 break;
433 }
434 if (!compatible)
435 {
436 // The column type is not compatible
437 warn_incompatible(ndb_tab, true,
438 "column '%s' is not compatible",
439 field->field_name);
440 delete m_impl.m_table; m_impl.m_table= 0;
441 DBUG_RETURN(ERR_INCOMPAT_TABLE_DEF);
442 }
443 }
444
445 /* Increase "ref_length" to allow a whole row to be stored in "ref" */
446 ref_length = 0;
447 for (uint i = 0; i < table->s->fields; i++)
448 ref_length += table->field[i]->pack_length();
449 DBUG_PRINT("info", ("ref_length: %u", ref_length));
450
451 DBUG_RETURN(0);
452 }
453
close(void)454 int ha_ndbinfo::close(void)
455 {
456 DBUG_ENTER("ha_ndbinfo::close");
457
458 if (is_offline())
459 DBUG_RETURN(0);
460
461 assert(is_open());
462 if (m_impl.m_table)
463 {
464 g_ndbinfo->closeTable(m_impl.m_table);
465 m_impl.m_table = NULL;
466 }
467 DBUG_RETURN(0);
468 }
469
rnd_init(bool scan)470 int ha_ndbinfo::rnd_init(bool scan)
471 {
472 DBUG_ENTER("ha_ndbinfo::rnd_init");
473 DBUG_PRINT("info", ("scan: %d", scan));
474
475 if (is_offline())
476 {
477 push_warning(current_thd, Sql_condition::WARN_LEVEL_NOTE, 1,
478 "'NDBINFO' has been started in offline mode "
479 "since the 'NDBCLUSTER' engine is disabled "
480 "or @@global.ndbinfo_offline is turned on "
481 "- no rows can be returned");
482 DBUG_RETURN(0);
483 }
484
485 assert(is_open());
486 assert(m_impl.m_scan_op == NULL); // No scan already ongoing
487
488 if (m_impl.m_first_use)
489 {
490 m_impl.m_first_use = false;
491
492 /*
493 Check table def. and generate warnings for incompatibilites
494 which is allowed but should generate a warning.
495 (Done this late due to different code paths in MySQL Server for
496 prepared statement protocol, where warnings from 'handler::open'
497 are lost).
498 */
499 uint fields_found_in_ndb = 0;
500 const NdbInfo::Table* ndb_tab = m_impl.m_table;
501 for (uint i = 0; i < table->s->fields; i++)
502 {
503 const Field* field = table->field[i];
504 const NdbInfo::Column* col = ndb_tab->getColumn(field->field_name);
505 if (!col)
506 {
507 // The column didn't exist
508 warn_incompatible(ndb_tab, true,
509 "column '%s' does not exist",
510 field->field_name);
511 continue;
512 }
513 fields_found_in_ndb++;
514 }
515
516 if (fields_found_in_ndb < ndb_tab->columns())
517 {
518 // There are more columns available in NDB
519 warn_incompatible(ndb_tab, false,
520 "there are more columns available");
521 }
522 }
523
524 if (!scan)
525 {
526 // Just an init to read using 'rnd_pos'
527 DBUG_PRINT("info", ("not scan"));
528 DBUG_RETURN(0);
529 }
530
531 THD* thd = current_thd;
532 int err;
533 NdbInfoScanOperation* scan_op = NULL;
534 if ((err = g_ndbinfo->createScanOperation(m_impl.m_table,
535 &scan_op,
536 THDVAR(thd, max_rows),
537 THDVAR(thd, max_bytes))) != 0)
538 DBUG_RETURN(err2mysql(err));
539
540 if ((err = scan_op->readTuples()) != 0)
541 DBUG_RETURN(err2mysql(err));
542
543 /* Read all columns specified in read_set */
544 for (uint i = 0; i < table->s->fields; i++)
545 {
546 Field *field = table->field[i];
547 if (bitmap_is_set(table->read_set, i))
548 m_impl.m_columns.push_back(scan_op->getValue(field->field_name));
549 else
550 m_impl.m_columns.push_back(NULL);
551 }
552
553 if ((err = scan_op->execute()) != 0)
554 DBUG_RETURN(err2mysql(err));
555
556 m_impl.m_scan_op = scan_op;
557 DBUG_RETURN(0);
558 }
559
rnd_end()560 int ha_ndbinfo::rnd_end()
561 {
562 DBUG_ENTER("ha_ndbinfo::rnd_end");
563
564 if (is_offline())
565 DBUG_RETURN(0);
566
567 assert(is_open());
568
569 if (m_impl.m_scan_op)
570 {
571 g_ndbinfo->releaseScanOperation(m_impl.m_scan_op);
572 m_impl.m_scan_op = NULL;
573 }
574 m_impl.m_columns.clear();
575
576 DBUG_RETURN(0);
577 }
578
rnd_next(uchar * buf)579 int ha_ndbinfo::rnd_next(uchar *buf)
580 {
581 int err;
582 DBUG_ENTER("ha_ndbinfo::rnd_next");
583
584 if (is_offline())
585 DBUG_RETURN(HA_ERR_END_OF_FILE);
586
587 assert(is_open());
588 assert(m_impl.m_scan_op);
589
590 if ((err = m_impl.m_scan_op->nextResult()) == 0)
591 DBUG_RETURN(HA_ERR_END_OF_FILE);
592
593 if (err != 1)
594 DBUG_RETURN(err2mysql(err));
595
596 unpack_record(buf);
597
598 DBUG_RETURN(0);
599 }
600
rnd_pos(uchar * buf,uchar * pos)601 int ha_ndbinfo::rnd_pos(uchar *buf, uchar *pos)
602 {
603 DBUG_ENTER("ha_ndbinfo::rnd_pos");
604 assert(is_open());
605 assert(m_impl.m_scan_op == NULL); // No scan started
606
607 /* Copy the saved row into "buf" and set all fields to not null */
608 memcpy(buf, pos, ref_length);
609 for (uint i = 0; i < table->s->fields; i++)
610 table->field[i]->set_notnull();
611
612 DBUG_RETURN(0);
613 }
614
position(const uchar * record)615 void ha_ndbinfo::position(const uchar *record)
616 {
617 DBUG_ENTER("ha_ndbinfo::position");
618 assert(is_open());
619 assert(m_impl.m_scan_op);
620
621 /* Save away the whole row in "ref" */
622 memcpy(ref, record, ref_length);
623
624 DBUG_VOID_RETURN;
625 }
626
info(uint flag)627 int ha_ndbinfo::info(uint flag)
628 {
629 DBUG_ENTER("ha_ndbinfo::info");
630 DBUG_PRINT("enter", ("flag: %d", flag));
631 DBUG_RETURN(0);
632 }
633
634 void
unpack_record(uchar * dst_row)635 ha_ndbinfo::unpack_record(uchar *dst_row)
636 {
637 DBUG_ENTER("ha_ndbinfo::unpack_record");
638 my_ptrdiff_t dst_offset = dst_row - table->record[0];
639
640 for (uint i = 0; i < table->s->fields; i++)
641 {
642 Field *field = table->field[i];
643 const NdbInfoRecAttr* record = m_impl.m_columns[i];
644 if (record && !record->isNULL())
645 {
646 field->set_notnull();
647 field->move_field_offset(dst_offset);
648 switch (field->type()) {
649
650 case (MYSQL_TYPE_VARCHAR):
651 {
652 DBUG_PRINT("info", ("str: %s", record->c_str()));
653 Field_varstring* vfield = (Field_varstring *) field;
654 /* Field_bit in DBUG requires the bit set in write_set for store(). */
655 my_bitmap_map *old_map =
656 dbug_tmp_use_all_columns(table, table->write_set);
657 (void)vfield->store(record->c_str(),
658 MIN(record->length(), field->field_length)-1,
659 field->charset());
660 dbug_tmp_restore_column_map(table->write_set, old_map);
661 break;
662 }
663
664 case (MYSQL_TYPE_LONG):
665 {
666 memcpy(field->ptr, record->ptr(), sizeof(Uint32));
667 break;
668 }
669
670 case (MYSQL_TYPE_LONGLONG):
671 {
672 memcpy(field->ptr, record->ptr(), sizeof(Uint64));
673 break;
674 }
675
676 default:
677 sql_print_error("Found unexpected field type %u", field->type());
678 break;
679 }
680
681 field->move_field_offset(-dst_offset);
682 }
683 else
684 {
685 field->set_null();
686 }
687 }
688 DBUG_VOID_RETURN;
689 }
690
691
692 static int
ndbinfo_find_files(handlerton * hton,THD * thd,const char * db,const char * path,const char * wild,bool dir,List<LEX_STRING> * files)693 ndbinfo_find_files(handlerton *hton, THD *thd,
694 const char *db, const char *path,
695 const char *wild, bool dir, List<LEX_STRING> *files)
696 {
697 DBUG_ENTER("ndbinfo_find_files");
698 DBUG_PRINT("enter", ("db: '%s', dir: %d, path: '%s'", db, dir, path));
699
700 const bool show_hidden = THDVAR(thd, show_hidden);
701
702 if(show_hidden)
703 DBUG_RETURN(0); // Don't filter out anything
704
705 if (dir)
706 {
707 if (!ndbcluster_is_disabled())
708 DBUG_RETURN(0);
709
710 // Hide our database when ndbcluster is disabled
711 LEX_STRING *dir_name;
712 List_iterator<LEX_STRING> it(*files);
713 while ((dir_name=it++))
714 {
715 if (strcmp(dir_name->str, opt_ndbinfo_dbname))
716 continue;
717
718 DBUG_PRINT("info", ("Hiding own databse '%s'", dir_name->str));
719 it.remove();
720 }
721
722 DBUG_RETURN(0);
723 }
724
725 DBUG_ASSERT(db);
726 if (strcmp(db, opt_ndbinfo_dbname))
727 DBUG_RETURN(0); // Only hide files in "our" db
728
729 /* Hide all files that start with "our" prefix */
730 LEX_STRING *file_name;
731 List_iterator<LEX_STRING> it(*files);
732 while ((file_name=it++))
733 {
734 if (is_prefix(file_name->str, opt_ndbinfo_table_prefix))
735 {
736 DBUG_PRINT("info", ("Hiding '%s'", file_name->str));
737 it.remove();
738 }
739 }
740
741 DBUG_RETURN(0);
742 }
743
744
745 handlerton* ndbinfo_hton;
746
ndbinfo_init(void * plugin)747 int ndbinfo_init(void *plugin)
748 {
749 DBUG_ENTER("ndbinfo_init");
750
751 handlerton *hton = (handlerton *) plugin;
752 hton->create = create_handler;
753 hton->flags =
754 HTON_TEMPORARY_NOT_SUPPORTED |
755 HTON_ALTER_NOT_SUPPORTED;
756 hton->find_files = ndbinfo_find_files;
757
758 ndbinfo_hton = hton;
759
760 if (ndbcluster_is_disabled())
761 {
762 // Starting in limited mode since ndbcluster is disabled
763 DBUG_RETURN(0);
764 }
765
766 char prefix[FN_REFLEN];
767 build_table_filename(prefix, sizeof(prefix) - 1,
768 opt_ndbinfo_dbname, opt_ndbinfo_table_prefix, "", 0);
769 DBUG_PRINT("info", ("prefix: '%s'", prefix));
770 assert(g_ndb_cluster_connection);
771 g_ndbinfo = new NdbInfo(g_ndb_cluster_connection, prefix,
772 opt_ndbinfo_dbname, opt_ndbinfo_table_prefix);
773 if (!g_ndbinfo)
774 {
775 sql_print_error("Failed to create NdbInfo");
776 DBUG_RETURN(1);
777 }
778
779 if (!g_ndbinfo->init())
780 {
781 sql_print_error("Failed to init NdbInfo");
782
783 delete g_ndbinfo;
784 g_ndbinfo = NULL;
785
786 DBUG_RETURN(1);
787 }
788
789 DBUG_RETURN(0);
790 }
791
ndbinfo_deinit(void * plugin)792 int ndbinfo_deinit(void *plugin)
793 {
794 DBUG_ENTER("ndbinfo_deinit");
795
796 if (g_ndbinfo)
797 {
798 delete g_ndbinfo;
799 g_ndbinfo = NULL;
800 }
801
802 DBUG_RETURN(0);
803 }
804
805 struct st_mysql_sys_var* ndbinfo_system_variables[]= {
806 MYSQL_SYSVAR(max_rows),
807 MYSQL_SYSVAR(max_bytes),
808 MYSQL_SYSVAR(show_hidden),
809 MYSQL_SYSVAR(database),
810 MYSQL_SYSVAR(table_prefix),
811 MYSQL_SYSVAR(version),
812 MYSQL_SYSVAR(offline),
813
814 NULL
815 };
816
817 template class Vector<const NdbInfoRecAttr*>;
818
819 #endif
820