1 /*
2 Copyright (c) 2009, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "ha_ndbcluster_glue.h"
26 #include "ha_ndbinfo.h"
27 #include "ndb_tdc.h"
28 #include "../storage/ndb/src/ndbapi/NdbInfo.hpp"
29
30
31 static MYSQL_THDVAR_UINT(
32 max_rows, /* name */
33 PLUGIN_VAR_RQCMDARG,
34 "Specify max number of rows to fetch per roundtrip to cluster",
35 NULL, /* check func. */
36 NULL, /* update func. */
37 10, /* default */
38 1, /* min */
39 256, /* max */
40 0 /* block */
41 );
42
43 static MYSQL_THDVAR_UINT(
44 max_bytes, /* name */
45 PLUGIN_VAR_RQCMDARG,
46 "Specify approx. max number of bytes to fetch per roundtrip to cluster",
47 NULL, /* check func. */
48 NULL, /* update func. */
49 0, /* default */
50 0, /* min */
51 65535, /* max */
52 0 /* block */
53 );
54
55 static MYSQL_THDVAR_BOOL(
56 show_hidden, /* name */
57 PLUGIN_VAR_RQCMDARG,
58 "Control if tables should be visible or not",
59 NULL, /* check func. */
60 NULL, /* update func. */
61 FALSE /* default */
62 );
63
64 static char* opt_ndbinfo_dbname = (char*)"ndbinfo";
65 static MYSQL_SYSVAR_STR(
66 database, /* name */
67 opt_ndbinfo_dbname, /* var */
68 PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
69 "Name of the database used by ndbinfo",
70 NULL, /* check func. */
71 NULL, /* update func. */
72 NULL /* default */
73 );
74
75 static char* opt_ndbinfo_table_prefix = (char*)"ndb$";
76 static MYSQL_SYSVAR_STR(
77 table_prefix, /* name */
78 opt_ndbinfo_table_prefix, /* var */
79 PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
80 "Prefix to use for all virtual tables loaded from NDB",
81 NULL, /* check func. */
82 NULL, /* update func. */
83 NULL /* default */
84 );
85
86 static Uint32 opt_ndbinfo_version = NDB_VERSION_D;
87 static MYSQL_SYSVAR_UINT(
88 version, /* name */
89 opt_ndbinfo_version, /* var */
90 PLUGIN_VAR_NOCMDOPT | PLUGIN_VAR_READONLY,
91 "Compile version for ndbinfo",
92 NULL, /* check func. */
93 NULL, /* update func. */
94 0, /* default */
95 0, /* min */
96 0, /* max */
97 0 /* block */
98 );
99
100 static my_bool opt_ndbinfo_offline;
101
102 static
103 void
offline_update(THD * thd,struct st_mysql_sys_var * var,void * var_ptr,const void * save)104 offline_update(THD* thd, struct st_mysql_sys_var* var,
105 void* var_ptr, const void* save)
106 {
107 DBUG_ENTER("offline_update");
108
109 const my_bool new_offline =
110 (*(static_cast<const my_bool*>(save)) != 0);
111 if (new_offline == opt_ndbinfo_offline)
112 {
113 // No change
114 DBUG_VOID_RETURN;
115 }
116
117 // Set offline mode, any tables opened from here on will
118 // be opened in the new mode
119 opt_ndbinfo_offline = new_offline;
120
121 // Close any open tables which may be in the old mode
122 (void)ndb_tdc_close_cached_tables();
123
124 DBUG_VOID_RETURN;
125 }
126
127 static MYSQL_SYSVAR_BOOL(
128 offline, /* name */
129 opt_ndbinfo_offline, /* var */
130 PLUGIN_VAR_NOCMDOPT,
131 "Set ndbinfo in offline mode, tables and views can "
132 "be opened even if they don't exist or have different "
133 "definition in NDB. No rows will be returned.",
134 NULL, /* check func. */
135 offline_update, /* update func. */
136 0 /* default */
137 );
138
139
140 static NdbInfo* g_ndbinfo;
141
142 extern Ndb_cluster_connection* g_ndb_cluster_connection;
143
144 static bool
ndbcluster_is_disabled(void)145 ndbcluster_is_disabled(void)
146 {
147 /*
148 ndbinfo uses the same connection as ndbcluster
149 to avoid using up another nodeid, this also means that
150 if ndbcluster is not enabled, ndbinfo won't start
151 */
152 if (g_ndb_cluster_connection)
153 return false;
154 assert(g_ndbinfo == NULL);
155 return true;
156 }
157
158 static handler*
create_handler(handlerton * hton,TABLE_SHARE * table,MEM_ROOT * mem_root)159 create_handler(handlerton *hton, TABLE_SHARE *table, MEM_ROOT *mem_root)
160 {
161 return new (mem_root) ha_ndbinfo(hton, table);
162 }
163
164 struct ha_ndbinfo_impl
165 {
166 const NdbInfo::Table* m_table;
167 NdbInfoScanOperation* m_scan_op;
168 Vector<const NdbInfoRecAttr *> m_columns;
169 bool m_first_use;
170
171 // Indicates if table has been opened in offline mode
172 // can only be reset by closing the table
173 bool m_offline;
174
ha_ndbinfo_implha_ndbinfo_impl175 ha_ndbinfo_impl() :
176 m_table(NULL),
177 m_scan_op(NULL),
178 m_first_use(true),
179 m_offline(false)
180 {
181 }
182 };
183
ha_ndbinfo(handlerton * hton,TABLE_SHARE * table_arg)184 ha_ndbinfo::ha_ndbinfo(handlerton *hton, TABLE_SHARE *table_arg)
185 : handler(hton, table_arg), m_impl(*new ha_ndbinfo_impl)
186 {
187 }
188
~ha_ndbinfo()189 ha_ndbinfo::~ha_ndbinfo()
190 {
191 delete &m_impl;
192 }
193
194 enum ndbinfo_error_codes {
195 ERR_INCOMPAT_TABLE_DEF = 40001
196 };
197
198 static
199 struct error_message {
200 int error;
201 const char* message;
202 } error_messages[] = {
203 { ERR_INCOMPAT_TABLE_DEF, "Incompatible table definitions" },
204 { HA_ERR_NO_CONNECTION, "Connection to NDB failed" },
205
206 { 0, 0 }
207 };
208
209 static
find_error_message(int error)210 const char* find_error_message(int error)
211 {
212 struct error_message* err = error_messages;
213 while (err->error && err->message)
214 {
215 if (err->error == error)
216 {
217 assert(err->message);
218 return err->message;
219 }
220 err++;
221 }
222 return NULL;
223 }
224
err2mysql(int error)225 static int err2mysql(int error)
226 {
227 DBUG_ENTER("err2mysql");
228 DBUG_PRINT("enter", ("error: %d", error));
229 assert(error != 0);
230 switch(error)
231 {
232 case NdbInfo::ERR_ClusterFailure:
233 DBUG_RETURN(HA_ERR_NO_CONNECTION);
234 break;
235 case NdbInfo::ERR_OutOfMemory:
236 DBUG_RETURN(HA_ERR_OUT_OF_MEM);
237 break;
238 default:
239 break;
240 }
241 push_warning_printf(current_thd, Sql_condition::SL_WARNING,
242 ER_GET_ERRNO, ER(ER_GET_ERRNO), error);
243 DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
244 }
245
get_error_message(int error,String * buf)246 bool ha_ndbinfo::get_error_message(int error, String *buf)
247 {
248 DBUG_ENTER("ha_ndbinfo::get_error_message");
249 DBUG_PRINT("enter", ("error: %d", error));
250
251 const char* message = find_error_message(error);
252 if (!message)
253 DBUG_RETURN(false);
254
255 buf->set(message, (uint32)strlen(message), &my_charset_bin);
256 DBUG_PRINT("exit", ("message: %s", buf->ptr()));
257 DBUG_RETURN(false);
258 }
259
260 static void
generate_sql(const NdbInfo::Table * ndb_tab,BaseString & sql)261 generate_sql(const NdbInfo::Table* ndb_tab, BaseString& sql)
262 {
263 sql.appfmt("'CREATE TABLE `%s`.`%s%s` (",
264 opt_ndbinfo_dbname, opt_ndbinfo_table_prefix, ndb_tab->getName());
265
266 const char* separator = "";
267 for (unsigned i = 0; i < ndb_tab->columns(); i++)
268 {
269 const NdbInfo::Column* col = ndb_tab->getColumn(i);
270
271 sql.appfmt("%s", separator);
272 separator = ", ";
273
274 sql.appfmt("`%s` ", col->m_name.c_str());
275
276 switch(col->m_type)
277 {
278 case NdbInfo::Column::Number:
279 sql.appfmt("INT UNSIGNED");
280 break;
281 case NdbInfo::Column::Number64:
282 sql.appfmt("BIGINT UNSIGNED");
283 break;
284 case NdbInfo::Column::String:
285 sql.appfmt("VARCHAR(512)");
286 break;
287 default:
288 sql.appfmt("UNKNOWN");
289 assert(false);
290 break;
291 }
292 }
293 sql.appfmt(") ENGINE=NDBINFO'");
294 }
295
296 /*
297 Push a warning with explanation of the problem as well as the
298 proper SQL so the user can regenerate the table definition
299 */
300
301 static void
warn_incompatible(const NdbInfo::Table * ndb_tab,bool fatal,const char * format,...)302 warn_incompatible(const NdbInfo::Table* ndb_tab, bool fatal,
303 const char* format, ...)
304 {
305 BaseString msg;
306 DBUG_ENTER("warn_incompatible");
307 DBUG_PRINT("enter",("table_name: %s, fatal: %d", ndb_tab->getName(), fatal));
308 assert(format != NULL);
309
310 va_list args;
311 char explanation[128];
312 va_start(args,format);
313 my_vsnprintf(explanation, sizeof(explanation), format, args);
314 va_end(args);
315
316 msg.assfmt("Table '%s%s' is defined differently in NDB, %s. The "
317 "SQL to regenerate is: ",
318 opt_ndbinfo_table_prefix, ndb_tab->getName(), explanation);
319 generate_sql(ndb_tab, msg);
320
321 const Sql_condition::enum_severity_level level =
322 (fatal ? Sql_condition::SL_WARNING : Sql_condition::SL_NOTE);
323 push_warning(current_thd, level, ERR_INCOMPAT_TABLE_DEF, msg.c_str());
324
325 DBUG_VOID_RETURN;
326 }
327
create(const char * name,TABLE * form,HA_CREATE_INFO * create_info)328 int ha_ndbinfo::create(const char *name, TABLE *form,
329 HA_CREATE_INFO *create_info)
330 {
331 DBUG_ENTER("ha_ndbinfo::create");
332 DBUG_PRINT("enter", ("name: %s", name));
333
334 DBUG_RETURN(0);
335 }
336
is_open(void) const337 bool ha_ndbinfo::is_open(void) const
338 {
339 return m_impl.m_table != NULL;
340 }
341
is_offline(void) const342 bool ha_ndbinfo::is_offline(void) const
343 {
344 return m_impl.m_offline;
345 }
346
open(const char * name,int mode,uint test_if_locked)347 int ha_ndbinfo::open(const char *name, int mode, uint test_if_locked)
348 {
349 DBUG_ENTER("ha_ndbinfo::open");
350 DBUG_PRINT("enter", ("name: %s, mode: %d", name, mode));
351
352 assert(is_closed());
353 assert(!is_offline()); // Closed table can not be offline
354
355 if (mode == O_RDWR)
356 {
357 if (table->db_stat & HA_TRY_READ_ONLY)
358 {
359 DBUG_PRINT("info", ("Telling server to use readonly mode"));
360 DBUG_RETURN(EROFS); // Read only fs
361 }
362 // Find any commands that does not allow open readonly
363 assert(false);
364 }
365
366 if (opt_ndbinfo_offline ||
367 ndbcluster_is_disabled())
368 {
369 // Mark table as being offline and allow it to be opened
370 m_impl.m_offline = true;
371 DBUG_RETURN(0);
372 }
373
374 int err = g_ndbinfo->openTable(name, &m_impl.m_table);
375 if (err)
376 {
377 assert(m_impl.m_table == 0);
378 if (err == NdbInfo::ERR_NoSuchTable)
379 DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);
380 DBUG_RETURN(err2mysql(err));
381 }
382
383 /*
384 Check table def. to detect incompatible differences which should
385 return an error. Differences which only generate a warning
386 is checked on first use
387 */
388 DBUG_PRINT("info", ("Comparing MySQL's table def against NDB"));
389 const NdbInfo::Table* ndb_tab = m_impl.m_table;
390 for (uint i = 0; i < table->s->fields; i++)
391 {
392 const Field* field = table->field[i];
393
394 // Check if field is NULLable
395 if (const_cast<Field*>(field)->real_maybe_null() == false)
396 {
397 // Only NULLable fields supported
398 warn_incompatible(ndb_tab, true,
399 "column '%s' is NOT NULL",
400 field->field_name);
401 delete m_impl.m_table; m_impl.m_table= 0;
402 DBUG_RETURN(ERR_INCOMPAT_TABLE_DEF);
403 }
404
405 // Check if column exist in NDB
406 const NdbInfo::Column* col = ndb_tab->getColumn(field->field_name);
407 if (!col)
408 {
409 // The column didn't exist
410 continue;
411 }
412
413 // Check compatible field and column type
414 bool compatible = false;
415 switch(col->m_type)
416 {
417 case NdbInfo::Column::Number:
418 if (field->type() == MYSQL_TYPE_LONG)
419 compatible = true;
420 break;
421 case NdbInfo::Column::Number64:
422 if (field->type() == MYSQL_TYPE_LONGLONG)
423 compatible = true;
424 break;
425 case NdbInfo::Column::String:
426 if (field->type() == MYSQL_TYPE_VARCHAR)
427 compatible = true;
428 break;
429 default:
430 assert(false);
431 break;
432 }
433 if (!compatible)
434 {
435 // The column type is not compatible
436 warn_incompatible(ndb_tab, true,
437 "column '%s' is not compatible",
438 field->field_name);
439 delete m_impl.m_table; m_impl.m_table= 0;
440 DBUG_RETURN(ERR_INCOMPAT_TABLE_DEF);
441 }
442 }
443
444 /* Increase "ref_length" to allow a whole row to be stored in "ref" */
445 ref_length = 0;
446 for (uint i = 0; i < table->s->fields; i++)
447 ref_length += table->field[i]->pack_length();
448 DBUG_PRINT("info", ("ref_length: %u", ref_length));
449
450 DBUG_RETURN(0);
451 }
452
close(void)453 int ha_ndbinfo::close(void)
454 {
455 DBUG_ENTER("ha_ndbinfo::close");
456
457 if (is_offline())
458 DBUG_RETURN(0);
459
460 assert(is_open());
461 if (m_impl.m_table)
462 {
463 g_ndbinfo->closeTable(m_impl.m_table);
464 m_impl.m_table = NULL;
465 }
466 DBUG_RETURN(0);
467 }
468
rnd_init(bool scan)469 int ha_ndbinfo::rnd_init(bool scan)
470 {
471 DBUG_ENTER("ha_ndbinfo::rnd_init");
472 DBUG_PRINT("info", ("scan: %d", scan));
473
474 if (is_offline())
475 {
476 push_warning(current_thd, Sql_condition::SL_NOTE, 1,
477 "'NDBINFO' has been started in offline mode "
478 "since the 'NDBCLUSTER' engine is disabled "
479 "or @@global.ndbinfo_offline is turned on "
480 "- no rows can be returned");
481 DBUG_RETURN(0);
482 }
483
484 assert(is_open());
485
486 if (m_impl.m_scan_op)
487 {
488 /*
489 It should be impossible to come here with an already open
490 scan, assumption is that rnd_end() would be called to indicate
491 that the previous scan should be closed or perhaps like it says
492 in decsription of rnd_init() that it "may be called two times". Once
493 to open the cursor and once to position te cursor at first row.
494
495 Unfortunately the assumption and description of rnd_init() is not
496 correct. The rnd_init function is used on an open scan to reposition
497 it back to first row. For ha_ndbinfo this means closing
498 the scan and letting it be reopened.
499 */
500 assert(scan); // "only makes sense if scan=1" (from rnd_init() description)
501
502 DBUG_PRINT("info", ("Closing scan to position it back to first row"));
503
504 // Release the scan operation
505 g_ndbinfo->releaseScanOperation(m_impl.m_scan_op);
506 m_impl.m_scan_op = NULL;
507
508 // Release pointers to the columns
509 m_impl.m_columns.clear();
510 }
511
512 assert(m_impl.m_scan_op == NULL); // No scan already ongoing
513
514 if (m_impl.m_first_use)
515 {
516 m_impl.m_first_use = false;
517
518 /*
519 Check table def. and generate warnings for incompatibilites
520 which is allowed but should generate a warning.
521 (Done this late due to different code paths in MySQL Server for
522 prepared statement protocol, where warnings from 'handler::open'
523 are lost).
524 */
525 uint fields_found_in_ndb = 0;
526 const NdbInfo::Table* ndb_tab = m_impl.m_table;
527 for (uint i = 0; i < table->s->fields; i++)
528 {
529 const Field* field = table->field[i];
530 const NdbInfo::Column* col = ndb_tab->getColumn(field->field_name);
531 if (!col)
532 {
533 // The column didn't exist
534 warn_incompatible(ndb_tab, true,
535 "column '%s' does not exist",
536 field->field_name);
537 continue;
538 }
539 fields_found_in_ndb++;
540 }
541
542 if (fields_found_in_ndb < ndb_tab->columns())
543 {
544 // There are more columns available in NDB
545 warn_incompatible(ndb_tab, false,
546 "there are more columns available");
547 }
548 }
549
550 if (!scan)
551 {
552 // Just an init to read using 'rnd_pos'
553 DBUG_PRINT("info", ("not scan"));
554 DBUG_RETURN(0);
555 }
556
557 THD* thd = current_thd;
558 int err;
559 NdbInfoScanOperation* scan_op = NULL;
560 if ((err = g_ndbinfo->createScanOperation(m_impl.m_table,
561 &scan_op,
562 THDVAR(thd, max_rows),
563 THDVAR(thd, max_bytes))) != 0)
564 DBUG_RETURN(err2mysql(err));
565
566 if ((err = scan_op->readTuples()) != 0)
567 {
568 // Release the scan operation
569 g_ndbinfo->releaseScanOperation(scan_op);
570 DBUG_RETURN(err2mysql(err));
571 }
572
573 /* Read all columns specified in read_set */
574 for (uint i = 0; i < table->s->fields; i++)
575 {
576 Field *field = table->field[i];
577 if (bitmap_is_set(table->read_set, i))
578 m_impl.m_columns.push_back(scan_op->getValue(field->field_name));
579 else
580 m_impl.m_columns.push_back(NULL);
581 }
582
583 if ((err = scan_op->execute()) != 0)
584 {
585 // Release pointers to the columns
586 m_impl.m_columns.clear();
587 // Release the scan operation
588 g_ndbinfo->releaseScanOperation(scan_op);
589 DBUG_RETURN(err2mysql(err));
590 }
591
592 m_impl.m_scan_op = scan_op;
593 DBUG_RETURN(0);
594 }
595
rnd_end()596 int ha_ndbinfo::rnd_end()
597 {
598 DBUG_ENTER("ha_ndbinfo::rnd_end");
599
600 if (is_offline())
601 DBUG_RETURN(0);
602
603 assert(is_open());
604
605 if (m_impl.m_scan_op)
606 {
607 g_ndbinfo->releaseScanOperation(m_impl.m_scan_op);
608 m_impl.m_scan_op = NULL;
609 }
610 m_impl.m_columns.clear();
611
612 DBUG_RETURN(0);
613 }
614
rnd_next(uchar * buf)615 int ha_ndbinfo::rnd_next(uchar *buf)
616 {
617 int err;
618 DBUG_ENTER("ha_ndbinfo::rnd_next");
619
620 if (is_offline())
621 DBUG_RETURN(HA_ERR_END_OF_FILE);
622
623 assert(is_open());
624
625 if (!m_impl.m_scan_op)
626 {
627 /*
628 It should be impossible to come here without a scan operation.
629 But apparently it's not safe to assume that rnd_next() isn't
630 called even though rnd_init() returned an error. Thus double check
631 that the scan operation exists and bail out in case it doesn't.
632 */
633 DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
634 }
635
636 if ((err = m_impl.m_scan_op->nextResult()) == 0)
637 DBUG_RETURN(HA_ERR_END_OF_FILE);
638
639 if (err != 1)
640 DBUG_RETURN(err2mysql(err));
641
642 unpack_record(buf);
643
644 DBUG_RETURN(0);
645 }
646
rnd_pos(uchar * buf,uchar * pos)647 int ha_ndbinfo::rnd_pos(uchar *buf, uchar *pos)
648 {
649 DBUG_ENTER("ha_ndbinfo::rnd_pos");
650 assert(is_open());
651 assert(m_impl.m_scan_op == NULL); // No scan started
652
653 /* Copy the saved row into "buf" and set all fields to not null */
654 memcpy(buf, pos, ref_length);
655 for (uint i = 0; i < table->s->fields; i++)
656 table->field[i]->set_notnull();
657
658 DBUG_RETURN(0);
659 }
660
position(const uchar * record)661 void ha_ndbinfo::position(const uchar *record)
662 {
663 DBUG_ENTER("ha_ndbinfo::position");
664 assert(is_open());
665 assert(m_impl.m_scan_op);
666
667 /* Save away the whole row in "ref" */
668 memcpy(ref, record, ref_length);
669
670 DBUG_VOID_RETURN;
671 }
672
info(uint flag)673 int ha_ndbinfo::info(uint flag)
674 {
675 DBUG_ENTER("ha_ndbinfo::info");
676 DBUG_PRINT("enter", ("flag: %d", flag));
677 DBUG_RETURN(0);
678 }
679
680 void
unpack_record(uchar * dst_row)681 ha_ndbinfo::unpack_record(uchar *dst_row)
682 {
683 DBUG_ENTER("ha_ndbinfo::unpack_record");
684 my_ptrdiff_t dst_offset = dst_row - table->record[0];
685
686 for (uint i = 0; i < table->s->fields; i++)
687 {
688 Field *field = table->field[i];
689 const NdbInfoRecAttr* record = m_impl.m_columns[i];
690 if (record && !record->isNULL())
691 {
692 field->set_notnull();
693 field->move_field_offset(dst_offset);
694 switch (field->type()) {
695
696 case (MYSQL_TYPE_VARCHAR):
697 {
698 DBUG_PRINT("info", ("str: %s", record->c_str()));
699 Field_varstring* vfield = (Field_varstring *) field;
700 /* Field_bit in DBUG requires the bit set in write_set for store(). */
701 my_bitmap_map *old_map =
702 dbug_tmp_use_all_columns(table, table->write_set);
703 (void)vfield->store(record->c_str(),
704 MIN(record->length(), field->field_length)-1,
705 field->charset());
706 dbug_tmp_restore_column_map(table->write_set, old_map);
707 break;
708 }
709
710 case (MYSQL_TYPE_LONG):
711 {
712 memcpy(field->ptr, record->ptr(), sizeof(Uint32));
713 break;
714 }
715
716 case (MYSQL_TYPE_LONGLONG):
717 {
718 memcpy(field->ptr, record->ptr(), sizeof(Uint64));
719 break;
720 }
721
722 default:
723 sql_print_error("Found unexpected field type %u", field->type());
724 break;
725 }
726
727 field->move_field_offset(-dst_offset);
728 }
729 else
730 {
731 field->set_null();
732 }
733 }
734 DBUG_VOID_RETURN;
735 }
736
737
738 static int
ndbinfo_find_files(handlerton * hton,THD * thd,const char * db,const char * path,const char * wild,bool dir,List<LEX_STRING> * files)739 ndbinfo_find_files(handlerton *hton, THD *thd,
740 const char *db, const char *path,
741 const char *wild, bool dir, List<LEX_STRING> *files)
742 {
743 DBUG_ENTER("ndbinfo_find_files");
744 DBUG_PRINT("enter", ("db: '%s', dir: %d, path: '%s'", db, dir, path));
745
746 const bool show_hidden = THDVAR(thd, show_hidden);
747
748 if(show_hidden)
749 DBUG_RETURN(0); // Don't filter out anything
750
751 if (dir)
752 {
753 if (!ndbcluster_is_disabled())
754 DBUG_RETURN(0);
755
756 // Hide our database when ndbcluster is disabled
757 LEX_STRING *dir_name;
758 List_iterator<LEX_STRING> it(*files);
759 while ((dir_name=it++))
760 {
761 if (strcmp(dir_name->str, opt_ndbinfo_dbname))
762 continue;
763
764 DBUG_PRINT("info", ("Hiding own databse '%s'", dir_name->str));
765 it.remove();
766 }
767
768 DBUG_RETURN(0);
769 }
770
771 assert(db);
772 if (strcmp(db, opt_ndbinfo_dbname))
773 DBUG_RETURN(0); // Only hide files in "our" db
774
775 /* Hide all files that start with "our" prefix */
776 LEX_STRING *file_name;
777 List_iterator<LEX_STRING> it(*files);
778 while ((file_name=it++))
779 {
780 if (is_prefix(file_name->str, opt_ndbinfo_table_prefix))
781 {
782 DBUG_PRINT("info", ("Hiding '%s'", file_name->str));
783 it.remove();
784 }
785 }
786
787 DBUG_RETURN(0);
788 }
789
790
791 handlerton* ndbinfo_hton;
792
793 static
794 int
ndbinfo_init(void * plugin)795 ndbinfo_init(void *plugin)
796 {
797 DBUG_ENTER("ndbinfo_init");
798
799 handlerton *hton = (handlerton *) plugin;
800 hton->create = create_handler;
801 hton->flags =
802 HTON_TEMPORARY_NOT_SUPPORTED |
803 HTON_ALTER_NOT_SUPPORTED;
804 hton->find_files = ndbinfo_find_files;
805
806 ndbinfo_hton = hton;
807
808 if (ndbcluster_is_disabled())
809 {
810 // Starting in limited mode since ndbcluster is disabled
811 DBUG_RETURN(0);
812 }
813
814 char prefix[FN_REFLEN];
815 build_table_filename(prefix, sizeof(prefix) - 1,
816 opt_ndbinfo_dbname, opt_ndbinfo_table_prefix, "", 0);
817 DBUG_PRINT("info", ("prefix: '%s'", prefix));
818 assert(g_ndb_cluster_connection);
819 g_ndbinfo = new NdbInfo(g_ndb_cluster_connection, prefix,
820 opt_ndbinfo_dbname, opt_ndbinfo_table_prefix);
821 if (!g_ndbinfo)
822 {
823 sql_print_error("Failed to create NdbInfo");
824 DBUG_RETURN(1);
825 }
826
827 if (!g_ndbinfo->init())
828 {
829 sql_print_error("Failed to init NdbInfo");
830
831 delete g_ndbinfo;
832 g_ndbinfo = NULL;
833
834 DBUG_RETURN(1);
835 }
836
837 DBUG_RETURN(0);
838 }
839
840 static
841 int
ndbinfo_deinit(void * plugin)842 ndbinfo_deinit(void *plugin)
843 {
844 DBUG_ENTER("ndbinfo_deinit");
845
846 if (g_ndbinfo)
847 {
848 delete g_ndbinfo;
849 g_ndbinfo = NULL;
850 }
851
852 DBUG_RETURN(0);
853 }
854
855 struct st_mysql_sys_var* ndbinfo_system_variables[]= {
856 MYSQL_SYSVAR(max_rows),
857 MYSQL_SYSVAR(max_bytes),
858 MYSQL_SYSVAR(show_hidden),
859 MYSQL_SYSVAR(database),
860 MYSQL_SYSVAR(table_prefix),
861 MYSQL_SYSVAR(version),
862 MYSQL_SYSVAR(offline),
863
864 NULL
865 };
866
867 struct st_mysql_storage_engine ndbinfo_storage_engine=
868 {
869 MYSQL_HANDLERTON_INTERFACE_VERSION
870 };
871
872 struct st_mysql_plugin ndbinfo_plugin =
873 {
874 MYSQL_STORAGE_ENGINE_PLUGIN,
875 &ndbinfo_storage_engine,
876 "ndbinfo",
877 "Sun Microsystems Inc.",
878 "MySQL Cluster system information storage engine",
879 PLUGIN_LICENSE_GPL,
880 ndbinfo_init, /* plugin init */
881 ndbinfo_deinit, /* plugin deinit */
882 0x0001, /* plugin version */
883 NULL, /* status variables */
884 ndbinfo_system_variables, /* system variables */
885 NULL, /* config options */
886 0
887 };
888
889 template class Vector<const NdbInfoRecAttr*>;
890