1 /*
2 Copyright (c) 2011, 2015, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "ha_ndbcluster_glue.h"
26 #include "ha_ndbcluster.h"
27 #include "ndb_table_guard.h"
28 #include "mysql/service_thd_alloc.h"
29
30 #define ERR_RETURN(err) \
31 { \
32 const NdbError& tmp= err; \
33 DBUG_RETURN(ndb_to_mysql_error(&tmp)); \
34 }
35
36 // Typedefs for long names
37 typedef NdbDictionary::Dictionary NDBDICT;
38 typedef NdbDictionary::Table NDBTAB;
39 typedef NdbDictionary::Column NDBCOL;
40 typedef NdbDictionary::Index NDBINDEX;
41 typedef NdbDictionary::ForeignKey NDBFK;
42
43 /*
44 Foreign key data where this table is child or parent or both.
45 Like indexes, these are cached under each handler instance.
46 Unlike indexes, no references to global dictionary are kept.
47 */
48
49 struct Ndb_fk_item : Sql_alloc
50 {
51 FOREIGN_KEY_INFO f_key_info;
52 int update_action; // NDBFK::FkAction
53 int delete_action;
54 bool is_child;
55 bool is_parent;
56 };
57
58 struct Ndb_fk_data : Sql_alloc
59 {
60 List<Ndb_fk_item> list;
61 uint cnt_child;
62 uint cnt_parent;
63 };
64
65 // Forward decl
66 static
67 const char *
68 fk_split_name(char dst[], const char * src, bool index= false);
69
70 /*
71 Create all the fks for a table.
72
73 The actual foreign keys are not passed in handler interface
74 so gets them from thd->lex :-(
75 */
76 static
77 const NDBINDEX*
find_matching_index(NDBDICT * dict,const NDBTAB * tab,const NDBCOL * columns[],bool & matches_primary_key)78 find_matching_index(NDBDICT* dict,
79 const NDBTAB * tab,
80 const NDBCOL * columns[],
81 /* OUT */ bool & matches_primary_key)
82 {
83 /**
84 * First check if it matches primary key
85 */
86 {
87 matches_primary_key= FALSE;
88
89 uint cnt_pk= 0, cnt_col= 0;
90 for (unsigned i = 0; columns[i] != 0; i++)
91 {
92 cnt_col++;
93 if (columns[i]->getPrimaryKey())
94 cnt_pk++;
95 }
96
97 // check if all columns was part of full primary key
98 if (cnt_col == (uint)tab->getNoOfPrimaryKeys() &&
99 cnt_col == cnt_pk)
100 {
101 matches_primary_key= TRUE;
102 return 0;
103 }
104 }
105
106 /**
107 * check indexes...
108 * first choice is unique index
109 * second choice is ordered index...with as many columns as possible
110 */
111 const int noinvalidate= 0;
112 uint best_matching_columns= 0;
113 const NDBINDEX* best_matching_index= 0;
114
115 NDBDICT::List index_list;
116 dict->listIndexes(index_list, *tab);
117 for (unsigned i = 0; i < index_list.count; i++)
118 {
119 const char * index_name= index_list.elements[i].name;
120 const NDBINDEX* index= dict->getIndexGlobal(index_name, *tab);
121 if (index->getType() == NDBINDEX::UniqueHashIndex)
122 {
123 uint cnt= 0;
124 for (unsigned j = 0; columns[j] != 0; j++)
125 {
126 /*
127 * Search for matching columns in any order
128 * since order does not matter for unique index
129 */
130 bool found= FALSE;
131 for (unsigned c = 0; c < index->getNoOfColumns(); c++)
132 {
133 if (!strcmp(columns[j]->getName(), index->getColumn(c)->getName()))
134 {
135 found= TRUE;
136 break;
137 }
138 }
139 if (found)
140 cnt++;
141 else
142 break;
143 }
144 if (cnt == index->getNoOfColumns())
145 {
146 /**
147 * Full match...return this index, no need to look further
148 */
149 if (best_matching_index)
150 {
151 // release ref to previous best candidate
152 dict->removeIndexGlobal(* best_matching_index, noinvalidate);
153 }
154 return index; // NOTE: also returns reference
155 }
156
157 /**
158 * Not full match...i.e not usable
159 */
160 dict->removeIndexGlobal(* index, noinvalidate);
161 continue;
162 }
163 else if (index->getType() == NDBINDEX::OrderedIndex)
164 {
165 uint cnt= 0;
166 for (; columns[cnt] != 0; cnt++)
167 {
168 const NDBCOL * ndbcol= index->getColumn(cnt);
169 if (ndbcol == 0)
170 break;
171
172 if (strcmp(columns[cnt]->getName(), ndbcol->getName()) != 0)
173 break;
174 }
175
176 if (cnt > best_matching_columns)
177 {
178 /**
179 * better match...
180 */
181 if (best_matching_index)
182 {
183 dict->removeIndexGlobal(* best_matching_index, noinvalidate);
184 }
185 best_matching_index= index;
186 best_matching_columns= cnt;
187 }
188 else
189 {
190 dict->removeIndexGlobal(* index, noinvalidate);
191 }
192 }
193 else
194 {
195 // what ?? unknown index type
196 assert(false);
197 dict->removeIndexGlobal(* index, noinvalidate);
198 continue;
199 }
200 }
201
202 return best_matching_index; // NOTE: also returns reference
203 }
204
205 static
206 void
setDbName(Ndb * ndb,const char * name)207 setDbName(Ndb* ndb, const char * name)
208 {
209 if (name && strlen(name) != 0)
210 {
211 ndb->setDatabaseName(name);
212 }
213 }
214
215 struct Ndb_db_guard
216 {
Ndb_db_guardNdb_db_guard217 Ndb_db_guard(Ndb* ndb) {
218 this->ndb = ndb;
219 strcpy(save_db, ndb->getDatabaseName());
220 }
221
restoreNdb_db_guard222 void restore() {
223 ndb->setDatabaseName(save_db);
224 }
225
~Ndb_db_guardNdb_db_guard226 ~Ndb_db_guard() {
227 ndb->setDatabaseName(save_db);
228 }
229 private:
230 Ndb* ndb;
231 char save_db[FN_REFLEN + 1];
232 };
233
234 /**
235 * ndbapi want's c-strings (null terminated)
236 * mysql frequently uses LEX-string...(ptr + len)
237 *
238 * also...they have changed between 5.1 and 5.5...
239 * add a small compability-kit
240 */
241 static inline
242 const char *
lex2str(const LEX_STRING & str,char buf[],size_t len)243 lex2str(const LEX_STRING& str, char buf[], size_t len)
244 {
245 my_snprintf(buf, len, "%.*s", (int)str.length, str.str);
246 return buf;
247 }
248
249 static inline
250 const char *
lex2str(const char * str,char buf[],size_t len)251 lex2str(const char * str, char buf[], size_t len)
252 {
253 return str;
254 }
255
256 static inline
257 bool
isnull(const LEX_STRING & str)258 isnull(const LEX_STRING& str)
259 {
260 return str.str == 0 || str.length == 0;
261 }
262
263 static inline
264 bool
isnull(const char * str)265 isnull(const char * str)
266 {
267 return str == 0;
268 }
269
270 // copied from unused table_case_convert() in mysqld.h
271 static void
ndb_fk_casedn(char * name)272 ndb_fk_casedn(char *name)
273 {
274 DBUG_ASSERT(name != 0);
275 uint length = (uint)strlen(name);
276 DBUG_ASSERT(files_charset_info != 0 &&
277 files_charset_info->casedn_multiply == 1);
278 files_charset_info->cset->casedn(files_charset_info,
279 name, length, name, length);
280 }
281
282 static int
ndb_fk_casecmp(const char * name1,const char * name2)283 ndb_fk_casecmp(const char* name1, const char* name2)
284 {
285 if (!lower_case_table_names)
286 {
287 return strcmp(name1, name2);
288 }
289 char tmp1[FN_LEN + 1];
290 char tmp2[FN_LEN + 1];
291 strcpy(tmp1, name1);
292 strcpy(tmp2, name2);
293 ndb_fk_casedn(tmp1);
294 ndb_fk_casedn(tmp2);
295 return strcmp(tmp1, tmp2);
296 }
297
298 extern bool ndb_show_foreign_key_mock_tables(THD* thd);
299
300 class Fk_util
301 {
302 THD* m_thd;
303
304 void
info(const char * fmt,...) const305 info(const char* fmt, ...) const
306 {
307 va_list args;
308 char msg[MYSQL_ERRMSG_SIZE];
309 va_start(args,fmt);
310 my_vsnprintf(msg, sizeof(msg), fmt, args);
311 va_end(args);
312
313 // Push as warning if user has turned on ndb_show_foreign_key_mock_tables
314 if (ndb_show_foreign_key_mock_tables(m_thd))
315 {
316 push_warning(m_thd, Sql_condition::SL_WARNING, ER_YES, msg);
317 }
318
319 // Print info to log
320 sql_print_information("NDB FK: %s", msg);
321 }
322
323
324 void
warn(const char * fmt,...) const325 warn(const char* fmt, ...) const
326 {
327 va_list args;
328 char msg[MYSQL_ERRMSG_SIZE];
329 va_start(args,fmt);
330 my_vsnprintf(msg, sizeof(msg), fmt, args);
331 va_end(args);
332 push_warning(m_thd, Sql_condition::SL_WARNING, ER_CANNOT_ADD_FOREIGN, msg);
333
334 // Print warning to log
335 sql_print_warning("NDB FK: %s", msg);
336 }
337
338
339 void
error(const NdbDictionary::Dictionary * dict,const char * fmt,...) const340 error(const NdbDictionary::Dictionary* dict, const char* fmt, ...) const
341 {
342 va_list args;
343 char msg[MYSQL_ERRMSG_SIZE];
344 va_start(args,fmt);
345 my_vsnprintf(msg, sizeof(msg), fmt, args);
346 va_end(args);
347 push_warning(m_thd, Sql_condition::SL_WARNING,
348 ER_CANNOT_ADD_FOREIGN, msg);
349
350 char ndb_msg[MYSQL_ERRMSG_SIZE] = {0};
351 if (dict)
352 {
353 // Extract message from Ndb
354 const NdbError& error = dict->getNdbError();
355 my_snprintf(ndb_msg, sizeof(ndb_msg),
356 "%d '%s'", error.code, error.message);
357 push_warning_printf(m_thd, Sql_condition::SL_WARNING,
358 ER_CANNOT_ADD_FOREIGN, "Ndb error: %s", ndb_msg);
359 }
360 // Print error to log
361 sql_print_error("NDB FK: %s, Ndb error: %s", msg, ndb_msg);
362 }
363
364
365 void
remove_index_global(NdbDictionary::Dictionary * dict,const NdbDictionary::Index * index) const366 remove_index_global(NdbDictionary::Dictionary* dict, const NdbDictionary::Index* index) const
367 {
368 if (!index)
369 return;
370
371 dict->removeIndexGlobal(*index, 0);
372 }
373
374
375 bool
copy_fk_to_new_parent(NdbDictionary::Dictionary * dict,NdbDictionary::ForeignKey & fk,const char * new_parent_name,const char * column_names[]) const376 copy_fk_to_new_parent(NdbDictionary::Dictionary* dict, NdbDictionary::ForeignKey& fk,
377 const char* new_parent_name, const char* column_names[]) const
378 {
379 DBUG_ENTER("copy_fk_to_new_parent");
380 DBUG_PRINT("info", ("new_parent_name: %s", new_parent_name));
381
382 // Load up the new parent table
383 Ndb_table_guard new_parent_tab(dict, new_parent_name);
384 if (!new_parent_tab.get_table())
385 {
386 error(dict, "Failed to load potentially new parent '%s'", new_parent_name);
387 DBUG_RETURN(false);
388 }
389
390 // Build new parent column list from parent column names
391 const NdbDictionary::Column* columns[NDB_MAX_ATTRIBUTES_IN_INDEX + 1];
392 {
393 unsigned num_columns = 0;
394 for (unsigned i = 0; column_names[i] != 0; i++)
395 {
396 DBUG_PRINT("info", ("column: %s", column_names[i]));
397 const NdbDictionary::Column* col =
398 new_parent_tab.get_table()->getColumn(column_names[i]);
399 if (!col)
400 {
401 // Parent table didn't have any column with the given name, can happen
402 warn("Could not resolve '%s' as fk parent for '%s' since it didn't have "
403 "all the referenced columns", new_parent_name, fk.getChildTable());
404 DBUG_RETURN(false);
405 }
406 columns[num_columns++]= col;
407 }
408 columns[num_columns]= 0;
409 }
410
411 NdbDictionary::ForeignKey new_fk(fk);
412
413 // Create name for the new fk by splitting the fk's name and replacing
414 // the <parent id> part in format "<parent_id>/<child_id>/<name>"
415 {
416 char name[FN_REFLEN+1];
417 unsigned parent_id, child_id;
418 if (sscanf(fk.getName(), "%u/%u/%s",
419 &parent_id, &child_id, name) != 3)
420 {
421 warn("Skip, failed to parse name of fk: %s", fk.getName());
422 DBUG_RETURN(false);
423 }
424
425 char fk_name[FN_REFLEN+1];
426 my_snprintf(fk_name, sizeof(fk_name), "%s",
427 name);
428 DBUG_PRINT("info", ("Setting new fk name: %s", fk_name));
429 new_fk.setName(fk_name);
430 }
431
432 // Find matching index
433 bool parent_primary_key= FALSE;
434 const NdbDictionary::Index* parent_index= find_matching_index(dict,
435 new_parent_tab.get_table(),
436 columns,
437 parent_primary_key);
438 DBUG_PRINT("info", ("parent_primary_key: %d", parent_primary_key));
439
440 // Check if either pk or index matched
441 if (!parent_primary_key && parent_index == 0)
442 {
443 warn("Could not resolve '%s' as fk parent for '%s' since no matching index "
444 "could be found", new_parent_name, fk.getChildTable());
445 DBUG_RETURN(false);
446 }
447
448 if (parent_index != 0)
449 {
450 DBUG_PRINT("info", ("Setting parent with index %s", parent_index->getName()));
451 new_fk.setParent(*new_parent_tab.get_table(), parent_index, columns);
452 }
453 else
454 {
455 DBUG_PRINT("info", ("Setting parent without index"));
456 new_fk.setParent(*new_parent_tab.get_table(), 0, columns);
457 }
458
459 // Old fk is dropped by cascading when the mock table is dropped
460
461 // Create new fk referencing the new table
462 DBUG_PRINT("info", ("Create new fk: %s", new_fk.getName()));
463 int flags = 0;
464 if (thd_test_options(m_thd, OPTION_NO_FOREIGN_KEY_CHECKS))
465 {
466 flags |= NdbDictionary::Dictionary::CreateFK_NoVerify;
467 }
468 NdbDictionary::ObjectId objid;
469 if (dict->createForeignKey(new_fk, &objid, flags) != 0)
470 {
471 error(dict, "Failed to create foreign key '%s'", new_fk.getName());
472 remove_index_global(dict, parent_index);
473 DBUG_RETURN(false);
474 }
475
476 remove_index_global(dict, parent_index);
477 DBUG_RETURN(true);
478 }
479
480
481 void
resolve_mock(NdbDictionary::Dictionary * dict,const char * new_parent_name,const char * mock_name) const482 resolve_mock(NdbDictionary::Dictionary* dict,
483 const char* new_parent_name, const char* mock_name) const
484 {
485 DBUG_ENTER("resolve_mock");
486 DBUG_PRINT("enter", ("mock_name '%s'", mock_name));
487 DBUG_ASSERT(is_mock_name(mock_name));
488
489 // Load up the mock table
490 Ndb_table_guard mock_tab(dict, mock_name);
491 if (!mock_tab.get_table())
492 {
493 error(dict, "Failed to load the listed mock table '%s'", mock_name);
494 DBUG_ASSERT(false);
495 DBUG_VOID_RETURN;
496 }
497
498 // List dependent objects of mock table
499 NdbDictionary::Dictionary::List list;
500 if (dict->listDependentObjects(list, *mock_tab.get_table()) != 0)
501 {
502 error(dict, "Failed to list dependent objects for mock table '%s'", mock_name);
503 DBUG_VOID_RETURN;
504 }
505
506 for (unsigned i = 0; i < list.count; i++)
507 {
508 const NdbDictionary::Dictionary::List::Element& element = list.elements[i];
509 if (element.type != NdbDictionary::Object::ForeignKey)
510 continue;
511
512 DBUG_PRINT("info", ("fk: %s", element.name));
513
514 NdbDictionary::ForeignKey fk;
515 if (dict->getForeignKey(fk, element.name) != 0)
516 {
517 error(dict, "Could not find the listed fk '%s'", element.name);
518 continue;
519 }
520
521 // Build column name list for parent
522 const char* col_names[NDB_MAX_ATTRIBUTES_IN_INDEX + 1];
523 {
524 unsigned num_columns = 0;
525 for (unsigned j = 0; j < fk.getParentColumnCount(); j++)
526 {
527 const NdbDictionary::Column* col =
528 mock_tab.get_table()->getColumn(fk.getParentColumnNo(j));
529 if (!col)
530 {
531 error(NULL, "Could not find column '%s' in mock table '%s'",
532 fk.getParentColumnNo(j), mock_name);
533 continue;
534 }
535 col_names[num_columns++]= col->getName();
536 }
537 col_names[num_columns]= 0;
538
539 if (num_columns != fk.getParentColumnCount())
540 {
541 error(NULL, "Could not find all columns referenced by fk in mock table '%s'",
542 mock_name);
543 continue;
544 }
545 }
546
547 if (!copy_fk_to_new_parent(dict, fk, new_parent_name, col_names))
548 continue;
549
550 // New fk has been created between child and new parent, drop the mock
551 // table and it's related fk
552 const int drop_flags= NDBDICT::DropTableCascadeConstraints;
553 if (dict->dropTableGlobal(*mock_tab.get_table(), drop_flags) != 0)
554 {
555 error(dict, "Failed to drop mock table '%s'", mock_name);
556 continue;
557 }
558 info("Dropped mock table '%s' - resolved by '%s'", mock_name, new_parent_name);
559 }
560 DBUG_VOID_RETURN;
561 }
562
563
564 bool
create_mock_tables_and_drop(Ndb * ndb,NdbDictionary::Dictionary * dict,const NdbDictionary::Table * table)565 create_mock_tables_and_drop(Ndb* ndb, NdbDictionary::Dictionary* dict,
566 const NdbDictionary::Table* table)
567 {
568 DBUG_ENTER("create_mock_tables_and_drop");
569 DBUG_PRINT("enter", ("table: %s", table->getName()));
570
571 /*
572 List all foreign keys referencing the table to be dropped
573 and recreate those to point at a new mock
574 */
575 NdbDictionary::Dictionary::List list;
576 if (dict->listDependentObjects(list, *table) != 0)
577 {
578 error(dict, "Failed to list dependent objects for table '%s'", table->getName());
579 DBUG_RETURN(false);
580 }
581
582 uint fk_index = 0;
583 for (unsigned i = 0; i < list.count; i++)
584 {
585 const NdbDictionary::Dictionary::List::Element& element = list.elements[i];
586
587 if (element.type != NdbDictionary::Object::ForeignKey)
588 continue;
589
590 DBUG_PRINT("fk", ("name: %s, type: %d", element.name, element.type));
591
592 NdbDictionary::ForeignKey fk;
593 if (dict->getForeignKey(fk, element.name) != 0)
594 {
595 // Could not find the listed fk
596 DBUG_ASSERT(false);
597 continue;
598 }
599
600 // Parent of the found fk should be the table to be dropped
601 DBUG_PRINT("info", ("fk.parent: %s", fk.getParentTable()));
602 char parent_db_and_name[FN_LEN + 1];
603 const char * parent_name = fk_split_name(parent_db_and_name, fk.getParentTable());
604
605 if (strcmp(parent_db_and_name, ndb->getDatabaseName()) != 0 ||
606 strcmp(parent_name, table->getName()) != 0)
607 {
608 DBUG_PRINT("info", ("fk is not parent, skip"));
609 continue;
610 }
611
612 DBUG_PRINT("info", ("fk.child: %s", fk.getChildTable()));
613 char child_db_and_name[FN_LEN + 1];
614 const char * child_name = fk_split_name(child_db_and_name, fk.getChildTable());
615
616 // Open child table
617 Ndb_db_guard db_guard(ndb);
618 setDbName(ndb, child_db_and_name);
619 Ndb_table_guard child_tab(dict, child_name);
620 if (child_tab.get_table() == 0)
621 {
622 error(dict, "Failed to open child table '%s'", child_name);
623 DBUG_RETURN(false);
624 }
625
626 /* Format mock table name */
627 char mock_name[FN_REFLEN];
628 if (!format_name(mock_name, sizeof(mock_name),
629 child_tab.get_table()->getObjectId(),
630 fk_index, parent_name))
631 {
632 error(NULL, "Failed to create mock parent table, too long mock name");
633 DBUG_RETURN(false);
634 }
635
636 // Build both column name and column type list from parent(which will be dropped)
637 const char* col_names[NDB_MAX_ATTRIBUTES_IN_INDEX + 1];
638 const NdbDictionary::Column* col_types[NDB_MAX_ATTRIBUTES_IN_INDEX + 1];
639 {
640 unsigned num_columns = 0;
641 for (unsigned j = 0; j < fk.getParentColumnCount(); j++)
642 {
643 const NdbDictionary::Column* col =
644 table->getColumn(fk.getParentColumnNo(j));
645 DBUG_PRINT("col", ("[%u] %s", i, col->getName()));
646 if (!col)
647 {
648 error(NULL, "Could not find column '%s' in parent table '%s'",
649 fk.getParentColumnNo(j), table->getName());
650 continue;
651 }
652 col_names[num_columns] = col->getName();
653 col_types[num_columns] = col;
654 num_columns++;
655 }
656 col_names[num_columns]= 0;
657 col_types[num_columns] = 0;
658
659 if (num_columns != fk.getParentColumnCount())
660 {
661 error(NULL, "Could not find all columns referenced by fk in parent table '%s'",
662 table->getName());
663 continue;
664 }
665 }
666 db_guard.restore(); // restore db
667
668 // Create new mock
669 if (!create(dict, mock_name, child_name,
670 col_names, col_types))
671 {
672 error(dict, "Failed to create mock parent table '%s", mock_name);
673 DBUG_ASSERT(false);
674 DBUG_RETURN(false);
675 }
676
677 // Recreate fks to point at new mock
678 if (!copy_fk_to_new_parent(dict, fk, mock_name, col_names))
679 {
680 DBUG_RETURN(false);
681 }
682
683 fk_index++;
684 }
685
686 // Drop the requested table and all foreign keys refering to it
687 // i.e the old fks
688 const int drop_flags= NDBDICT::DropTableCascadeConstraints;
689 if (dict->dropTableGlobal(*table, drop_flags) != 0)
690 {
691 error(dict, "Failed to drop the requested table");
692 DBUG_RETURN(false);
693 }
694
695 DBUG_RETURN(true);
696 }
697
698 public:
Fk_util(THD * thd)699 Fk_util(THD* thd) : m_thd(thd) {}
700
701 static
split_mock_name(const char * name,unsigned * child_id_ptr=NULL,unsigned * child_index_ptr=NULL,const char ** parent_name=NULL)702 bool split_mock_name(const char* name,
703 unsigned* child_id_ptr = NULL,
704 unsigned* child_index_ptr = NULL,
705 const char** parent_name = NULL)
706 {
707 const struct {
708 const char* str;
709 size_t len;
710 } prefix = { STRING_WITH_LEN("NDB$FKM_") };
711
712 if (strncmp(name, prefix.str, prefix.len) != 0)
713 return false;
714
715 char* end;
716 const char* ptr= name + prefix.len + 1;
717
718 // Parse child id
719 long child_id = strtol(ptr, &end, 10);
720 if (ptr == end || child_id < 0 || *end == 0 || *end != '_')
721 return false;
722 ptr = end+1;
723
724 // Parse child index
725 long child_index = strtol(ptr, &end, 10);
726 if (ptr == end || child_id < 0 || *end == 0 || *end != '_')
727 return false;
728 ptr = end+1;
729
730 // Assign and return OK
731 if (child_id_ptr)
732 *child_id_ptr = child_id;
733 if (child_index_ptr)
734 *child_index_ptr = child_index;
735 if (parent_name)
736 *parent_name = ptr;
737 return true;
738 }
739
740 static
is_mock_name(const char * name)741 bool is_mock_name(const char* name)
742 {
743 return split_mock_name(name);
744 }
745
746 static
format_name(char buf[],size_t buf_size,int child_id,uint fk_index,const char * parent_name)747 const char* format_name(char buf[], size_t buf_size, int child_id,
748 uint fk_index, const char* parent_name)
749 {
750 DBUG_ENTER("format_name");
751 DBUG_PRINT("enter", ("child_id: %d, fk_index: %u, parent_name: %s",
752 child_id, fk_index, parent_name));
753 const size_t len = my_snprintf(buf, buf_size, "NDB$FKM_%d_%u_%s",
754 child_id, fk_index, parent_name);
755 DBUG_PRINT("info", ("len: %lu, buf_size: %lu", len, buf_size));
756 if (len >= buf_size - 1)
757 {
758 DBUG_PRINT("info", ("Size of buffer too small"));
759 DBUG_RETURN(NULL);
760 }
761 DBUG_PRINT("exit", ("buf: '%s', len: %lu", buf, len));
762 DBUG_RETURN(buf);
763 }
764
765
766 // Adaptor function for calling create() with List<key_part_spec>
create(NDBDICT * dict,const char * mock_name,const char * child_name,List<Key_part_spec> key_part_list,const NDBCOL * col_types[])767 bool create(NDBDICT *dict, const char* mock_name, const char* child_name,
768 List<Key_part_spec> key_part_list, const NDBCOL * col_types[])
769 {
770 // Convert List<Key_part_spec> into null terminated const char* array
771 const char* col_names[NDB_MAX_ATTRIBUTES_IN_INDEX + 1];
772 {
773 unsigned i = 0;
774 Key_part_spec* key = 0;
775 List_iterator<Key_part_spec> it1(key_part_list);
776 while ((key= it1++))
777 {
778 char col_name_buf[FN_REFLEN];
779 const char* col_name = lex2str(key->field_name, col_name_buf, sizeof(col_name_buf));
780 col_names[i++] = strdup(col_name);
781 }
782 col_names[i] = 0;
783 }
784
785 const bool ret = create(dict, mock_name, child_name, col_names, col_types);
786
787 // Free the strings in col_names array
788 for (unsigned i = 0; col_names[i] != 0; i++)
789 {
790 const char* col_name = col_names[i];
791 free(const_cast<char*>(col_name));
792 }
793
794 return ret;
795 }
796
797
create(NDBDICT * dict,const char * mock_name,const char * child_name,const char * col_names[],const NDBCOL * col_types[])798 bool create(NDBDICT *dict, const char* mock_name, const char* child_name,
799 const char* col_names[], const NDBCOL * col_types[])
800 {
801 NDBTAB mock_tab;
802
803 DBUG_ENTER("mock_table::create");
804 DBUG_PRINT("enter", ("mock_name: %s", mock_name));
805 DBUG_ASSERT(is_mock_name(mock_name));
806
807 if (mock_tab.setName(mock_name))
808 {
809 DBUG_RETURN(false);
810 }
811 mock_tab.setLogging(FALSE);
812
813 unsigned i = 0;
814 while (col_names[i])
815 {
816 NDBCOL mock_col;
817
818 const char* col_name = col_names[i];
819 DBUG_PRINT("info", ("name: %s", col_name));
820 if (mock_col.setName(col_name))
821 {
822 DBUG_ASSERT(false);
823 DBUG_RETURN(false);
824 }
825
826 const NDBCOL * col= col_types[i];
827 if (!col)
828 {
829 // Internal error, the two lists should be same size
830 DBUG_ASSERT(col);
831 DBUG_RETURN(false);
832 }
833
834 // Use column spec as requested(normally built from child table)
835 mock_col.setType(col->getType());
836 mock_col.setPrecision(col->getPrecision());
837 mock_col.setScale(col->getScale());
838 mock_col.setLength(col->getLength());
839 mock_col.setCharset(col->getCharset());
840
841 // Make column part of primary key and thus not nullable
842 mock_col.setPrimaryKey(true);
843 mock_col.setNullable(false);
844
845 if (mock_tab.addColumn(mock_col))
846 {
847 DBUG_RETURN(false);
848 }
849 i++;
850 }
851
852 // Create the table in NDB
853 if (dict->createTable(mock_tab) != 0)
854 {
855 // Error is available to caller in dict*
856 DBUG_RETURN(false);
857 }
858 info("Created mock table '%s' referenced by '%s'", mock_name, child_name);
859 DBUG_RETURN(true);
860 }
861
862 bool
build_mock_list(NdbDictionary::Dictionary * dict,const NdbDictionary::Table * table,List<char> & mock_list)863 build_mock_list(NdbDictionary::Dictionary* dict,
864 const NdbDictionary::Table* table, List<char> &mock_list)
865 {
866 DBUG_ENTER("build_mock_list");
867
868 NdbDictionary::Dictionary::List list;
869 if (dict->listDependentObjects(list, *table) != 0)
870 {
871 error(dict, "Failed to list dependent objects for table '%s'", table->getName());
872 DBUG_RETURN(false);
873 }
874
875 for (unsigned i = 0; i < list.count; i++)
876 {
877 const NdbDictionary::Dictionary::List::Element& element = list.elements[i];
878 if (element.type != NdbDictionary::Object::ForeignKey)
879 continue;
880
881 NdbDictionary::ForeignKey fk;
882 if (dict->getForeignKey(fk, element.name) != 0)
883 {
884 // Could not find the listed fk
885 DBUG_ASSERT(false);
886 continue;
887 }
888
889 char parent_db_and_name[FN_LEN + 1];
890 const char * name = fk_split_name(parent_db_and_name,fk.getParentTable());
891
892 if (!Fk_util::is_mock_name(name))
893 continue;
894
895 mock_list.push_back(thd_strdup(m_thd, fk.getParentTable()));
896 }
897 DBUG_RETURN(true);
898 }
899
900
901 void
drop_mock_list(Ndb * ndb,NdbDictionary::Dictionary * dict,List<char> & drop_list)902 drop_mock_list(Ndb* ndb, NdbDictionary::Dictionary* dict, List<char> &drop_list)
903 {
904 const char* full_name;
905 List_iterator_fast<char> it(drop_list);
906 while ((full_name=it++))
907 {
908 DBUG_PRINT("info", ("drop table: '%s'", full_name));
909 char db_name[FN_LEN + 1];
910 const char * table_name = fk_split_name(db_name, full_name);
911 Ndb_db_guard db_guard(ndb);
912 setDbName(ndb, db_name);
913 Ndb_table_guard mocktab_g(dict, table_name);
914 if (!mocktab_g.get_table())
915 {
916 // Could not open the mock table
917 DBUG_PRINT("error", ("Could not open the listed mock table, ignore it"));
918 DBUG_ASSERT(false);
919 continue;
920 }
921
922 if (dict->dropTableGlobal(*mocktab_g.get_table()) != 0)
923 {
924 DBUG_PRINT("error", ("Failed to drop the mock table '%s'",
925 mocktab_g.get_table()->getName()));
926 DBUG_ASSERT(false);
927 continue;
928 }
929 info("Dropped mock table '%s' - referencing table dropped", table_name);
930 }
931 }
932
933
934 bool
drop(Ndb * ndb,NdbDictionary::Dictionary * dict,const NdbDictionary::Table * table)935 drop(Ndb* ndb, NdbDictionary::Dictionary* dict,
936 const NdbDictionary::Table* table)
937 {
938 DBUG_ENTER("drop");
939
940 // Start schema transaction to make this operation atomic
941 if (dict->beginSchemaTrans() != 0)
942 {
943 error(dict, "Failed to start schema transaction");
944 DBUG_RETURN(false);
945 }
946
947 bool result = true;
948 if (!create_mock_tables_and_drop(ndb, dict, table))
949 {
950 // Operation failed, set flag to abort when ending trans
951 result = false;
952 }
953
954 // End schema transaction
955 const Uint32 end_trans_flag = result ? 0 : NdbDictionary::Dictionary::SchemaTransAbort;
956 if (dict->endSchemaTrans(end_trans_flag) != 0)
957 {
958 error(dict, "Failed to end schema transaction");
959 result = false;
960 }
961
962 DBUG_RETURN(result);
963 }
964
count_fks(NdbDictionary::Dictionary * dict,const NdbDictionary::Table * table,uint & count) const965 bool count_fks(NdbDictionary::Dictionary* dict,
966 const NdbDictionary::Table* table, uint& count) const
967 {
968 DBUG_ENTER("count_fks");
969
970 NdbDictionary::Dictionary::List list;
971 if (dict->listDependentObjects(list, *table) != 0)
972 {
973 error(dict, "Failed to list dependent objects for table '%s'", table->getName());
974 DBUG_RETURN(false);
975 }
976 for (unsigned i = 0; i < list.count; i++)
977 {
978 if (list.elements[i].type == NdbDictionary::Object::ForeignKey)
979 count++;
980 }
981 DBUG_PRINT("exit", ("count: %u", count));
982 DBUG_RETURN(true);
983 }
984
985
drop_fk(Ndb * ndb,NdbDictionary::Dictionary * dict,const char * fk_name)986 bool drop_fk(Ndb* ndb, NdbDictionary::Dictionary* dict, const char* fk_name)
987 {
988 DBUG_ENTER("drop_fk");
989
990 NdbDictionary::ForeignKey fk;
991 if (dict->getForeignKey(fk, fk_name) != 0)
992 {
993 error(dict, "Could not find fk '%s'", fk_name);
994 DBUG_ASSERT(false);
995 DBUG_RETURN(false);
996 }
997
998 char parent_db_and_name[FN_LEN + 1];
999 const char * parent_name = fk_split_name(parent_db_and_name,fk.getParentTable());
1000 if (Fk_util::is_mock_name(parent_name))
1001 {
1002 // Fk is referencing a mock table, drop the table
1003 // and the constraint at the same time
1004 Ndb_db_guard db_guard(ndb);
1005 setDbName(ndb, parent_db_and_name);
1006 Ndb_table_guard mocktab_g(dict, parent_name);
1007 if (mocktab_g.get_table())
1008 {
1009 const int drop_flags= NDBDICT::DropTableCascadeConstraints;
1010 if (dict->dropTableGlobal(*mocktab_g.get_table(), drop_flags) != 0)
1011 {
1012 error(dict, "Failed to drop fk mock table '%s'", parent_name);
1013 DBUG_ASSERT(false);
1014 DBUG_RETURN(false);
1015 }
1016 // table and fk dropped
1017 DBUG_RETURN(true);
1018 }
1019 else
1020 {
1021 warn("Could not open the fk mock table '%s', ignoring it...",
1022 parent_name);
1023 DBUG_ASSERT(false);
1024 // fallthrough and try to drop only the fk,
1025 }
1026 }
1027
1028 if (dict->dropForeignKey(fk) != 0)
1029 {
1030 error(dict, "Failed to drop fk '%s'", fk_name);
1031 DBUG_ASSERT(false);
1032 DBUG_RETURN(false);
1033 }
1034 DBUG_RETURN(true);
1035 }
1036
1037
1038 void
resolve_mock_tables(NdbDictionary::Dictionary * dict,const char * new_parent_db,const char * new_parent_name) const1039 resolve_mock_tables(NdbDictionary::Dictionary* dict,
1040 const char* new_parent_db,
1041 const char* new_parent_name) const
1042 {
1043 DBUG_ENTER("resolve_mock_tables");
1044 DBUG_PRINT("enter", ("new_parent_db: %s, new_parent_name: %s",
1045 new_parent_db, new_parent_name));
1046
1047 /*
1048 List all tables in NDB and look for mock tables which could
1049 potentially be resolved to the new table
1050 */
1051 NdbDictionary::Dictionary::List table_list;
1052 if (dict->listObjects(table_list, NdbDictionary::Object::UserTable, true) != 0)
1053 {
1054 DBUG_ASSERT(false);
1055 DBUG_VOID_RETURN;
1056 }
1057
1058 for (unsigned i = 0; i < table_list.count; i++)
1059 {
1060 const NdbDictionary::Dictionary::List::Element& el = table_list.elements[i];
1061
1062 DBUG_ASSERT(el.type == NdbDictionary::Object::UserTable);
1063
1064 // Check if table is in same database as the potential new parent
1065 if (strcmp(new_parent_db, el.database) != 0)
1066 {
1067 DBUG_PRINT("info", ("Skip, '%s.%s' is in different database",
1068 el.database, el.name));
1069 continue;
1070 }
1071
1072 const char* parent_name;
1073 if (!Fk_util::split_mock_name(el.name, NULL, NULL, &parent_name))
1074 continue;
1075
1076 // Check if this mock table should reference the new table
1077 if (strcmp(parent_name, new_parent_name) != 0)
1078 {
1079 DBUG_PRINT("info", ("Skip, parent of this mock table is not the new table"));
1080 continue;
1081 }
1082
1083 resolve_mock(dict, new_parent_name, el.name);
1084 }
1085
1086 DBUG_VOID_RETURN;
1087 }
1088
1089
truncate_allowed(NdbDictionary::Dictionary * dict,const char * db,const NdbDictionary::Table * table,bool & allow) const1090 bool truncate_allowed(NdbDictionary::Dictionary* dict, const char* db,
1091 const NdbDictionary::Table* table, bool& allow) const
1092 {
1093 DBUG_ENTER("truncate_allowed");
1094
1095 NdbDictionary::Dictionary::List list;
1096 if (dict->listDependentObjects(list, *table) != 0)
1097 {
1098 error(dict, "Failed to list dependent objects for table '%s'", table->getName());
1099 DBUG_RETURN(false);
1100 }
1101 allow = true;
1102 for (unsigned i = 0; i < list.count; i++)
1103 {
1104 const NdbDictionary::Dictionary::List::Element& element = list.elements[i];
1105 if (element.type != NdbDictionary::Object::ForeignKey)
1106 continue;
1107
1108 DBUG_PRINT("info", ("fk: %s", element.name));
1109
1110 NdbDictionary::ForeignKey fk;
1111 if (dict->getForeignKey(fk, element.name) != 0)
1112 {
1113 error(dict, "Could not find the listed fk '%s'", element.name);
1114 DBUG_ASSERT(false);
1115 continue;
1116 }
1117
1118 // Refuse if table is parent of fk
1119 char parent_db_and_name[FN_LEN + 1];
1120 const char * parent_name = fk_split_name(parent_db_and_name,
1121 fk.getParentTable());
1122 if (strcmp(db, parent_db_and_name) != 0 ||
1123 strcmp(parent_name, table->getName()) != 0)
1124 {
1125 // Not parent of the fk, skip
1126 continue;
1127 }
1128
1129 allow = false;
1130 break;
1131 }
1132 DBUG_PRINT("exit", ("allow: %u", allow));
1133 DBUG_RETURN(true);
1134 }
1135 };
1136
ndb_fk_util_build_list(THD * thd,NdbDictionary::Dictionary * dict,const NdbDictionary::Table * table,List<char> & mock_list)1137 bool ndb_fk_util_build_list(THD* thd, NdbDictionary::Dictionary* dict,
1138 const NdbDictionary::Table* table, List<char> &mock_list)
1139 {
1140 Fk_util fk_util(thd);
1141 return fk_util.build_mock_list(dict, table, mock_list);
1142 }
1143
1144
ndb_fk_util_drop_list(THD * thd,Ndb * ndb,NdbDictionary::Dictionary * dict,List<char> & drop_list)1145 void ndb_fk_util_drop_list(THD* thd, Ndb* ndb, NdbDictionary::Dictionary* dict, List<char> &drop_list)
1146 {
1147 Fk_util fk_util(thd);
1148 fk_util.drop_mock_list(ndb, dict, drop_list);
1149 }
1150
1151
ndb_fk_util_drop_table(THD * thd,Ndb * ndb,NdbDictionary::Dictionary * dict,const NdbDictionary::Table * table)1152 bool ndb_fk_util_drop_table(THD* thd, Ndb* ndb, NdbDictionary::Dictionary* dict,
1153 const NdbDictionary::Table* table)
1154 {
1155 Fk_util fk_util(thd);
1156 return fk_util.drop(ndb, dict, table);
1157 }
1158
1159
ndb_fk_util_is_mock_name(const char * table_name)1160 bool ndb_fk_util_is_mock_name(const char* table_name)
1161 {
1162 return Fk_util::is_mock_name(table_name);
1163 }
1164
1165
1166 void
ndb_fk_util_resolve_mock_tables(THD * thd,NdbDictionary::Dictionary * dict,const char * new_parent_db,const char * new_parent_name)1167 ndb_fk_util_resolve_mock_tables(THD* thd, NdbDictionary::Dictionary* dict,
1168 const char* new_parent_db,
1169 const char* new_parent_name)
1170 {
1171 Fk_util fk_util(thd);
1172 fk_util.resolve_mock_tables(dict, new_parent_db, new_parent_name);
1173 }
1174
1175
ndb_fk_util_truncate_allowed(THD * thd,NdbDictionary::Dictionary * dict,const char * db,const NdbDictionary::Table * table,bool & allowed)1176 bool ndb_fk_util_truncate_allowed(THD* thd, NdbDictionary::Dictionary* dict,
1177 const char* db,
1178 const NdbDictionary::Table* table,
1179 bool& allowed)
1180 {
1181 Fk_util fk_util(thd);
1182 if (!fk_util.truncate_allowed(dict, db, table, allowed))
1183 return false;
1184 return true;
1185 }
1186
1187
1188 int
create_fks(THD * thd,Ndb * ndb)1189 ha_ndbcluster::create_fks(THD *thd, Ndb *ndb)
1190 {
1191 DBUG_ENTER("ha_ndbcluster::create_fks");
1192
1193 // return real mysql error to avoid total randomness..
1194 const int err_default= HA_ERR_CANNOT_ADD_FOREIGN;
1195 char tmpbuf[FN_REFLEN];
1196
1197 assert(thd->lex != 0);
1198 Key * key= 0;
1199 List_iterator<Key> key_iterator(thd->lex->alter_info.key_list);
1200 while ((key=key_iterator++))
1201 {
1202 if (key->type != KEYTYPE_FOREIGN)
1203 continue;
1204
1205 NDBDICT *dict= ndb->getDictionary();
1206 Foreign_key * fk= reinterpret_cast<Foreign_key*>(key);
1207
1208 /**
1209 * NOTE: we need to fetch also child table...
1210 * cause the one we just created (in m_table) is not properly
1211 * initialize
1212 */
1213 Ndb_table_guard child_tab(dict, m_tabname);
1214 if (child_tab.get_table() == 0)
1215 {
1216 ERR_RETURN(dict->getNdbError());
1217 }
1218
1219 /**
1220 * NOTE 2: we mark the table as invalid
1221 * so that it gets removed from GlobalDictCache if
1222 * the schema transaction later fails...
1223 *
1224 * TODO: This code currently fetches table definition from data-nodes
1225 * once per FK...which could be improved to once if a FK
1226 */
1227 child_tab.invalidate();
1228
1229 /**
1230 * Get table columns columns...
1231 */
1232 const NDBCOL * childcols[NDB_MAX_ATTRIBUTES_IN_INDEX + 1];
1233 {
1234 unsigned pos= 0;
1235 const NDBTAB * tab= child_tab.get_table();
1236 Key_part_spec* col= 0;
1237 List_iterator<Key_part_spec> it1(fk->columns);
1238 while ((col= it1++))
1239 {
1240 const NDBCOL * ndbcol= tab->getColumn(lex2str(col->field_name,
1241 tmpbuf, sizeof(tmpbuf)));
1242 if (ndbcol == 0)
1243 {
1244 push_warning_printf(thd, Sql_condition::SL_WARNING,
1245 ER_CANNOT_ADD_FOREIGN,
1246 "Child table %s has no column %s in NDB",
1247 child_tab.get_table()->getName(), tmpbuf);
1248 DBUG_RETURN(err_default);
1249 }
1250 childcols[pos++]= ndbcol;
1251 }
1252 childcols[pos]= 0; // NULL terminate
1253 }
1254
1255 bool child_primary_key= FALSE;
1256 const NDBINDEX* child_index= find_matching_index(dict,
1257 child_tab.get_table(),
1258 childcols,
1259 child_primary_key);
1260
1261 if (!child_primary_key && child_index == 0)
1262 {
1263 push_warning_printf(thd, Sql_condition::SL_WARNING,
1264 ER_CANNOT_ADD_FOREIGN,
1265 "Child table %s foreign key columns match no index in NDB",
1266 child_tab.get_table()->getName());
1267 DBUG_RETURN(err_default);
1268 }
1269
1270 Ndb_db_guard db_guard(ndb); // save db
1271
1272 char parent_db[FN_REFLEN];
1273 char parent_name[FN_REFLEN];
1274 /*
1275 * Looking at Table_ident, testing for db.str first is safer
1276 * for valgrind. Do same with table.str too.
1277 */
1278 if (fk->ref_db.str != 0 && fk->ref_db.length != 0)
1279 {
1280 my_snprintf(parent_db, sizeof(parent_db), "%*s",
1281 (int)fk->ref_db.length,
1282 fk->ref_db.str);
1283 }
1284 else
1285 {
1286 parent_db[0]= 0;
1287 }
1288 if (fk->ref_table.str != 0 && fk->ref_table.length != 0)
1289 {
1290 my_snprintf(parent_name, sizeof(parent_name), "%*s",
1291 (int)fk->ref_table.length,
1292 fk->ref_table.str);
1293 }
1294 else
1295 {
1296 parent_name[0]= 0;
1297 }
1298 if (lower_case_table_names)
1299 {
1300 ndb_fk_casedn(parent_db);
1301 ndb_fk_casedn(parent_name);
1302 }
1303 setDbName(ndb, parent_db);
1304 Ndb_table_guard parent_tab(dict, parent_name);
1305 if (parent_tab.get_table() == 0)
1306 {
1307 if (!thd_test_options(thd, OPTION_NO_FOREIGN_KEY_CHECKS))
1308 {
1309 const NdbError &error= dict->getNdbError();
1310 push_warning_printf(thd, Sql_condition::SL_WARNING,
1311 ER_CANNOT_ADD_FOREIGN,
1312 "Parent table %s not found in NDB: %d: %s",
1313 parent_name,
1314 error.code, error.message);
1315 DBUG_RETURN(err_default);
1316 }
1317
1318 DBUG_PRINT("info", ("No parent and foreign_key_checks=0"));
1319
1320 Fk_util fk_util(thd);
1321
1322 /* Count the number of existing fks on table */
1323 uint existing = 0;
1324 if(!fk_util.count_fks(dict, child_tab.get_table(), existing))
1325 {
1326 DBUG_RETURN(err_default);
1327 }
1328
1329 /* Format mock table name */
1330 char mock_name[FN_REFLEN];
1331 if (!fk_util.format_name(mock_name, sizeof(mock_name),
1332 child_tab.get_table()->getObjectId(),
1333 existing, parent_name))
1334 {
1335 push_warning_printf(thd, Sql_condition::SL_WARNING,
1336 ER_CANNOT_ADD_FOREIGN,
1337 "Failed to create mock parent table, too long mock name");
1338 DBUG_RETURN(err_default);
1339 }
1340 if (!fk_util.create(dict, mock_name, m_tabname,
1341 fk->ref_columns, childcols))
1342 {
1343 const NdbError &error= dict->getNdbError();
1344 push_warning_printf(thd, Sql_condition::SL_WARNING,
1345 ER_CANNOT_ADD_FOREIGN,
1346 "Failed to create mock parent table in NDB: %d: %s",
1347 error.code, error.message);
1348 DBUG_RETURN(err_default);
1349 }
1350
1351 parent_tab.init(mock_name);
1352 parent_tab.invalidate(); // invalidate mock table when releasing
1353 if (parent_tab.get_table() == 0)
1354 {
1355 push_warning_printf(thd, Sql_condition::SL_WARNING,
1356 ER_CANNOT_ADD_FOREIGN,
1357 "INTERNAL ERROR: Could not find created mock table '%s'",
1358 mock_name);
1359 // Internal error, should be able to load the just created mock table
1360 DBUG_ASSERT(parent_tab.get_table());
1361 DBUG_RETURN(err_default);
1362 }
1363 }
1364
1365 const NDBCOL * parentcols[NDB_MAX_ATTRIBUTES_IN_INDEX + 1];
1366 {
1367 unsigned pos= 0;
1368 const NDBTAB * tab= parent_tab.get_table();
1369 Key_part_spec* col= 0;
1370 List_iterator<Key_part_spec> it1(fk->ref_columns);
1371 while ((col= it1++))
1372 {
1373 const NDBCOL * ndbcol= tab->getColumn(lex2str(col->field_name,
1374 tmpbuf, sizeof(tmpbuf)));
1375 if (ndbcol == 0)
1376 {
1377 push_warning_printf(thd, Sql_condition::SL_WARNING,
1378 ER_CANNOT_ADD_FOREIGN,
1379 "Parent table %s has no column %s in NDB",
1380 parent_tab.get_table()->getName(), tmpbuf);
1381 DBUG_RETURN(err_default);
1382 }
1383 parentcols[pos++]= ndbcol;
1384 }
1385 parentcols[pos]= 0; // NULL terminate
1386 }
1387
1388 bool parent_primary_key= FALSE;
1389 const NDBINDEX* parent_index= find_matching_index(dict,
1390 parent_tab.get_table(),
1391 parentcols,
1392 parent_primary_key);
1393
1394 db_guard.restore(); // restore db
1395
1396 if (!parent_primary_key && parent_index == 0)
1397 {
1398 my_error(ER_FK_NO_INDEX_PARENT, MYF(0),
1399 fk->name.str ? fk->name.str : "",
1400 parent_tab.get_table()->getName());
1401 DBUG_RETURN(err_default);
1402 }
1403
1404 {
1405 /**
1406 * Check that columns match...this happens to be same
1407 * condition as the one for SPJ...
1408 */
1409 for (unsigned i = 0; parentcols[i] != 0; i++)
1410 {
1411 if (parentcols[i]->isBindable(* childcols[i]) == -1)
1412 {
1413 push_warning_printf(thd, Sql_condition::SL_WARNING,
1414 ER_CANNOT_ADD_FOREIGN,
1415 "Parent column %s.%s is incompatible with child column %s.%s in NDB",
1416 parent_tab.get_table()->getName(),
1417 parentcols[i]->getName(),
1418 child_tab.get_table()->getName(),
1419 childcols[i]->getName());
1420 DBUG_RETURN(err_default);
1421 }
1422 }
1423 }
1424
1425 NdbDictionary::ForeignKey ndbfk;
1426 char fk_name[FN_REFLEN];
1427 if (!isnull(fk->name))
1428 {
1429 my_snprintf(fk_name, sizeof(fk_name), "%s",
1430 lex2str(fk->name, tmpbuf, sizeof(tmpbuf)));
1431 }
1432 else
1433 {
1434 my_snprintf(fk_name, sizeof(fk_name), "FK_%u_%u",
1435 parent_index ?
1436 parent_index->getObjectId() :
1437 parent_tab.get_table()->getObjectId(),
1438 child_index ?
1439 child_index->getObjectId() :
1440 child_tab.get_table()->getObjectId());
1441 }
1442 if (lower_case_table_names)
1443 ndb_fk_casedn(fk_name);
1444 ndbfk.setName(fk_name);
1445 ndbfk.setParent(* parent_tab.get_table(), parent_index, parentcols);
1446 ndbfk.setChild(* child_tab.get_table(), child_index, childcols);
1447
1448 switch((fk_option)fk->delete_opt){
1449 case FK_OPTION_UNDEF:
1450 case FK_OPTION_NO_ACTION:
1451 ndbfk.setOnDeleteAction(NdbDictionary::ForeignKey::NoAction);
1452 break;
1453 case FK_OPTION_RESTRICT:
1454 ndbfk.setOnDeleteAction(NdbDictionary::ForeignKey::Restrict);
1455 break;
1456 case FK_OPTION_CASCADE:
1457 ndbfk.setOnDeleteAction(NdbDictionary::ForeignKey::Cascade);
1458 break;
1459 case FK_OPTION_SET_NULL:
1460 ndbfk.setOnDeleteAction(NdbDictionary::ForeignKey::SetNull);
1461 break;
1462 case FK_OPTION_DEFAULT:
1463 ndbfk.setOnDeleteAction(NdbDictionary::ForeignKey::SetDefault);
1464 break;
1465 default:
1466 assert(false);
1467 ndbfk.setOnDeleteAction(NdbDictionary::ForeignKey::NoAction);
1468 }
1469
1470 switch((fk_option)fk->update_opt){
1471 case FK_OPTION_UNDEF:
1472 case FK_OPTION_NO_ACTION:
1473 ndbfk.setOnUpdateAction(NdbDictionary::ForeignKey::NoAction);
1474 break;
1475 case FK_OPTION_RESTRICT:
1476 ndbfk.setOnUpdateAction(NdbDictionary::ForeignKey::Restrict);
1477 break;
1478 case FK_OPTION_CASCADE:
1479 ndbfk.setOnUpdateAction(NdbDictionary::ForeignKey::Cascade);
1480 break;
1481 case FK_OPTION_SET_NULL:
1482 ndbfk.setOnUpdateAction(NdbDictionary::ForeignKey::SetNull);
1483 break;
1484 case FK_OPTION_DEFAULT:
1485 ndbfk.setOnUpdateAction(NdbDictionary::ForeignKey::SetDefault);
1486 break;
1487 default:
1488 assert(false);
1489 ndbfk.setOnUpdateAction(NdbDictionary::ForeignKey::NoAction);
1490 }
1491
1492 int flags = 0;
1493 if (thd_test_options(thd, OPTION_NO_FOREIGN_KEY_CHECKS))
1494 {
1495 flags |= NdbDictionary::Dictionary::CreateFK_NoVerify;
1496 }
1497 NdbDictionary::ObjectId objid;
1498 int err= dict->createForeignKey(ndbfk, &objid, flags);
1499
1500 if (child_index)
1501 {
1502 dict->removeIndexGlobal(* child_index, 0);
1503 }
1504
1505 if (parent_index)
1506 {
1507 dict->removeIndexGlobal(* parent_index, 0);
1508 }
1509
1510 if (err)
1511 {
1512 ERR_RETURN(dict->getNdbError());
1513 }
1514 }
1515
1516 ndb_fk_util_resolve_mock_tables(thd, ndb->getDictionary(),
1517 m_dbname, m_tabname);
1518
1519 DBUG_RETURN(0);
1520 }
1521
1522 bool
is_fk_defined_on_table_or_index(uint index)1523 ha_ndbcluster::is_fk_defined_on_table_or_index(uint index)
1524 {
1525 /**
1526 * This doesnt seem implemented in Innodb either...
1527 */
1528 return FALSE;
1529 }
1530
1531 uint
referenced_by_foreign_key()1532 ha_ndbcluster::referenced_by_foreign_key()
1533 {
1534 DBUG_ENTER("ha_ndbcluster::referenced_by_foreign_key");
1535
1536 Ndb_fk_data *data= m_fk_data;
1537 if (data == 0)
1538 {
1539 DBUG_ASSERT(false);
1540 DBUG_RETURN(0);
1541 }
1542
1543 DBUG_PRINT("info", ("count FKs total %u child %u parent %u",
1544 data->list.elements, data->cnt_child, data->cnt_parent));
1545 DBUG_RETURN(data->cnt_parent != 0);
1546 }
1547
1548 uint
is_child_or_parent_of_fk()1549 ha_ndbcluster::is_child_or_parent_of_fk()
1550 {
1551 DBUG_ENTER("ha_ndbcluster::is_child_or_parent_of_fk");
1552
1553 Ndb_fk_data *data= m_fk_data;
1554 if (data == 0)
1555 {
1556 DBUG_ASSERT(false);
1557 DBUG_RETURN(0);
1558 }
1559
1560 DBUG_PRINT("info", ("count FKs total %u child %u parent %u",
1561 data->list.elements, data->cnt_child, data->cnt_parent));
1562 DBUG_RETURN(data->list.elements != 0);
1563 }
1564
1565 bool
can_switch_engines()1566 ha_ndbcluster::can_switch_engines()
1567 {
1568 DBUG_ENTER("ha_ndbcluster::can_switch_engines");
1569
1570 if (is_child_or_parent_of_fk())
1571 DBUG_RETURN(0);
1572
1573 DBUG_RETURN(1);
1574 }
1575
1576 static
1577 const char *
fk_split_name(char dst[],const char * src,bool index)1578 fk_split_name(char dst[], const char * src, bool index)
1579 {
1580 DBUG_PRINT("info", ("fk_split_name: %s index=%d", src, index));
1581
1582 /**
1583 * Split a fully qualified (ndb) name into db and name
1584 *
1585 * Store result in dst
1586 */
1587 char * dstptr = dst;
1588 const char * save = src;
1589 while (src[0] != 0 && src[0] != '/')
1590 {
1591 * dstptr = * src;
1592 dstptr++;
1593 src++;
1594 }
1595
1596 if (src[0] == 0)
1597 {
1598 /**
1599 * No '/' found
1600 * set db to ''
1601 * and return pointer to name
1602 *
1603 * This is for compability with create_fk/drop_fk tools...
1604 */
1605 dst[0] = 0;
1606 strcpy(dst + 1, save);
1607 DBUG_PRINT("info", ("fk_split_name: %s,%s", dst, dst + 1));
1608 return dst + 1;
1609 }
1610
1611 assert(src[0] == '/');
1612 src++;
1613 * dstptr = 0;
1614 dstptr++;
1615
1616 // Skip over catalog (not implemented)
1617 while (src[0] != '/')
1618 {
1619 src++;
1620 }
1621
1622 assert(src[0] == '/');
1623 src++;
1624
1625 /**
1626 * Indexes contains an extra /
1627 */
1628 if (index)
1629 {
1630 while (src[0] != '/')
1631 {
1632 src++;
1633 }
1634 assert(src[0] == '/');
1635 src++;
1636 }
1637 strcpy(dstptr, src);
1638 DBUG_PRINT("info", ("fk_split_name: %s,%s", dst, dstptr));
1639 return dstptr;
1640 }
1641
1642 struct Ndb_mem_root_guard {
Ndb_mem_root_guardNdb_mem_root_guard1643 Ndb_mem_root_guard(MEM_ROOT *new_root) {
1644 root_ptr= my_thread_get_THR_MALLOC();
1645 DBUG_ASSERT(root_ptr != 0);
1646 old_root= *root_ptr;
1647 *root_ptr= new_root;
1648 }
~Ndb_mem_root_guardNdb_mem_root_guard1649 ~Ndb_mem_root_guard() {
1650 *root_ptr= old_root;
1651 }
1652 private:
1653 MEM_ROOT **root_ptr;
1654 MEM_ROOT *old_root;
1655 };
1656
1657 int
get_fk_data(THD * thd,Ndb * ndb)1658 ha_ndbcluster::get_fk_data(THD *thd, Ndb *ndb)
1659 {
1660 DBUG_ENTER("ha_ndbcluster::get_fk_data");
1661
1662 MEM_ROOT *mem_root= &m_fk_mem_root;
1663 Ndb_mem_root_guard mem_root_guard(mem_root);
1664
1665 free_root(mem_root, 0);
1666 m_fk_data= 0;
1667 init_alloc_root(PSI_INSTRUMENT_ME, mem_root, fk_root_block_size, 0);
1668
1669 NdbError err_OOM;
1670 err_OOM.code= 4000; // should we check OOM errors at all?
1671 NdbError err_API;
1672 err_API.code= 4011; // API internal should not happen
1673
1674 Ndb_fk_data *data= new (mem_root) Ndb_fk_data;
1675 if (data == 0)
1676 ERR_RETURN(err_OOM);
1677 data->cnt_child= 0;
1678 data->cnt_parent= 0;
1679
1680 DBUG_PRINT("info", ("%s.%s: list dependent objects",
1681 m_dbname, m_tabname));
1682 int res;
1683 NDBDICT *dict= ndb->getDictionary();
1684 NDBDICT::List obj_list;
1685 res= dict->listDependentObjects(obj_list, *m_table);
1686 if (res != 0)
1687 ERR_RETURN(dict->getNdbError());
1688 DBUG_PRINT("info", ("found %u dependent objects", obj_list.count));
1689
1690 for (unsigned i = 0; i < obj_list.count; i++)
1691 {
1692 const NDBDICT::List::Element &e= obj_list.elements[i];
1693 if (obj_list.elements[i].type != NdbDictionary::Object::ForeignKey)
1694 {
1695 DBUG_PRINT("info", ("skip non-FK %s type %d", e.name, e.type));
1696 continue;
1697 }
1698 DBUG_PRINT("info", ("found FK %s", e.name));
1699
1700 NdbDictionary::ForeignKey fk;
1701 res= dict->getForeignKey(fk, e.name);
1702 if (res != 0)
1703 ERR_RETURN(dict->getNdbError());
1704
1705 Ndb_fk_item *item= new (mem_root) Ndb_fk_item;
1706 if (item == 0)
1707 ERR_RETURN(err_OOM);
1708 FOREIGN_KEY_INFO &f_key_info= item->f_key_info;
1709
1710 {
1711 char fk_full_name[FN_LEN + 1];
1712 const char * name = fk_split_name(fk_full_name, fk.getName());
1713 f_key_info.foreign_id = thd_make_lex_string(thd, 0, name,
1714 (uint)strlen(name), 1);
1715 }
1716
1717 {
1718 char child_db_and_name[FN_LEN + 1];
1719 const char * child_name = fk_split_name(child_db_and_name,
1720 fk.getChildTable());
1721
1722 /* Dependent (child) database name */
1723 f_key_info.foreign_db =
1724 thd_make_lex_string(thd, 0, child_db_and_name,
1725 (uint)strlen(child_db_and_name),
1726 1);
1727 /* Dependent (child) table name */
1728 f_key_info.foreign_table =
1729 thd_make_lex_string(thd, 0, child_name,
1730 (uint)strlen(child_name),
1731 1);
1732
1733 Ndb_db_guard db_guard(ndb);
1734 setDbName(ndb, child_db_and_name);
1735 Ndb_table_guard child_tab(dict, child_name);
1736 if (child_tab.get_table() == 0)
1737 {
1738 DBUG_ASSERT(false);
1739 ERR_RETURN(dict->getNdbError());
1740 }
1741
1742 for (unsigned i = 0; i < fk.getChildColumnCount(); i++)
1743 {
1744 const NdbDictionary::Column * col =
1745 child_tab.get_table()->getColumn(fk.getChildColumnNo(i));
1746 if (col == 0)
1747 ERR_RETURN(err_API);
1748 LEX_STRING * name =
1749 thd_make_lex_string(thd, 0, col->getName(),
1750 (uint)strlen(col->getName()), 1);
1751 f_key_info.foreign_fields.push_back(name);
1752 }
1753 }
1754
1755 {
1756 char parent_db_and_name[FN_LEN + 1];
1757 const char * parent_name = fk_split_name(parent_db_and_name,
1758 fk.getParentTable());
1759
1760 /* Referenced (parent) database name */
1761 f_key_info.referenced_db =
1762 thd_make_lex_string(thd, 0, parent_db_and_name,
1763 (uint)strlen(parent_db_and_name),
1764 1);
1765 /* Referenced (parent) table name */
1766 f_key_info.referenced_table =
1767 thd_make_lex_string(thd, 0, parent_name,
1768 (uint)strlen(parent_name),
1769 1);
1770
1771 Ndb_db_guard db_guard(ndb);
1772 setDbName(ndb, parent_db_and_name);
1773 Ndb_table_guard parent_tab(dict, parent_name);
1774 if (parent_tab.get_table() == 0)
1775 {
1776 DBUG_ASSERT(false);
1777 ERR_RETURN(dict->getNdbError());
1778 }
1779
1780 for (unsigned i = 0; i < fk.getParentColumnCount(); i++)
1781 {
1782 const NdbDictionary::Column * col =
1783 parent_tab.get_table()->getColumn(fk.getParentColumnNo(i));
1784 if (col == 0)
1785 ERR_RETURN(err_API);
1786 LEX_STRING * name =
1787 thd_make_lex_string(thd, 0, col->getName(),
1788 (uint)strlen(col->getName()), 1);
1789 f_key_info.referenced_fields.push_back(name);
1790 }
1791
1792 }
1793
1794 {
1795 const char *update_method = "";
1796 switch (item->update_action= fk.getOnUpdateAction()){
1797 case NdbDictionary::ForeignKey::NoAction:
1798 update_method = "NO ACTION";
1799 break;
1800 case NdbDictionary::ForeignKey::Restrict:
1801 update_method = "RESTRICT";
1802 break;
1803 case NdbDictionary::ForeignKey::Cascade:
1804 update_method = "CASCADE";
1805 break;
1806 case NdbDictionary::ForeignKey::SetNull:
1807 update_method = "SET NULL";
1808 break;
1809 case NdbDictionary::ForeignKey::SetDefault:
1810 update_method = "SET DEFAULT";
1811 break;
1812 }
1813 f_key_info.update_method =
1814 thd_make_lex_string(thd, 0, update_method,
1815 (uint)strlen(update_method),
1816 1);
1817 }
1818
1819 {
1820 const char *delete_method = "";
1821 switch (item->delete_action= fk.getOnDeleteAction()){
1822 case NdbDictionary::ForeignKey::NoAction:
1823 delete_method = "NO ACTION";
1824 break;
1825 case NdbDictionary::ForeignKey::Restrict:
1826 delete_method = "RESTRICT";
1827 break;
1828 case NdbDictionary::ForeignKey::Cascade:
1829 delete_method = "CASCADE";
1830 break;
1831 case NdbDictionary::ForeignKey::SetNull:
1832 delete_method = "SET NULL";
1833 break;
1834 case NdbDictionary::ForeignKey::SetDefault:
1835 delete_method = "SET DEFAULT";
1836 break;
1837 }
1838 f_key_info.delete_method =
1839 thd_make_lex_string(thd, 0, delete_method,
1840 (uint)strlen(delete_method),
1841 1);
1842 }
1843
1844 if (fk.getParentIndex() != 0)
1845 {
1846 // sys/def/10/xb1$unique
1847 char db_and_name[FN_LEN + 1];
1848 const char * name=fk_split_name(db_and_name, fk.getParentIndex(), true);
1849 f_key_info.referenced_key_name =
1850 thd_make_lex_string(thd, 0, name,
1851 (uint)strlen(name),
1852 1);
1853 }
1854 else
1855 {
1856 const char* name= "PRIMARY";
1857 f_key_info.referenced_key_name =
1858 thd_make_lex_string(thd, 0, name,
1859 (uint)strlen(name),
1860 1);
1861 }
1862
1863 item->is_child=
1864 strcmp(m_dbname, f_key_info.foreign_db->str) == 0 &&
1865 strcmp(m_tabname, f_key_info.foreign_table->str) == 0;
1866
1867 item->is_parent=
1868 strcmp(m_dbname, f_key_info.referenced_db->str) == 0 &&
1869 strcmp(m_tabname, f_key_info.referenced_table->str) == 0;
1870
1871 data->cnt_child+= item->is_child;
1872 data->cnt_parent+= item->is_parent;
1873
1874 res= data->list.push_back(item);
1875 if (res != 0)
1876 ERR_RETURN(err_OOM);
1877 }
1878
1879 DBUG_PRINT("info", ("count FKs total %u child %u parent %u",
1880 data->list.elements, data->cnt_child, data->cnt_parent));
1881
1882 m_fk_data= data;
1883 DBUG_RETURN(0);
1884 }
1885
1886 void
release_fk_data(THD * thd)1887 ha_ndbcluster::release_fk_data(THD *thd)
1888 {
1889 DBUG_ENTER("ha_ndbcluster::release_fk_data");
1890
1891 Ndb_fk_data *data= m_fk_data;
1892 if (data != 0)
1893 {
1894 DBUG_PRINT("info", ("count FKs total %u child %u parent %u",
1895 data->list.elements, data->cnt_child, data->cnt_parent));
1896 }
1897
1898 MEM_ROOT *mem_root= &m_fk_mem_root;
1899 free_root(mem_root, 0);
1900 m_fk_data= 0;
1901
1902 DBUG_VOID_RETURN;
1903 }
1904
1905 int
get_child_or_parent_fk_list(THD * thd,List<FOREIGN_KEY_INFO> * f_key_list,bool is_child,bool is_parent)1906 ha_ndbcluster::get_child_or_parent_fk_list(THD *thd,
1907 List<FOREIGN_KEY_INFO> * f_key_list,
1908 bool is_child, bool is_parent)
1909 {
1910 DBUG_ENTER("ha_ndbcluster::get_child_or_parent_fk_list");
1911 DBUG_PRINT("info", ("table %s.%s", m_dbname, m_tabname));
1912
1913 Ndb_fk_data *data= m_fk_data;
1914 if (data == 0)
1915 {
1916 DBUG_ASSERT(false);
1917 DBUG_RETURN(0);
1918 }
1919
1920 DBUG_PRINT("info", ("count FKs total %u child %u parent %u",
1921 data->list.elements, data->cnt_child, data->cnt_parent));
1922
1923 Ndb_fk_item *item= 0;
1924 List_iterator<Ndb_fk_item> iter(data->list);
1925 while ((item= iter++))
1926 {
1927 FOREIGN_KEY_INFO &f_key_info= item->f_key_info;
1928 DBUG_PRINT("info", ("FK %s ref %s -> %s is_child %d is_parent %d",
1929 f_key_info.foreign_id->str,
1930 f_key_info.foreign_table->str,
1931 f_key_info.referenced_table->str,
1932 item->is_child, item->is_parent));
1933 if (is_child && !item->is_child)
1934 continue;
1935 if (is_parent && !item->is_parent)
1936 continue;
1937
1938 DBUG_PRINT("info", ("add %s to list", f_key_info.foreign_id->str));
1939 f_key_list->push_back(&f_key_info);
1940 }
1941
1942 DBUG_RETURN(0);
1943 }
1944
1945 int
get_foreign_key_list(THD * thd,List<FOREIGN_KEY_INFO> * f_key_list)1946 ha_ndbcluster::get_foreign_key_list(THD *thd,
1947 List<FOREIGN_KEY_INFO> * f_key_list)
1948 {
1949 DBUG_ENTER("ha_ndbcluster::get_foreign_key_list");
1950 int res= get_child_or_parent_fk_list(thd, f_key_list, true, false);
1951 DBUG_PRINT("info", ("count FKs child %u", f_key_list->elements));
1952 DBUG_RETURN(res);
1953 }
1954
1955 int
get_parent_foreign_key_list(THD * thd,List<FOREIGN_KEY_INFO> * f_key_list)1956 ha_ndbcluster::get_parent_foreign_key_list(THD *thd,
1957 List<FOREIGN_KEY_INFO> * f_key_list)
1958 {
1959 DBUG_ENTER("ha_ndbcluster::get_parent_foreign_key_list");
1960 int res= get_child_or_parent_fk_list(thd, f_key_list, false, true);
1961 DBUG_PRINT("info", ("count FKs parent %u", f_key_list->elements));
1962 DBUG_RETURN(res);
1963 }
1964
1965 static
1966 int
cmp_fk_name(const void * _e0,const void * _e1)1967 cmp_fk_name(const void * _e0, const void * _e1)
1968 {
1969 const NDBDICT::List::Element * e0 = (NDBDICT::List::Element*)_e0;
1970 const NDBDICT::List::Element * e1 = (NDBDICT::List::Element*)_e1;
1971 int res;
1972 if ((res= strcmp(e0->name, e1->name)) != 0)
1973 return res;
1974
1975 if ((res= strcmp(e0->database, e1->database)) != 0)
1976 return res;
1977
1978 if ((res= strcmp(e0->schema, e1->schema)) != 0)
1979 return res;
1980
1981 return e0->id - e1->id;
1982 }
1983
1984 char*
get_foreign_key_create_info()1985 ha_ndbcluster::get_foreign_key_create_info()
1986 {
1987 DBUG_ENTER("ha_ndbcluster::get_foreign_key_create_info");
1988
1989 /**
1990 * List foreigns for this table
1991 */
1992 if (m_table == 0)
1993 {
1994 DBUG_RETURN(0);
1995 }
1996
1997 if (table == 0)
1998 {
1999 DBUG_RETURN(0);
2000 }
2001
2002 THD* thd = table->in_use;
2003 if (thd == 0)
2004 {
2005 DBUG_RETURN(0);
2006 }
2007
2008 Ndb *ndb= get_ndb(thd);
2009 if (ndb == 0)
2010 {
2011 DBUG_RETURN(0);
2012 }
2013
2014 NDBDICT *dict= ndb->getDictionary();
2015 NDBDICT::List obj_list;
2016
2017 dict->listDependentObjects(obj_list, *m_table);
2018 /**
2019 * listDependentObjects will return FK's in order that they
2020 * are stored in hash-table in Dbdict (i.e random)
2021 *
2022 * sort them to make MTR and similar happy
2023 */
2024 my_qsort(obj_list.elements, obj_list.count, sizeof(obj_list.elements[0]),
2025 cmp_fk_name);
2026 String fk_string;
2027 for (unsigned i = 0; i < obj_list.count; i++)
2028 {
2029 if (obj_list.elements[i].type != NdbDictionary::Object::ForeignKey)
2030 continue;
2031
2032 NdbDictionary::ForeignKey fk;
2033 int res= dict->getForeignKey(fk, obj_list.elements[i].name);
2034 if (res != 0)
2035 {
2036 // Push warning??
2037 DBUG_RETURN(0);
2038 }
2039
2040 NdbError err;
2041 const int noinvalidate= 0;
2042 const NDBTAB * childtab= 0;
2043 const NDBTAB * parenttab= 0;
2044
2045 char parent_db_and_name[FN_LEN + 1];
2046 {
2047 const char * name = fk_split_name(parent_db_and_name,fk.getParentTable());
2048 setDbName(ndb, parent_db_and_name);
2049 parenttab= dict->getTableGlobal(name);
2050 if (parenttab == 0)
2051 {
2052 err= dict->getNdbError();
2053 goto errout;
2054 }
2055 }
2056
2057 char child_db_and_name[FN_LEN + 1];
2058 {
2059 const char * name = fk_split_name(child_db_and_name, fk.getChildTable());
2060 setDbName(ndb, child_db_and_name);
2061 childtab= dict->getTableGlobal(name);
2062 if (childtab == 0)
2063 {
2064 err= dict->getNdbError();
2065 goto errout;
2066 }
2067 }
2068
2069 if (! (strcmp(child_db_and_name, m_dbname) == 0 &&
2070 strcmp(childtab->getName(), m_tabname) == 0))
2071 {
2072 /**
2073 * this was on parent table (fk are shown on child table in SQL)
2074 */
2075 assert(strcmp(parent_db_and_name, m_dbname) == 0);
2076 assert(strcmp(parenttab->getName(), m_tabname) == 0);
2077 continue;
2078 }
2079
2080 fk_string.append(",");
2081 fk_string.append("\n ");
2082 fk_string.append(" CONSTRAINT `");
2083 {
2084 char db_and_name[FN_LEN+1];
2085 const char * name = fk_split_name(db_and_name, fk.getName());
2086 fk_string.append(name);
2087 }
2088 fk_string.append("` FOREIGN KEY (");
2089
2090 {
2091 bool first= true;
2092 for (unsigned j = 0; j < fk.getChildColumnCount(); j++)
2093 {
2094 unsigned no = fk.getChildColumnNo(j);
2095 if (!first)
2096 {
2097 fk_string.append(",");
2098 }
2099 fk_string.append("`");
2100 fk_string.append(childtab->getColumn(no)->getName());
2101 fk_string.append("`");
2102 first= false;
2103 }
2104 }
2105
2106 fk_string.append(") REFERENCES `");
2107 if (strcmp(parent_db_and_name, child_db_and_name) != 0)
2108 {
2109 fk_string.append(parent_db_and_name);
2110 fk_string.append("`.`");
2111 }
2112
2113
2114 const char* real_parent_name;
2115 if (ndb_show_foreign_key_mock_tables(thd) == false &&
2116 Fk_util::split_mock_name(parenttab->getName(),
2117 NULL, NULL, &real_parent_name))
2118 {
2119 DBUG_PRINT("info", ("real_parent_name: %s", real_parent_name));
2120 fk_string.append(real_parent_name);
2121 }
2122 else
2123 {
2124 fk_string.append(parenttab->getName());
2125 }
2126
2127 fk_string.append("` (");
2128
2129 {
2130 bool first= true;
2131 for (unsigned j = 0; j < fk.getParentColumnCount(); j++)
2132 {
2133 unsigned no = fk.getParentColumnNo(j);
2134 if (!first)
2135 {
2136 fk_string.append(",");
2137 }
2138 fk_string.append("`");
2139 fk_string.append(parenttab->getColumn(no)->getName());
2140 fk_string.append("`");
2141 first= false;
2142 }
2143 }
2144 fk_string.append(")");
2145
2146 switch(fk.getOnDeleteAction()){
2147 case NdbDictionary::ForeignKey::NoAction:
2148 fk_string.append(" ON DELETE NO ACTION");
2149 break;
2150 case NdbDictionary::ForeignKey::Restrict:
2151 fk_string.append(" ON DELETE RESTRICT");
2152 break;
2153 case NdbDictionary::ForeignKey::Cascade:
2154 fk_string.append(" ON DELETE CASCADE");
2155 break;
2156 case NdbDictionary::ForeignKey::SetNull:
2157 fk_string.append(" ON DELETE SET NULL");
2158 break;
2159 case NdbDictionary::ForeignKey::SetDefault:
2160 fk_string.append(" ON DELETE SET DEFAULT");
2161 break;
2162 }
2163
2164 switch(fk.getOnUpdateAction()){
2165 case NdbDictionary::ForeignKey::NoAction:
2166 fk_string.append(" ON UPDATE NO ACTION");
2167 break;
2168 case NdbDictionary::ForeignKey::Restrict:
2169 fk_string.append(" ON UPDATE RESTRICT");
2170 break;
2171 case NdbDictionary::ForeignKey::Cascade:
2172 fk_string.append(" ON UPDATE CASCADE");
2173 break;
2174 case NdbDictionary::ForeignKey::SetNull:
2175 fk_string.append(" ON UPDATE SET NULL");
2176 break;
2177 case NdbDictionary::ForeignKey::SetDefault:
2178 fk_string.append(" ON UPDATE SET DEFAULT");
2179 break;
2180 }
2181 errout:
2182 if (childtab)
2183 {
2184 dict->removeTableGlobal(* childtab, noinvalidate);
2185 }
2186
2187 if (parenttab)
2188 {
2189 dict->removeTableGlobal(* parenttab, noinvalidate);
2190 }
2191
2192 if (err.code != 0)
2193 {
2194 push_warning_printf(thd, Sql_condition::SL_WARNING,
2195 ER_ILLEGAL_HA_CREATE_OPTION,
2196 "Failed to retreive FK information: %d:%s",
2197 err.code,
2198 err.message);
2199
2200 DBUG_RETURN(0); // How to report error ??
2201 }
2202 }
2203
2204 DBUG_RETURN(strdup(fk_string.c_ptr()));
2205 }
2206
2207 void
free_foreign_key_create_info(char * str)2208 ha_ndbcluster::free_foreign_key_create_info(char* str)
2209 {
2210 if (str != 0)
2211 {
2212 free(str);
2213 }
2214 }
2215
2216 int
copy_fk_for_offline_alter(THD * thd,Ndb * ndb,NDBTAB * _dsttab)2217 ha_ndbcluster::copy_fk_for_offline_alter(THD * thd, Ndb* ndb, NDBTAB* _dsttab)
2218 {
2219 DBUG_ENTER("ha_ndbcluster::copy_fk_for_offline_alter");
2220 if (thd->lex == 0)
2221 {
2222 assert(false);
2223 DBUG_RETURN(0);
2224 }
2225
2226 Ndb_db_guard db_guard(ndb);
2227 const char * src_db = thd->lex->select_lex->table_list.first->db;
2228 const char * src_tab = thd->lex->select_lex->table_list.first->table_name;
2229
2230 if (src_db == 0 || src_tab == 0)
2231 {
2232 assert(false);
2233 DBUG_RETURN(0);
2234 }
2235
2236 assert(thd->lex != 0);
2237 NDBDICT* dict = ndb->getDictionary();
2238 setDbName(ndb, src_db);
2239 Ndb_table_guard srctab(dict, src_tab);
2240 if (srctab.get_table() == 0)
2241 {
2242 /**
2243 * when doign alter table engine=ndb this can happen
2244 */
2245 DBUG_RETURN(0);
2246 }
2247
2248 db_guard.restore();
2249 Ndb_table_guard dsttab(dict, _dsttab->getName());
2250 if (dsttab.get_table() == 0)
2251 {
2252 ERR_RETURN(dict->getNdbError());
2253 }
2254
2255 setDbName(ndb, src_db);
2256 NDBDICT::List obj_list;
2257 if (dict->listDependentObjects(obj_list, *srctab.get_table()) != 0)
2258 {
2259 ERR_RETURN(dict->getNdbError());
2260 }
2261
2262 // check if fk to drop exists
2263 {
2264 Alter_drop * drop_item= 0;
2265 List_iterator<Alter_drop> drop_iterator(thd->lex->alter_info.drop_list);
2266 while ((drop_item=drop_iterator++))
2267 {
2268 if (drop_item->type != Alter_drop::FOREIGN_KEY)
2269 continue;
2270 bool found= false;
2271 for (unsigned i = 0; i < obj_list.count; i++)
2272 {
2273 char db_and_name[FN_LEN + 1];
2274 const char * name= fk_split_name(db_and_name,obj_list.elements[i].name);
2275 if (ndb_fk_casecmp(drop_item->name, name) != 0)
2276 continue;
2277
2278 NdbDictionary::ForeignKey fk;
2279 if (dict->getForeignKey(fk, obj_list.elements[i].name) != 0)
2280 {
2281 ERR_RETURN(dict->getNdbError());
2282 }
2283
2284 char child_db_and_name[FN_LEN + 1];
2285 const char* child_name = fk_split_name(child_db_and_name,
2286 fk.getChildTable());
2287 if (strcmp(child_db_and_name, src_db) == 0 &&
2288 strcmp(child_name, src_tab) == 0)
2289 {
2290 found= true;
2291 break;
2292 }
2293 }
2294 if (!found)
2295 {
2296 // FK not found
2297 my_error(ER_CANT_DROP_FIELD_OR_KEY, MYF(0), drop_item->name);
2298 DBUG_RETURN(ER_CANT_DROP_FIELD_OR_KEY);
2299 }
2300 }
2301 }
2302
2303 for (unsigned i = 0; i < obj_list.count; i++)
2304 {
2305 if (obj_list.elements[i].type == NdbDictionary::Object::ForeignKey)
2306 {
2307 NdbDictionary::ForeignKey fk;
2308 if (dict->getForeignKey(fk, obj_list.elements[i].name) != 0)
2309 {
2310 ERR_RETURN(dict->getNdbError());
2311 }
2312
2313 {
2314 /**
2315 * Check if it should be copied
2316 */
2317 char db_and_name[FN_LEN + 1];
2318 const char * name= fk_split_name(db_and_name,obj_list.elements[i].name);
2319
2320 bool found= false;
2321 Alter_drop * drop_item= 0;
2322 List_iterator<Alter_drop> drop_iterator(thd->lex->alter_info.drop_list);
2323 while ((drop_item=drop_iterator++))
2324 {
2325 if (drop_item->type != Alter_drop::FOREIGN_KEY)
2326 continue;
2327 if (ndb_fk_casecmp(drop_item->name, name) != 0)
2328 continue;
2329
2330 char child_db_and_name[FN_LEN + 1];
2331 const char* child_name = fk_split_name(child_db_and_name,
2332 fk.getChildTable());
2333 if (strcmp(child_db_and_name, src_db) == 0 &&
2334 strcmp(child_name, src_tab) == 0)
2335 {
2336 found= true;
2337 break;
2338 }
2339 }
2340 if (found)
2341 {
2342 /**
2343 * Item is on drop list...
2344 * don't copy it
2345 */
2346 continue;
2347 }
2348 }
2349
2350 unsigned parentObjectId= 0;
2351 unsigned childObjectId= 0;
2352
2353 {
2354 char db_and_name[FN_LEN + 1];
2355 const char * name= fk_split_name(db_and_name, fk.getParentTable());
2356 setDbName(ndb, db_and_name);
2357 Ndb_table_guard org_parent(dict, name);
2358 if (org_parent.get_table() == 0)
2359 {
2360 ERR_RETURN(dict->getNdbError());
2361 }
2362 parentObjectId= org_parent.get_table()->getObjectId();
2363 }
2364
2365 {
2366 char db_and_name[FN_LEN + 1];
2367 const char * name= fk_split_name(db_and_name, fk.getChildTable());
2368 setDbName(ndb, db_and_name);
2369 Ndb_table_guard org_child(dict, name);
2370 if (org_child.get_table() == 0)
2371 {
2372 ERR_RETURN(dict->getNdbError());
2373 }
2374 childObjectId= org_child.get_table()->getObjectId();
2375 }
2376
2377 /**
2378 * flags for CreateForeignKey
2379 */
2380 int flags = 0;
2381
2382 char db_and_name[FN_LEN + 1];
2383 const char * name= fk_split_name(db_and_name, fk.getParentTable());
2384 if (strcmp(name, src_tab) == 0 &&
2385 strcmp(db_and_name, src_db) == 0)
2386 {
2387 /**
2388 * We used to be parent...
2389 */
2390 const NDBCOL * cols[NDB_MAX_ATTRIBUTES_IN_INDEX + 1];
2391 for (unsigned j= 0; j < fk.getParentColumnCount(); j++)
2392 {
2393 unsigned no= fk.getParentColumnNo(j);
2394 const NDBCOL * orgcol = srctab.get_table()->getColumn(no);
2395 cols[j]= dsttab.get_table()->getColumn(orgcol->getName());
2396 }
2397 cols[fk.getParentColumnCount()]= 0;
2398 parentObjectId= dsttab.get_table()->getObjectId();
2399 if (fk.getParentIndex() != 0)
2400 {
2401 name = fk_split_name(db_and_name, fk.getParentIndex(), true);
2402 setDbName(ndb, db_and_name);
2403 const NDBINDEX * idx = dict->getIndexGlobal(name,*dsttab.get_table());
2404 if (idx == 0)
2405 {
2406 printf("%u %s - %u/%u get_index(%s)\n",
2407 __LINE__, fk.getName(),
2408 parentObjectId,
2409 childObjectId,
2410 name); fflush(stdout);
2411 ERR_RETURN(dict->getNdbError());
2412 }
2413 fk.setParent(* dsttab.get_table(), idx, cols);
2414 dict->removeIndexGlobal(* idx, 0);
2415 }
2416 else
2417 {
2418 fk.setParent(* dsttab.get_table(), 0, cols);
2419 }
2420
2421
2422 /**
2423 * We're parent, and this is offline alter table
2424 * then we can't verify that FK cause the new parent will
2425 * be populated later during copy data between tables
2426 *
2427 * However, iff FK is consistent when this alter starts,
2428 * it should remain consistent since mysql does not
2429 * allow the alter to modify the columns referenced
2430 */
2431 flags |= NdbDictionary::Dictionary::CreateFK_NoVerify;
2432 }
2433 else
2434 {
2435 name = fk_split_name(db_and_name, fk.getChildTable());
2436 assert(strcmp(name, src_tab) == 0 &&
2437 strcmp(db_and_name, src_db) == 0);
2438 const NDBCOL * cols[NDB_MAX_ATTRIBUTES_IN_INDEX + 1];
2439 for (unsigned j= 0; j < fk.getChildColumnCount(); j++)
2440 {
2441 unsigned no= fk.getChildColumnNo(j);
2442 const NDBCOL * orgcol = srctab.get_table()->getColumn(no);
2443 cols[j]= dsttab.get_table()->getColumn(orgcol->getName());
2444 }
2445 cols[fk.getChildColumnCount()]= 0;
2446 childObjectId= dsttab.get_table()->getObjectId();
2447 if (fk.getChildIndex() != 0)
2448 {
2449 name = fk_split_name(db_and_name, fk.getChildIndex(), true);
2450 setDbName(ndb, db_and_name);
2451 const NDBINDEX * idx = dict->getIndexGlobal(name,*dsttab.get_table());
2452 if (idx == 0)
2453 {
2454 printf("%u %s - %u/%u get_index(%s)\n",
2455 __LINE__, fk.getName(),
2456 parentObjectId,
2457 childObjectId,
2458 name); fflush(stdout);
2459 ERR_RETURN(dict->getNdbError());
2460 }
2461 fk.setChild(* dsttab.get_table(), idx, cols);
2462 dict->removeIndexGlobal(* idx, 0);
2463 }
2464 else
2465 {
2466 fk.setChild(* dsttab.get_table(), 0, cols);
2467 }
2468 }
2469
2470 char new_name[FN_LEN + 1];
2471 name= fk_split_name(db_and_name, fk.getName());
2472 my_snprintf(new_name, sizeof(new_name), "%s",
2473 name);
2474 fk.setName(new_name);
2475 setDbName(ndb, db_and_name);
2476
2477 if (thd_test_options(thd, OPTION_NO_FOREIGN_KEY_CHECKS))
2478 {
2479 flags |= NdbDictionary::Dictionary::CreateFK_NoVerify;
2480 }
2481 NdbDictionary::ObjectId objid;
2482 if (dict->createForeignKey(fk, &objid, flags) != 0)
2483 {
2484 ERR_RETURN(dict->getNdbError());
2485 }
2486 }
2487 }
2488 DBUG_RETURN(0);
2489 }
2490
2491 int
drop_fk_for_online_alter(THD * thd,Ndb * ndb,NDBDICT * dict,const NDBTAB * tab)2492 ha_ndbcluster::drop_fk_for_online_alter(THD * thd, Ndb* ndb, NDBDICT * dict,
2493 const NDBTAB* tab)
2494 {
2495 DBUG_ENTER("ha_ndbcluster::drop_fk_for_online_alter");
2496 if (thd->lex == 0)
2497 {
2498 assert(false);
2499 DBUG_RETURN(0);
2500 }
2501
2502 Ndb_table_guard srctab(dict, tab->getName());
2503 if (srctab.get_table() == 0)
2504 {
2505 DBUG_ASSERT(false); // Why ??
2506 DBUG_RETURN(0);
2507 }
2508
2509 NDBDICT::List obj_list;
2510 if (dict->listDependentObjects(obj_list, *srctab.get_table()) != 0)
2511 {
2512 ERR_RETURN(dict->getNdbError());
2513 }
2514
2515 Alter_drop * drop_item= 0;
2516 List_iterator<Alter_drop> drop_iterator(thd->lex->alter_info.drop_list);
2517 while ((drop_item=drop_iterator++))
2518 {
2519 if (drop_item->type != Alter_drop::FOREIGN_KEY)
2520 continue;
2521
2522 bool found= false;
2523 for (unsigned i = 0; i < obj_list.count; i++)
2524 {
2525 if (obj_list.elements[i].type != NdbDictionary::Object::ForeignKey)
2526 {
2527 continue;
2528 }
2529
2530 char db_and_name[FN_LEN + 1];
2531 const char * name= fk_split_name(db_and_name,obj_list.elements[i].name);
2532
2533 if (ndb_fk_casecmp(drop_item->name, name) != 0)
2534 continue;
2535
2536 NdbDictionary::ForeignKey fk;
2537 if (dict->getForeignKey(fk, obj_list.elements[i].name) != 0)
2538 {
2539 ERR_RETURN(dict->getNdbError());
2540 }
2541
2542 char child_db_and_name[FN_LEN + 1];
2543 const char* child_name = fk_split_name(child_db_and_name,
2544 fk.getChildTable());
2545 if (strcmp(child_db_and_name, ndb->getDatabaseName()) == 0 &&
2546 strcmp(child_name, tab->getName()) == 0)
2547 {
2548 found= true;
2549 Fk_util fk_util(thd);
2550 if (!fk_util.drop_fk(ndb, dict, obj_list.elements[i].name))
2551 {
2552 ERR_RETURN(dict->getNdbError());
2553 }
2554 break;
2555 }
2556 }
2557 if (!found)
2558 {
2559 // FK not found
2560 my_error(ER_CANT_DROP_FIELD_OR_KEY, MYF(0), drop_item->name);
2561 DBUG_RETURN(ER_CANT_DROP_FIELD_OR_KEY);
2562 }
2563 }
2564 DBUG_RETURN(0);
2565 }
2566
2567
2568 /**
2569 Save all fk data into a fk_list
2570 - Build list of foreign keys for which the given table is child
2571
2572 @retval
2573 0 ok
2574 @retval
2575 != 0 failure in saving the fk data
2576 */
2577
2578 int
get_fk_data_for_truncate(NDBDICT * dict,const NDBTAB * table,Ndb_fk_list & fk_list)2579 ha_ndbcluster::get_fk_data_for_truncate(NDBDICT* dict, const NDBTAB* table,
2580 Ndb_fk_list& fk_list)
2581 {
2582 DBUG_ENTER("ha_ndbcluster::get_fk_data_for_truncate");
2583
2584 NDBDICT::List obj_list;
2585 if (dict->listDependentObjects(obj_list, *table) != 0)
2586 {
2587 ERR_RETURN(dict->getNdbError());
2588 }
2589 for (unsigned i = 0; i < obj_list.count; i++)
2590 {
2591 DBUG_PRINT("debug", ("DependentObject %d : %s, Type : %d", i,
2592 obj_list.elements[i].name,
2593 obj_list.elements[i].type));
2594 if (obj_list.elements[i].type != NdbDictionary::Object::ForeignKey)
2595 continue;
2596
2597 /* obj is an fk. Fetch it */
2598 NDBFK fk;
2599 if (dict->getForeignKey(fk, obj_list.elements[i].name) != 0)
2600 {
2601 ERR_RETURN(dict->getNdbError());
2602 }
2603 DBUG_PRINT("debug", ("Retrieving FK : %s", fk.getName()));
2604
2605 fk_list.push_back(new NdbDictionary::ForeignKey(fk));
2606 DBUG_PRINT("info", ("Foreign Key added to list : %s", fk.getName()));
2607 }
2608
2609 DBUG_RETURN(0);
2610 }
2611
2612
2613 /**
2614 Restore foreign keys into the child table from fk_list
2615 - for all foreign keys in the given fk list, re-assign child object ids
2616 to reflect the newly created child table/indexes
2617 - create the fk in the child table
2618
2619 @retval
2620 0 ok
2621 @retval
2622 != 0 failure in recreating the fk data
2623 */
2624
2625 int
recreate_fk_for_truncate(THD * thd,Ndb * ndb,const char * tab_name,Ndb_fk_list & fk_list)2626 ha_ndbcluster::recreate_fk_for_truncate(THD* thd, Ndb* ndb, const char* tab_name,
2627 Ndb_fk_list& fk_list)
2628 {
2629 DBUG_ENTER("ha_ndbcluster::create_fk_for_truncate");
2630
2631 int flags = 0;
2632 const int err_default= HA_ERR_CANNOT_ADD_FOREIGN;
2633
2634 NDBDICT* dict = ndb->getDictionary();
2635
2636 /* fetch child table */
2637 Ndb_table_guard child_tab(dict, tab_name);
2638 if (child_tab.get_table() == 0)
2639 {
2640 push_warning_printf(thd, Sql_condition::SL_WARNING,
2641 ER_CANNOT_ADD_FOREIGN,
2642 "INTERNAL ERROR: Could not find created child table '%s'",
2643 tab_name);
2644 // Internal error, should be able to load the just created child table
2645 DBUG_ASSERT(child_tab.get_table());
2646 DBUG_RETURN(err_default);
2647 }
2648
2649 NDBFK* fk;
2650 List_iterator<NDBFK> fk_iterator(fk_list);
2651 while ((fk= fk_iterator++))
2652 {
2653 DBUG_PRINT("info",("Parsing foreign key : %s", fk->getName()));
2654
2655 /* Get child table columns and index */
2656 const NDBCOL * child_cols[NDB_MAX_ATTRIBUTES_IN_INDEX + 1];
2657 {
2658 unsigned pos= 0;
2659 const NDBTAB* tab= child_tab.get_table();
2660 for(unsigned i= 0; i < fk->getChildColumnCount(); i++)
2661 {
2662 const NDBCOL * ndbcol= tab->getColumn(fk->getChildColumnNo(i));
2663 if (ndbcol == 0)
2664 {
2665 push_warning_printf(thd, Sql_condition::SL_WARNING,
2666 ER_CANNOT_ADD_FOREIGN,
2667 "Child table %s has no column referred by the FK %s",
2668 tab->getName(), fk->getName());
2669 DBUG_ASSERT(ndbcol);
2670 DBUG_RETURN(err_default);
2671 }
2672 child_cols[pos++]= ndbcol;
2673 }
2674 child_cols[pos]= 0;
2675 }
2676
2677 bool child_primary_key= FALSE;
2678 const NDBINDEX* child_index= find_matching_index(dict,
2679 child_tab.get_table(),
2680 child_cols,
2681 child_primary_key);
2682
2683 if (!child_primary_key && child_index == 0)
2684 {
2685 my_error(ER_FK_NO_INDEX_CHILD, MYF(0), fk->getName(),
2686 child_tab.get_table()->getName());
2687 DBUG_RETURN(err_default);
2688 }
2689
2690 /* update the fk's child references */
2691 fk->setChild(* child_tab.get_table(), child_index, child_cols);
2692
2693 /*
2694 the name of "fk" seems to be different when you read it up
2695 compared to when you create it. (Probably a historical artifact)
2696 So update fk's name
2697 */
2698 {
2699 char name[FN_REFLEN+1];
2700 unsigned parent_id, child_id;
2701 if (sscanf(fk->getName(), "%u/%u/%s",
2702 &parent_id, &child_id, name) != 3)
2703 {
2704 push_warning_printf(thd, Sql_condition::SL_WARNING,
2705 ER_CANNOT_ADD_FOREIGN,
2706 "Skip, failed to parse name of fk: %s",
2707 fk->getName());
2708 DBUG_RETURN(err_default);
2709 }
2710
2711 char fk_name[FN_REFLEN+1];
2712 my_snprintf(fk_name, sizeof(fk_name), "%s",
2713 name);
2714 DBUG_PRINT("info", ("Setting new fk name: %s", fk_name));
2715 fk->setName(fk_name);
2716 }
2717
2718 if (thd_test_options(thd, OPTION_NO_FOREIGN_KEY_CHECKS))
2719 {
2720 flags |= NdbDictionary::Dictionary::CreateFK_NoVerify;
2721 }
2722
2723 NdbDictionary::ObjectId objid;
2724 int err= dict->createForeignKey(*fk, &objid, flags);
2725
2726 if (child_index)
2727 {
2728 dict->removeIndexGlobal(* child_index, 0);
2729 }
2730
2731 if (err)
2732 {
2733 ERR_RETURN(dict->getNdbError());
2734 }
2735 }
2736 DBUG_RETURN(0);
2737 }
2738