1 /*
2    Copyright (c) 2011, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "ha_ndbcluster_glue.h"
26 #include "ha_ndbcluster.h"
27 #include "ndb_table_guard.h"
28 #include "mysql/service_thd_alloc.h"
29 
/*
  Translate the given NdbError with ndb_to_mysql_error() and return
  the result from the enclosing function through DBUG_RETURN.
  The error is first bound to a const reference so that 'err' may be
  any expression yielding an NdbError.
*/
#define ERR_RETURN(err)                  \
{                                        \
  const NdbError& tmp= err;              \
  DBUG_RETURN(ndb_to_mysql_error(&tmp)); \
}
35 
// Typedefs for long names
// (short aliases for the NdbDictionary types used in this file)
typedef NdbDictionary::Dictionary NDBDICT;
typedef NdbDictionary::Table NDBTAB;
typedef NdbDictionary::Column NDBCOL;
typedef NdbDictionary::Index NDBINDEX;
typedef NdbDictionary::ForeignKey NDBFK;
42 
43 /*
44   Foreign key data where this table is child or parent or both.
45   Like indexes, these are cached under each handler instance.
46   Unlike indexes, no references to global dictionary are kept.
47 */
48 
// One cached foreign key of a table
struct Ndb_fk_item : Sql_alloc
{
  FOREIGN_KEY_INFO f_key_info;  // MySQL level description of the fk
  int update_action;    // NDBFK::FkAction
  int delete_action;    // presumably also NDBFK::FkAction - TODO confirm
  bool is_child;        // presumably: this table is child in the fk
  bool is_parent;       // presumably: this table is parent in the fk
};
57 
// The list of cached foreign keys for a table together with counters
struct Ndb_fk_data : Sql_alloc
{
  List<Ndb_fk_item> list;
  // NOTE(review): looks like the number of items in 'list' having
  // is_child respectively is_parent set - confirm against the code
  // which fills in the list
  uint cnt_child;
  uint cnt_parent;
};
64 
// Forward decl
// The callers in this file pass a full name and a buffer 'dst', they
// then use 'dst' as the database part and the returned pointer as the
// remaining name. The function itself is defined further down.
static
const char *
fk_split_name(char dst[], const char * src, bool index= false);
69 
70 /*
71   Create all the fks  for a table.
72 
73   The actual foreign keys are not passed in handler interface
74   so gets them from thd->lex :-(
75 */
76 static
77 const NDBINDEX*
find_matching_index(NDBDICT * dict,const NDBTAB * tab,const NDBCOL * columns[],bool & matches_primary_key)78 find_matching_index(NDBDICT* dict,
79                     const NDBTAB * tab,
80                     const NDBCOL * columns[],
81                     /* OUT */ bool & matches_primary_key)
82 {
83   /**
84    * First check if it matches primary key
85    */
86   {
87     matches_primary_key= FALSE;
88 
89     uint cnt_pk= 0, cnt_col= 0;
90     for (unsigned i = 0; columns[i] != 0; i++)
91     {
92       cnt_col++;
93       if (columns[i]->getPrimaryKey())
94         cnt_pk++;
95     }
96 
97     // check if all columns was part of full primary key
98     if (cnt_col == (uint)tab->getNoOfPrimaryKeys() &&
99         cnt_col == cnt_pk)
100     {
101       matches_primary_key= TRUE;
102       return 0;
103     }
104   }
105 
106   /**
107    * check indexes...
108    * first choice is unique index
109    * second choice is ordered index...with as many columns as possible
110    */
111   const int noinvalidate= 0;
112   uint best_matching_columns= 0;
113   const NDBINDEX* best_matching_index= 0;
114 
115   NDBDICT::List index_list;
116   dict->listIndexes(index_list, *tab);
117   for (unsigned i = 0; i < index_list.count; i++)
118   {
119     const char * index_name= index_list.elements[i].name;
120     const NDBINDEX* index= dict->getIndexGlobal(index_name, *tab);
121     if (index->getType() == NDBINDEX::UniqueHashIndex)
122     {
123       uint cnt= 0;
124       for (unsigned j = 0; columns[j] != 0; j++)
125       {
126         /*
127          * Search for matching columns in any order
128          * since order does not matter for unique index
129          */
130         bool found= FALSE;
131         for (unsigned c = 0; c < index->getNoOfColumns(); c++)
132         {
133           if (!strcmp(columns[j]->getName(), index->getColumn(c)->getName()))
134           {
135             found= TRUE;
136             break;
137           }
138         }
139         if (found)
140           cnt++;
141         else
142           break;
143       }
144       if (cnt == index->getNoOfColumns())
145       {
146         /**
147          * Full match...return this index, no need to look further
148          */
149         if (best_matching_index)
150         {
151           // release ref to previous best candidate
152           dict->removeIndexGlobal(* best_matching_index, noinvalidate);
153         }
154         return index; // NOTE: also returns reference
155       }
156 
157       /**
158        * Not full match...i.e not usable
159        */
160       dict->removeIndexGlobal(* index, noinvalidate);
161       continue;
162     }
163     else if (index->getType() == NDBINDEX::OrderedIndex)
164     {
165       uint cnt= 0;
166       for (; columns[cnt] != 0; cnt++)
167       {
168         const NDBCOL * ndbcol= index->getColumn(cnt);
169         if (ndbcol == 0)
170           break;
171 
172         if (strcmp(columns[cnt]->getName(), ndbcol->getName()) != 0)
173           break;
174       }
175 
176       if (cnt > best_matching_columns)
177       {
178         /**
179          * better match...
180          */
181         if (best_matching_index)
182         {
183           dict->removeIndexGlobal(* best_matching_index, noinvalidate);
184         }
185         best_matching_index= index;
186         best_matching_columns= cnt;
187       }
188       else
189       {
190         dict->removeIndexGlobal(* index, noinvalidate);
191       }
192     }
193     else
194     {
195       // what ?? unknown index type
196       assert(false);
197       dict->removeIndexGlobal(* index, noinvalidate);
198       continue;
199     }
200   }
201 
202   return best_matching_index; // NOTE: also returns reference
203 }
204 
205 static
206 void
setDbName(Ndb * ndb,const char * name)207 setDbName(Ndb* ndb, const char * name)
208 {
209   if (name && strlen(name) != 0)
210   {
211     ndb->setDatabaseName(name);
212   }
213 }
214 
215 struct Ndb_db_guard
216 {
Ndb_db_guardNdb_db_guard217   Ndb_db_guard(Ndb* ndb) {
218     this->ndb = ndb;
219     strcpy(save_db, ndb->getDatabaseName());
220   }
221 
restoreNdb_db_guard222   void restore() {
223     ndb->setDatabaseName(save_db);
224   }
225 
~Ndb_db_guardNdb_db_guard226   ~Ndb_db_guard() {
227     ndb->setDatabaseName(save_db);
228   }
229 private:
230   Ndb* ndb;
231   char save_db[FN_REFLEN + 1];
232 };
233 
/**
 * ndbapi wants c-strings (null terminated)
 * mysql frequently uses LEX-string...(ptr + len)
 *
 * also...they have changed between 5.1 and 5.5...
 * add a small compatibility-kit
 */
241 static inline
242 const char *
lex2str(const LEX_STRING & str,char buf[],size_t len)243 lex2str(const LEX_STRING& str, char buf[], size_t len)
244 {
245   my_snprintf(buf, len, "%.*s", (int)str.length, str.str);
246   return buf;
247 }
248 
/*
  Overload for names which already are c-strings, the string is
  returned as is and the buffer is left untouched.
*/
static inline
const char *
lex2str(const char * str, char buf[], size_t len)
{
  // Parameters only exist to give both overloads the same signature
  (void)buf;
  (void)len;
  return str;
}
255 
256 static inline
257 bool
isnull(const LEX_STRING & str)258 isnull(const LEX_STRING& str)
259 {
260   return str.str == 0 || str.length == 0;
261 }
262 
/* A c-string is considered null only when the pointer is null */
static inline
bool
isnull(const char * str)
{
  return !str;
}
269 
270 // copied from unused table_case_convert() in mysqld.h
271 static void
ndb_fk_casedn(char * name)272 ndb_fk_casedn(char *name)
273 {
274   assert(name != 0);
275   uint length = (uint)strlen(name);
276   assert(files_charset_info != 0 &&
277          files_charset_info->casedn_multiply == 1);
278   files_charset_info->cset->casedn(files_charset_info,
279                                    name, length, name, length);
280 }
281 
282 static int
ndb_fk_casecmp(const char * name1,const char * name2)283 ndb_fk_casecmp(const char* name1, const char* name2)
284 {
285   if (!lower_case_table_names)
286   {
287     return strcmp(name1, name2);
288   }
289   char tmp1[FN_LEN + 1];
290   char tmp2[FN_LEN + 1];
291   strcpy(tmp1, name1);
292   strcpy(tmp2, name2);
293   ndb_fk_casedn(tmp1);
294   ndb_fk_casedn(tmp2);
295   return strcmp(tmp1, tmp2);
296 }
297 
// Defined outside this part of the file. Fk_util::info() pushes its
// message as a warning to the user only when this returns true.
extern bool ndb_show_foreign_key_mock_tables(THD* thd);
299 
300 class Fk_util
301 {
302   THD* m_thd;
303 
304   void
info(const char * fmt,...) const305   info(const char* fmt, ...) const
306   {
307     va_list args;
308     char msg[MYSQL_ERRMSG_SIZE];
309     va_start(args,fmt);
310     my_vsnprintf(msg, sizeof(msg), fmt, args);
311     va_end(args);
312 
313     // Push as warning if user has turned on ndb_show_foreign_key_mock_tables
314     if (ndb_show_foreign_key_mock_tables(m_thd))
315     {
316       push_warning(m_thd, Sql_condition::SL_WARNING, ER_YES, msg);
317     }
318 
319     // Print info to log
320     sql_print_information("NDB FK: %s", msg);
321   }
322 
323 
324   void
warn(const char * fmt,...) const325   warn(const char* fmt, ...) const
326   {
327     va_list args;
328     char msg[MYSQL_ERRMSG_SIZE];
329     va_start(args,fmt);
330     my_vsnprintf(msg, sizeof(msg), fmt, args);
331     va_end(args);
332     push_warning(m_thd, Sql_condition::SL_WARNING, ER_CANNOT_ADD_FOREIGN, msg);
333 
334     // Print warning to log
335     sql_print_warning("NDB FK: %s", msg);
336   }
337 
338 
339   void
error(const NdbDictionary::Dictionary * dict,const char * fmt,...) const340   error(const NdbDictionary::Dictionary* dict, const char* fmt, ...) const
341   {
342     va_list args;
343     char msg[MYSQL_ERRMSG_SIZE];
344     va_start(args,fmt);
345     my_vsnprintf(msg, sizeof(msg), fmt, args);
346     va_end(args);
347     push_warning(m_thd, Sql_condition::SL_WARNING,
348                  ER_CANNOT_ADD_FOREIGN, msg);
349 
350     char ndb_msg[MYSQL_ERRMSG_SIZE] = {0};
351     if (dict)
352     {
353       // Extract message from Ndb
354       const NdbError& error = dict->getNdbError();
355       my_snprintf(ndb_msg, sizeof(ndb_msg),
356                   "%d '%s'", error.code, error.message);
357       push_warning_printf(m_thd, Sql_condition::SL_WARNING,
358                           ER_CANNOT_ADD_FOREIGN, "Ndb error: %s", ndb_msg);
359     }
360     // Print error to log
361     sql_print_error("NDB FK: %s, Ndb error: %s", msg, ndb_msg);
362   }
363 
364 
365   void
remove_index_global(NdbDictionary::Dictionary * dict,const NdbDictionary::Index * index) const366   remove_index_global(NdbDictionary::Dictionary* dict, const NdbDictionary::Index* index) const
367   {
368     if (!index)
369       return;
370 
371     dict->removeIndexGlobal(*index, 0);
372   }
373 
374 
375   bool
copy_fk_to_new_parent(NdbDictionary::Dictionary * dict,NdbDictionary::ForeignKey & fk,const char * new_parent_name,const char * column_names[]) const376   copy_fk_to_new_parent(NdbDictionary::Dictionary* dict, NdbDictionary::ForeignKey& fk,
377                    const char* new_parent_name, const char* column_names[]) const
378   {
379     DBUG_ENTER("copy_fk_to_new_parent");
380     DBUG_PRINT("info", ("new_parent_name: %s", new_parent_name));
381 
382     // Load up the new parent table
383     Ndb_table_guard new_parent_tab(dict, new_parent_name);
384     if (!new_parent_tab.get_table())
385     {
386       error(dict, "Failed to load potentially new parent '%s'", new_parent_name);
387       DBUG_RETURN(false);
388     }
389 
390     // Build new parent column list from parent column names
391     const NdbDictionary::Column* columns[NDB_MAX_ATTRIBUTES_IN_INDEX + 1];
392     {
393       unsigned num_columns = 0;
394       for (unsigned i = 0; column_names[i] != 0; i++)
395       {
396         DBUG_PRINT("info", ("column: %s", column_names[i]));
397         const NdbDictionary::Column* col =
398             new_parent_tab.get_table()->getColumn(column_names[i]);
399         if (!col)
400         {
401           // Parent table didn't have any column with the given name, can happen
402           warn("Could not resolve '%s' as fk parent for '%s' since it didn't have "
403                "all the referenced columns", new_parent_name, fk.getChildTable());
404           DBUG_RETURN(false);
405         }
406         columns[num_columns++]= col;
407       }
408       columns[num_columns]= 0;
409     }
410 
411     NdbDictionary::ForeignKey new_fk(fk);
412 
413     // Create name for the new fk by splitting the fk's name and replacing
414     // the <parent id> part in format "<parent_id>/<child_id>/<name>"
415     {
416       char name[FN_REFLEN+1];
417       unsigned parent_id, child_id;
418       if (sscanf(fk.getName(), "%u/%u/%s",
419              &parent_id, &child_id, name) != 3)
420       {
421         warn("Skip, failed to parse name of fk: %s", fk.getName());
422         DBUG_RETURN(false);
423       }
424 
425       char fk_name[FN_REFLEN+1];
426       my_snprintf(fk_name, sizeof(fk_name), "%s",
427                   name);
428       DBUG_PRINT("info", ("Setting new fk name: %s", fk_name));
429       new_fk.setName(fk_name);
430     }
431 
432     // Find matching index
433     bool parent_primary_key= FALSE;
434     const NdbDictionary::Index* parent_index= find_matching_index(dict,
435                                                                   new_parent_tab.get_table(),
436                                                                   columns,
437                                                                   parent_primary_key);
438     DBUG_PRINT("info", ("parent_primary_key: %d", parent_primary_key));
439 
440     // Check if either pk or index matched
441     if (!parent_primary_key && parent_index == 0)
442     {
443       warn("Could not resolve '%s' as fk parent for '%s' since no matching index "
444            "could be found", new_parent_name, fk.getChildTable());
445       DBUG_RETURN(false);
446     }
447 
448     if (parent_index != 0)
449     {
450       DBUG_PRINT("info", ("Setting parent with index %s", parent_index->getName()));
451       new_fk.setParent(*new_parent_tab.get_table(), parent_index, columns);
452     }
453     else
454     {
455       DBUG_PRINT("info", ("Setting parent without index"));
456       new_fk.setParent(*new_parent_tab.get_table(), 0, columns);
457     }
458 
459     // Old fk is dropped by cascading when the mock table is dropped
460 
461     // Create new fk referencing the new table
462     DBUG_PRINT("info", ("Create new fk: %s", new_fk.getName()));
463     int flags = 0;
464     if (thd_test_options(m_thd, OPTION_NO_FOREIGN_KEY_CHECKS))
465     {
466       flags |= NdbDictionary::Dictionary::CreateFK_NoVerify;
467     }
468     NdbDictionary::ObjectId objid;
469     if (dict->createForeignKey(new_fk, &objid, flags) != 0)
470     {
471       error(dict, "Failed to create foreign key '%s'", new_fk.getName());
472       remove_index_global(dict, parent_index);
473       DBUG_RETURN(false);
474     }
475 
476     remove_index_global(dict, parent_index);
477     DBUG_RETURN(true);
478   }
479 
480 
481   void
resolve_mock(NdbDictionary::Dictionary * dict,const char * new_parent_name,const char * mock_name) const482   resolve_mock(NdbDictionary::Dictionary* dict,
483                const char* new_parent_name, const char* mock_name) const
484   {
485     DBUG_ENTER("resolve_mock");
486     DBUG_PRINT("enter", ("mock_name '%s'", mock_name));
487     assert(is_mock_name(mock_name));
488 
489     // Load up the mock table
490     Ndb_table_guard mock_tab(dict, mock_name);
491     if (!mock_tab.get_table())
492     {
493       error(dict, "Failed to load the listed mock table '%s'", mock_name);
494       assert(false);
495       DBUG_VOID_RETURN;
496     }
497 
498     // List dependent objects of mock table
499     NdbDictionary::Dictionary::List list;
500     if (dict->listDependentObjects(list, *mock_tab.get_table()) != 0)
501     {
502       error(dict, "Failed to list dependent objects for mock table '%s'", mock_name);
503       DBUG_VOID_RETURN;
504     }
505 
506     for (unsigned i = 0; i < list.count; i++)
507     {
508       const NdbDictionary::Dictionary::List::Element& element = list.elements[i];
509       if (element.type != NdbDictionary::Object::ForeignKey)
510         continue;
511 
512       DBUG_PRINT("info", ("fk: %s", element.name));
513 
514       NdbDictionary::ForeignKey fk;
515       if (dict->getForeignKey(fk, element.name) != 0)
516       {
517         error(dict, "Could not find the listed fk '%s'", element.name);
518         continue;
519       }
520 
521       // Build column name list for parent
522       const char* col_names[NDB_MAX_ATTRIBUTES_IN_INDEX + 1];
523       {
524         unsigned num_columns = 0;
525         for (unsigned j = 0; j < fk.getParentColumnCount(); j++)
526         {
527           const NdbDictionary::Column* col =
528               mock_tab.get_table()->getColumn(fk.getParentColumnNo(j));
529           if (!col)
530           {
531             error(NULL, "Could not find column '%s' in mock table '%s'",
532                   fk.getParentColumnNo(j), mock_name);
533             continue;
534           }
535           col_names[num_columns++]= col->getName();
536         }
537         col_names[num_columns]= 0;
538 
539         if (num_columns != fk.getParentColumnCount())
540         {
541           error(NULL, "Could not find all columns referenced by fk in mock table '%s'",
542                 mock_name);
543           continue;
544         }
545       }
546 
547       if (!copy_fk_to_new_parent(dict, fk, new_parent_name, col_names))
548         continue;
549 
550       // New fk has been created between child and new parent, drop the mock
551       // table and it's related fk
552       const int drop_flags= NDBDICT::DropTableCascadeConstraints;
553       if (dict->dropTableGlobal(*mock_tab.get_table(), drop_flags) != 0)
554       {
555         error(dict, "Failed to drop mock table '%s'", mock_name);
556         continue;
557       }
558       info("Dropped mock table '%s' - resolved by '%s'", mock_name, new_parent_name);
559     }
560     DBUG_VOID_RETURN;
561   }
562 
563 
564   bool
create_mock_tables_and_drop(Ndb * ndb,NdbDictionary::Dictionary * dict,const NdbDictionary::Table * table)565   create_mock_tables_and_drop(Ndb* ndb, NdbDictionary::Dictionary* dict,
566                               const NdbDictionary::Table* table)
567   {
568     DBUG_ENTER("create_mock_tables_and_drop");
569     DBUG_PRINT("enter", ("table: %s", table->getName()));
570 
571     /*
572       List all foreign keys referencing the table to be dropped
573       and recreate those to point at a new mock
574     */
575     NdbDictionary::Dictionary::List list;
576     if (dict->listDependentObjects(list, *table) != 0)
577     {
578       error(dict, "Failed to list dependent objects for table '%s'", table->getName());
579       DBUG_RETURN(false);
580     }
581 
582     uint fk_index = 0;
583     for (unsigned i = 0; i < list.count; i++)
584     {
585       const NdbDictionary::Dictionary::List::Element& element = list.elements[i];
586 
587       if (element.type != NdbDictionary::Object::ForeignKey)
588         continue;
589 
590       DBUG_PRINT("fk", ("name: %s, type: %d", element.name, element.type));
591 
592       NdbDictionary::ForeignKey fk;
593       if (dict->getForeignKey(fk, element.name) != 0)
594       {
595         // Could not find the listed fk
596         assert(false);
597         continue;
598       }
599 
600       // Parent of the found fk should be the table to be dropped
601       DBUG_PRINT("info", ("fk.parent: %s", fk.getParentTable()));
602       char parent_db_and_name[FN_LEN + 1];
603       const char * parent_name = fk_split_name(parent_db_and_name, fk.getParentTable());
604 
605       if (strcmp(parent_db_and_name, ndb->getDatabaseName()) != 0 ||
606           strcmp(parent_name, table->getName()) != 0)
607       {
608         DBUG_PRINT("info", ("fk is not parent, skip"));
609         continue;
610       }
611 
612       DBUG_PRINT("info", ("fk.child: %s", fk.getChildTable()));
613       char child_db_and_name[FN_LEN + 1];
614       const char * child_name = fk_split_name(child_db_and_name, fk.getChildTable());
615 
616       // Open child table
617       Ndb_db_guard db_guard(ndb);
618       setDbName(ndb, child_db_and_name);
619       Ndb_table_guard child_tab(dict, child_name);
620       if (child_tab.get_table() == 0)
621       {
622         error(dict, "Failed to open child table '%s'", child_name);
623         DBUG_RETURN(false);
624       }
625 
626       /* Format mock table name */
627       char mock_name[FN_REFLEN];
628       if (!format_name(mock_name, sizeof(mock_name),
629                        child_tab.get_table()->getObjectId(),
630                        fk_index, parent_name))
631       {
632         error(NULL, "Failed to create mock parent table, too long mock name");
633         DBUG_RETURN(false);
634       }
635 
636       // Build both column name and column type list from parent(which will be dropped)
637       const char* col_names[NDB_MAX_ATTRIBUTES_IN_INDEX + 1];
638       const NdbDictionary::Column* col_types[NDB_MAX_ATTRIBUTES_IN_INDEX + 1];
639       {
640         unsigned num_columns = 0;
641         for (unsigned j = 0; j < fk.getParentColumnCount(); j++)
642         {
643           const NdbDictionary::Column* col =
644               table->getColumn(fk.getParentColumnNo(j));
645           DBUG_PRINT("col", ("[%u] %s", i, col->getName()));
646           if (!col)
647           {
648             error(NULL, "Could not find column '%s' in parent table '%s'",
649                   fk.getParentColumnNo(j), table->getName());
650             continue;
651           }
652           col_names[num_columns] = col->getName();
653           col_types[num_columns] = col;
654           num_columns++;
655         }
656         col_names[num_columns]= 0;
657         col_types[num_columns] = 0;
658 
659         if (num_columns != fk.getParentColumnCount())
660         {
661           error(NULL, "Could not find all columns referenced by fk in parent table '%s'",
662                 table->getName());
663           continue;
664         }
665       }
666       db_guard.restore(); // restore db
667 
668       // Create new mock
669       if (!create(dict, mock_name, child_name,
670                   col_names, col_types))
671       {
672         error(dict, "Failed to create mock parent table '%s", mock_name);
673         assert(false);
674         DBUG_RETURN(false);
675       }
676 
677       // Recreate fks to point at new mock
678       if (!copy_fk_to_new_parent(dict, fk, mock_name, col_names))
679       {
680         DBUG_RETURN(false);
681       }
682 
683       fk_index++;
684     }
685 
686     // Drop the requested table and all foreign keys refering to it
687     // i.e the old fks
688     const int drop_flags= NDBDICT::DropTableCascadeConstraints;
689     if (dict->dropTableGlobal(*table, drop_flags) != 0)
690     {
691       error(dict, "Failed to drop the requested table");
692       DBUG_RETURN(false);
693     }
694 
695     DBUG_RETURN(true);
696   }
697 
698 public:
Fk_util(THD * thd)699   Fk_util(THD* thd) : m_thd(thd) {}
700 
701   static
split_mock_name(const char * name,unsigned * child_id_ptr=NULL,unsigned * child_index_ptr=NULL,const char ** parent_name=NULL)702   bool split_mock_name(const char* name,
703                        unsigned* child_id_ptr = NULL,
704                        unsigned* child_index_ptr = NULL,
705                        const char** parent_name = NULL)
706   {
707     const struct {
708       const char* str;
709       size_t len;
710     } prefix = { STRING_WITH_LEN("NDB$FKM_") };
711 
712     if (strncmp(name, prefix.str, prefix.len) != 0)
713       return false;
714 
715     char* end;
716     const char* ptr= name + prefix.len + 1;
717 
718     // Parse child id
719     long child_id = strtol(ptr, &end, 10);
720     if (ptr == end || child_id < 0 || *end == 0 || *end != '_')
721       return false;
722     ptr = end+1;
723 
724     // Parse child index
725     long child_index = strtol(ptr, &end, 10);
726     if (ptr == end || child_id < 0 || *end == 0 || *end != '_')
727       return false;
728     ptr = end+1;
729 
730     // Assign and return OK
731     if (child_id_ptr)
732       *child_id_ptr = child_id;
733     if (child_index_ptr)
734       *child_index_ptr = child_index;
735     if (parent_name)
736       *parent_name = ptr;
737     return true;
738   }
739 
740   static
is_mock_name(const char * name)741   bool is_mock_name(const char* name)
742   {
743     return split_mock_name(name);
744   }
745 
746   static
format_name(char buf[],size_t buf_size,int child_id,uint fk_index,const char * parent_name)747   const char* format_name(char buf[], size_t buf_size, int child_id,
748                           uint fk_index, const char* parent_name)
749   {
750     DBUG_ENTER("format_name");
751     DBUG_PRINT("enter", ("child_id: %d, fk_index: %u, parent_name: %s",
752                          child_id, fk_index, parent_name));
753     const size_t len = my_snprintf(buf, buf_size, "NDB$FKM_%d_%u_%s",
754                                    child_id, fk_index, parent_name);
755     DBUG_PRINT("info", ("len: %lu, buf_size: %lu", len, buf_size));
756     if (len >= buf_size - 1)
757     {
758       DBUG_PRINT("info", ("Size of buffer too small"));
759       DBUG_RETURN(NULL);
760     }
761     DBUG_PRINT("exit", ("buf: '%s', len: %lu", buf, len));
762     DBUG_RETURN(buf);
763   }
764 
765 
766   // Adaptor function for calling create() with List<key_part_spec>
create(NDBDICT * dict,const char * mock_name,const char * child_name,List<Key_part_spec> key_part_list,const NDBCOL * col_types[])767   bool create(NDBDICT *dict, const char* mock_name, const char* child_name,
768               List<Key_part_spec> key_part_list, const NDBCOL * col_types[])
769   {
770     // Convert List<Key_part_spec> into null terminated const char* array
771     const char* col_names[NDB_MAX_ATTRIBUTES_IN_INDEX + 1];
772     {
773       unsigned i = 0;
774       Key_part_spec* key = 0;
775       List_iterator<Key_part_spec> it1(key_part_list);
776       while ((key= it1++))
777       {
778         char col_name_buf[FN_REFLEN];
779         const char* col_name = lex2str(key->field_name, col_name_buf, sizeof(col_name_buf));
780         col_names[i++] = strdup(col_name);
781       }
782       col_names[i] = 0;
783     }
784 
785     const bool ret = create(dict, mock_name, child_name, col_names, col_types);
786 
787     // Free the strings in col_names array
788     for (unsigned i = 0; col_names[i] != 0; i++)
789     {
790       const char* col_name = col_names[i];
791       free(const_cast<char*>(col_name));
792     }
793 
794     return ret;
795   }
796 
797 
create(NDBDICT * dict,const char * mock_name,const char * child_name,const char * col_names[],const NDBCOL * col_types[])798   bool create(NDBDICT *dict, const char* mock_name, const char* child_name,
799               const char* col_names[], const NDBCOL * col_types[])
800   {
801     NDBTAB mock_tab;
802 
803     DBUG_ENTER("mock_table::create");
804     DBUG_PRINT("enter", ("mock_name: %s", mock_name));
805     assert(is_mock_name(mock_name));
806 
807     if (mock_tab.setName(mock_name))
808     {
809       DBUG_RETURN(false);
810     }
811     mock_tab.setLogging(FALSE);
812 
813     unsigned i = 0;
814     while (col_names[i])
815     {
816       NDBCOL mock_col;
817 
818       const char* col_name = col_names[i];
819       DBUG_PRINT("info", ("name: %s", col_name));
820       if (mock_col.setName(col_name))
821       {
822         assert(false);
823         DBUG_RETURN(false);
824       }
825 
826       const NDBCOL * col= col_types[i];
827       if (!col)
828       {
829         // Internal error, the two lists should be same size
830         assert(col);
831         DBUG_RETURN(false);
832       }
833 
834       // Use column spec as requested(normally built from child table)
835       mock_col.setType(col->getType());
836       mock_col.setPrecision(col->getPrecision());
837       mock_col.setScale(col->getScale());
838       mock_col.setLength(col->getLength());
839       mock_col.setCharset(col->getCharset());
840 
841       // Make column part of primary key and thus not nullable
842       mock_col.setPrimaryKey(true);
843       mock_col.setNullable(false);
844 
845       if (mock_tab.addColumn(mock_col))
846       {
847         DBUG_RETURN(false);
848       }
849       i++;
850     }
851 
852     // Create the table in NDB
853     if (dict->createTable(mock_tab) != 0)
854     {
855       // Error is available to caller in dict*
856       DBUG_RETURN(false);
857     }
858     info("Created mock table '%s' referenced by '%s'", mock_name, child_name);
859     DBUG_RETURN(true);
860   }
861 
862   bool
build_mock_list(NdbDictionary::Dictionary * dict,const NdbDictionary::Table * table,List<char> & mock_list)863   build_mock_list(NdbDictionary::Dictionary* dict,
864                   const NdbDictionary::Table* table, List<char> &mock_list)
865   {
866     DBUG_ENTER("build_mock_list");
867 
868     NdbDictionary::Dictionary::List list;
869     if (dict->listDependentObjects(list, *table) != 0)
870     {
871       error(dict, "Failed to list dependent objects for table '%s'", table->getName());
872       DBUG_RETURN(false);
873     }
874 
875     for (unsigned i = 0; i < list.count; i++)
876     {
877       const NdbDictionary::Dictionary::List::Element& element = list.elements[i];
878       if (element.type != NdbDictionary::Object::ForeignKey)
879         continue;
880 
881       NdbDictionary::ForeignKey fk;
882       if (dict->getForeignKey(fk, element.name) != 0)
883       {
884         // Could not find the listed fk
885         assert(false);
886         continue;
887       }
888 
889       char parent_db_and_name[FN_LEN + 1];
890       const char * name = fk_split_name(parent_db_and_name,fk.getParentTable());
891 
892       if (!Fk_util::is_mock_name(name))
893         continue;
894 
895       mock_list.push_back(thd_strdup(m_thd, fk.getParentTable()));
896     }
897     DBUG_RETURN(true);
898   }
899 
900 
901   void
drop_mock_list(Ndb * ndb,NdbDictionary::Dictionary * dict,List<char> & drop_list)902   drop_mock_list(Ndb* ndb, NdbDictionary::Dictionary* dict, List<char> &drop_list)
903   {
904     const char* full_name;
905     List_iterator_fast<char> it(drop_list);
906     while ((full_name=it++))
907     {
908       DBUG_PRINT("info", ("drop table: '%s'", full_name));
909       char db_name[FN_LEN + 1];
910       const char * table_name = fk_split_name(db_name, full_name);
911       Ndb_db_guard db_guard(ndb);
912       setDbName(ndb, db_name);
913       Ndb_table_guard mocktab_g(dict, table_name);
914       if (!mocktab_g.get_table())
915       {
916        // Could not open the mock table
917        DBUG_PRINT("error", ("Could not open the listed mock table, ignore it"));
918        assert(false);
919        continue;
920       }
921 
922       if (dict->dropTableGlobal(*mocktab_g.get_table()) != 0)
923       {
924         DBUG_PRINT("error", ("Failed to drop the mock table '%s'",
925                               mocktab_g.get_table()->getName()));
926         assert(false);
927         continue;
928       }
929       info("Dropped mock table '%s' - referencing table dropped", table_name);
930     }
931   }
932 
933 
934   bool
drop(Ndb * ndb,NdbDictionary::Dictionary * dict,const NdbDictionary::Table * table)935   drop(Ndb* ndb, NdbDictionary::Dictionary* dict,
936        const NdbDictionary::Table* table)
937   {
938     DBUG_ENTER("drop");
939 
940     // Start schema transaction to make this operation atomic
941     if (dict->beginSchemaTrans() != 0)
942     {
943       error(dict, "Failed to start schema transaction");
944       DBUG_RETURN(false);
945     }
946 
947     bool result = true;
948     if (!create_mock_tables_and_drop(ndb, dict, table))
949     {
950       // Operation failed, set flag to abort when ending trans
951       result = false;
952     }
953 
954     // End schema transaction
955     const Uint32 end_trans_flag = result ?  0 : NdbDictionary::Dictionary::SchemaTransAbort;
956     if (dict->endSchemaTrans(end_trans_flag) != 0)
957     {
958       error(dict, "Failed to end schema transaction");
959       result = false;
960     }
961 
962     DBUG_RETURN(result);
963   }
964 
count_fks(NdbDictionary::Dictionary * dict,const NdbDictionary::Table * table,uint & count) const965   bool count_fks(NdbDictionary::Dictionary* dict,
966                  const NdbDictionary::Table* table, uint& count) const
967   {
968     DBUG_ENTER("count_fks");
969 
970     NdbDictionary::Dictionary::List list;
971     if (dict->listDependentObjects(list, *table) != 0)
972     {
973       error(dict, "Failed to list dependent objects for table '%s'", table->getName());
974       DBUG_RETURN(false);
975     }
976     for (unsigned i = 0; i < list.count; i++)
977     {
978       if (list.elements[i].type == NdbDictionary::Object::ForeignKey)
979         count++;
980     }
981     DBUG_PRINT("exit", ("count: %u", count));
982     DBUG_RETURN(true);
983   }
984 
985 
drop_fk(Ndb * ndb,NdbDictionary::Dictionary * dict,const char * fk_name)986   bool drop_fk(Ndb* ndb, NdbDictionary::Dictionary* dict, const char* fk_name)
987   {
988     DBUG_ENTER("drop_fk");
989 
990     NdbDictionary::ForeignKey fk;
991     if (dict->getForeignKey(fk, fk_name) != 0)
992     {
993       error(dict, "Could not find fk '%s'", fk_name);
994       assert(false);
995       DBUG_RETURN(false);
996     }
997 
998     char parent_db_and_name[FN_LEN + 1];
999     const char * parent_name = fk_split_name(parent_db_and_name,fk.getParentTable());
1000     if (Fk_util::is_mock_name(parent_name))
1001     {
1002       // Fk is referencing a mock table, drop the table
1003       // and the constraint at the same time
1004       Ndb_db_guard db_guard(ndb);
1005       setDbName(ndb, parent_db_and_name);
1006       Ndb_table_guard mocktab_g(dict, parent_name);
1007       if (mocktab_g.get_table())
1008       {
1009         const int drop_flags= NDBDICT::DropTableCascadeConstraints;
1010         if (dict->dropTableGlobal(*mocktab_g.get_table(), drop_flags) != 0)
1011         {
1012           error(dict, "Failed to drop fk mock table '%s'", parent_name);
1013           assert(false);
1014           DBUG_RETURN(false);
1015         }
1016         // table and fk dropped
1017         DBUG_RETURN(true);
1018       }
1019       else
1020       {
1021         warn("Could not open the fk mock table '%s', ignoring it...",
1022              parent_name);
1023         assert(false);
1024         // fallthrough and try to drop only the fk,
1025       }
1026     }
1027 
1028     if (dict->dropForeignKey(fk) != 0)
1029     {
1030       error(dict, "Failed to drop fk '%s'", fk_name);
1031       assert(false);
1032       DBUG_RETURN(false);
1033     }
1034     DBUG_RETURN(true);
1035   }
1036 
1037 
1038   void
resolve_mock_tables(NdbDictionary::Dictionary * dict,const char * new_parent_db,const char * new_parent_name) const1039   resolve_mock_tables(NdbDictionary::Dictionary* dict,
1040                       const char* new_parent_db,
1041                       const char* new_parent_name) const
1042   {
1043     DBUG_ENTER("resolve_mock_tables");
1044     DBUG_PRINT("enter", ("new_parent_db: %s, new_parent_name: %s",
1045                          new_parent_db, new_parent_name));
1046 
1047     /*
1048       List all tables in NDB and look for mock tables which could
1049       potentially be resolved to the new table
1050     */
1051     NdbDictionary::Dictionary::List table_list;
1052     if (dict->listObjects(table_list, NdbDictionary::Object::UserTable, true) != 0)
1053     {
1054       assert(false);
1055       DBUG_VOID_RETURN;
1056     }
1057 
1058     for (unsigned i = 0; i < table_list.count; i++)
1059     {
1060       const NdbDictionary::Dictionary::List::Element& el = table_list.elements[i];
1061 
1062       assert(el.type == NdbDictionary::Object::UserTable);
1063 
1064       // Check if table is in same database as the potential new parent
1065       if (strcmp(new_parent_db, el.database) != 0)
1066       {
1067         DBUG_PRINT("info", ("Skip, '%s.%s' is in different database",
1068                             el.database, el.name));
1069         continue;
1070       }
1071 
1072       const char* parent_name;
1073       if (!Fk_util::split_mock_name(el.name, NULL, NULL, &parent_name))
1074         continue;
1075 
1076       // Check if this mock table should reference the new table
1077       if (strcmp(parent_name, new_parent_name) != 0)
1078       {
1079         DBUG_PRINT("info", ("Skip, parent of this mock table is not the new table"));
1080         continue;
1081       }
1082 
1083       resolve_mock(dict, new_parent_name, el.name);
1084     }
1085 
1086     DBUG_VOID_RETURN;
1087   }
1088 
1089 
truncate_allowed(NdbDictionary::Dictionary * dict,const char * db,const NdbDictionary::Table * table,bool & allow) const1090   bool truncate_allowed(NdbDictionary::Dictionary* dict, const char* db,
1091                         const NdbDictionary::Table* table, bool& allow) const
1092   {
1093     DBUG_ENTER("truncate_allowed");
1094 
1095     NdbDictionary::Dictionary::List list;
1096     if (dict->listDependentObjects(list, *table) != 0)
1097     {
1098       error(dict, "Failed to list dependent objects for table '%s'", table->getName());
1099       DBUG_RETURN(false);
1100     }
1101     allow = true;
1102     for (unsigned i = 0; i < list.count; i++)
1103     {
1104       const NdbDictionary::Dictionary::List::Element& element = list.elements[i];
1105       if (element.type != NdbDictionary::Object::ForeignKey)
1106         continue;
1107 
1108       DBUG_PRINT("info", ("fk: %s", element.name));
1109 
1110       NdbDictionary::ForeignKey fk;
1111       if (dict->getForeignKey(fk, element.name) != 0)
1112       {
1113         error(dict, "Could not find the listed fk '%s'", element.name);
1114         assert(false);
1115         continue;
1116       }
1117 
1118       // Refuse if table is parent of fk
1119       char parent_db_and_name[FN_LEN + 1];
1120       const char * parent_name = fk_split_name(parent_db_and_name,
1121                                                fk.getParentTable());
1122       if (strcmp(db, parent_db_and_name) != 0 ||
1123           strcmp(parent_name, table->getName()) != 0)
1124       {
1125         // Not parent of the fk, skip
1126         continue;
1127       }
1128 
1129       allow = false;
1130       break;
1131     }
1132     DBUG_PRINT("exit", ("allow: %u", allow));
1133     DBUG_RETURN(true);
1134   }
1135 };
1136 
ndb_fk_util_build_list(THD * thd,NdbDictionary::Dictionary * dict,const NdbDictionary::Table * table,List<char> & mock_list)1137 bool ndb_fk_util_build_list(THD* thd, NdbDictionary::Dictionary* dict,
1138                             const NdbDictionary::Table* table, List<char> &mock_list)
1139 {
1140   Fk_util fk_util(thd);
1141   return fk_util.build_mock_list(dict, table, mock_list);
1142 }
1143 
1144 
ndb_fk_util_drop_list(THD * thd,Ndb * ndb,NdbDictionary::Dictionary * dict,List<char> & drop_list)1145 void ndb_fk_util_drop_list(THD* thd, Ndb* ndb, NdbDictionary::Dictionary* dict, List<char> &drop_list)
1146 {
1147   Fk_util fk_util(thd);
1148   fk_util.drop_mock_list(ndb, dict, drop_list);
1149 }
1150 
1151 
ndb_fk_util_drop_table(THD * thd,Ndb * ndb,NdbDictionary::Dictionary * dict,const NdbDictionary::Table * table)1152 bool ndb_fk_util_drop_table(THD* thd, Ndb* ndb, NdbDictionary::Dictionary* dict,
1153                             const NdbDictionary::Table* table)
1154 {
1155   Fk_util fk_util(thd);
1156   return fk_util.drop(ndb, dict, table);
1157 }
1158 
1159 
ndb_fk_util_is_mock_name(const char * table_name)1160 bool ndb_fk_util_is_mock_name(const char* table_name)
1161 {
1162   return Fk_util::is_mock_name(table_name);
1163 }
1164 
1165 
1166 void
ndb_fk_util_resolve_mock_tables(THD * thd,NdbDictionary::Dictionary * dict,const char * new_parent_db,const char * new_parent_name)1167 ndb_fk_util_resolve_mock_tables(THD* thd, NdbDictionary::Dictionary* dict,
1168                                 const char* new_parent_db,
1169                                 const char* new_parent_name)
1170 {
1171   Fk_util fk_util(thd);
1172   fk_util.resolve_mock_tables(dict, new_parent_db, new_parent_name);
1173 }
1174 
1175 
ndb_fk_util_truncate_allowed(THD * thd,NdbDictionary::Dictionary * dict,const char * db,const NdbDictionary::Table * table,bool & allowed)1176 bool ndb_fk_util_truncate_allowed(THD* thd, NdbDictionary::Dictionary* dict,
1177                                   const char* db,
1178                                   const NdbDictionary::Table* table,
1179                                   bool& allowed)
1180 {
1181   Fk_util fk_util(thd);
1182   if (!fk_util.truncate_allowed(dict, db, table, allowed))
1183     return false;
1184   return true;
1185 }
1186 
1187 
1188 int
create_fks(THD * thd,Ndb * ndb)1189 ha_ndbcluster::create_fks(THD *thd, Ndb *ndb)
1190 {
1191   DBUG_ENTER("ha_ndbcluster::create_fks");
1192 
1193   // return real mysql error to avoid total randomness..
1194   const int err_default= HA_ERR_CANNOT_ADD_FOREIGN;
1195   char tmpbuf[FN_REFLEN];
1196 
1197   assert(thd->lex != 0);
1198   Key * key= 0;
1199   List_iterator<Key> key_iterator(thd->lex->alter_info.key_list);
1200   while ((key=key_iterator++))
1201   {
1202     if (key->type != KEYTYPE_FOREIGN)
1203       continue;
1204 
1205     NDBDICT *dict= ndb->getDictionary();
1206     Foreign_key * fk= reinterpret_cast<Foreign_key*>(key);
1207 
1208     /**
1209      * NOTE: we need to fetch also child table...
1210      *   cause the one we just created (in m_table) is not properly
1211      *   initialize
1212      */
1213     Ndb_table_guard child_tab(dict, m_tabname);
1214     if (child_tab.get_table() == 0)
1215     {
1216       ERR_RETURN(dict->getNdbError());
1217     }
1218 
1219     /**
1220      * NOTE 2: we mark the table as invalid
1221      *         so that it gets removed from GlobalDictCache if
1222      *         the schema transaction later fails...
1223      *
1224      * TODO: This code currently fetches table definition from data-nodes
1225      *       once per FK...which could be improved to once if a FK
1226      */
1227     child_tab.invalidate();
1228 
1229     /**
1230      * Get table columns columns...
1231      */
1232     const NDBCOL * childcols[NDB_MAX_ATTRIBUTES_IN_INDEX + 1];
1233     {
1234       unsigned pos= 0;
1235       const NDBTAB * tab= child_tab.get_table();
1236       Key_part_spec* col= 0;
1237       List_iterator<Key_part_spec> it1(fk->columns);
1238       while ((col= it1++))
1239       {
1240         const NDBCOL * ndbcol= tab->getColumn(lex2str(col->field_name,
1241                                                       tmpbuf, sizeof(tmpbuf)));
1242         if (ndbcol == 0)
1243         {
1244           push_warning_printf(thd, Sql_condition::SL_WARNING,
1245                               ER_CANNOT_ADD_FOREIGN,
1246                               "Child table %s has no column %s in NDB",
1247                               child_tab.get_table()->getName(), tmpbuf);
1248           DBUG_RETURN(err_default);
1249         }
1250         childcols[pos++]= ndbcol;
1251       }
1252       childcols[pos]= 0; // NULL terminate
1253     }
1254 
1255     bool child_primary_key= FALSE;
1256     const NDBINDEX* child_index= find_matching_index(dict,
1257                                                      child_tab.get_table(),
1258                                                      childcols,
1259                                                      child_primary_key);
1260 
1261     if (!child_primary_key && child_index == 0)
1262     {
1263       push_warning_printf(thd, Sql_condition::SL_WARNING,
1264                           ER_CANNOT_ADD_FOREIGN,
1265                           "Child table %s foreign key columns match no index in NDB",
1266                           child_tab.get_table()->getName());
1267       DBUG_RETURN(err_default);
1268     }
1269 
1270     Ndb_db_guard db_guard(ndb); // save db
1271 
1272     char parent_db[FN_REFLEN];
1273     char parent_name[FN_REFLEN];
1274     /*
1275      * Looking at Table_ident, testing for db.str first is safer
1276      * for valgrind.  Do same with table.str too.
1277      */
1278     if (fk->ref_db.str != 0 && fk->ref_db.length != 0)
1279     {
1280       my_snprintf(parent_db, sizeof(parent_db), "%*s",
1281                   (int)fk->ref_db.length,
1282                   fk->ref_db.str);
1283     }
1284     else
1285     {
1286       parent_db[0]= 0;
1287     }
1288     if (fk->ref_table.str != 0 && fk->ref_table.length != 0)
1289     {
1290       my_snprintf(parent_name, sizeof(parent_name), "%*s",
1291                   (int)fk->ref_table.length,
1292                   fk->ref_table.str);
1293     }
1294     else
1295     {
1296       parent_name[0]= 0;
1297     }
1298     if (lower_case_table_names)
1299     {
1300       ndb_fk_casedn(parent_db);
1301       ndb_fk_casedn(parent_name);
1302     }
1303     setDbName(ndb, parent_db);
1304     Ndb_table_guard parent_tab(dict, parent_name);
1305     if (parent_tab.get_table() == 0)
1306     {
1307        if (!thd_test_options(thd, OPTION_NO_FOREIGN_KEY_CHECKS))
1308        {
1309          const NdbError &error= dict->getNdbError();
1310          push_warning_printf(thd, Sql_condition::SL_WARNING,
1311                              ER_CANNOT_ADD_FOREIGN,
1312                              "Parent table %s not found in NDB: %d: %s",
1313                              parent_name,
1314                              error.code, error.message);
1315          DBUG_RETURN(err_default);
1316        }
1317 
1318        DBUG_PRINT("info", ("No parent and foreign_key_checks=0"));
1319 
1320        Fk_util fk_util(thd);
1321 
1322        /* Count the number of existing fks on table */
1323        uint existing = 0;
1324        if(!fk_util.count_fks(dict, child_tab.get_table(), existing))
1325        {
1326          DBUG_RETURN(err_default);
1327        }
1328 
1329        /* Format mock table name */
1330        char mock_name[FN_REFLEN];
1331        if (!fk_util.format_name(mock_name, sizeof(mock_name),
1332                                 child_tab.get_table()->getObjectId(),
1333                                 existing, parent_name))
1334        {
1335          push_warning_printf(thd, Sql_condition::SL_WARNING,
1336                              ER_CANNOT_ADD_FOREIGN,
1337                              "Failed to create mock parent table, too long mock name");
1338          DBUG_RETURN(err_default);
1339        }
1340        if (!fk_util.create(dict, mock_name, m_tabname,
1341                            fk->ref_columns, childcols))
1342        {
1343          const NdbError &error= dict->getNdbError();
1344          push_warning_printf(thd, Sql_condition::SL_WARNING,
1345                              ER_CANNOT_ADD_FOREIGN,
1346                              "Failed to create mock parent table in NDB: %d: %s",
1347                              error.code, error.message);
1348          DBUG_RETURN(err_default);
1349        }
1350 
1351        parent_tab.init(mock_name);
1352        parent_tab.invalidate(); // invalidate mock table when releasing
1353        if (parent_tab.get_table() == 0)
1354        {
1355          push_warning_printf(thd, Sql_condition::SL_WARNING,
1356                              ER_CANNOT_ADD_FOREIGN,
1357                              "INTERNAL ERROR: Could not find created mock table '%s'",
1358                              mock_name);
1359          // Internal error, should be able to load the just created mock table
1360          assert(parent_tab.get_table());
1361          DBUG_RETURN(err_default);
1362        }
1363     }
1364 
1365     const NDBCOL * parentcols[NDB_MAX_ATTRIBUTES_IN_INDEX + 1];
1366     {
1367       unsigned pos= 0;
1368       const NDBTAB * tab= parent_tab.get_table();
1369       Key_part_spec* col= 0;
1370       List_iterator<Key_part_spec> it1(fk->ref_columns);
1371       while ((col= it1++))
1372       {
1373         const NDBCOL * ndbcol= tab->getColumn(lex2str(col->field_name,
1374                                                       tmpbuf, sizeof(tmpbuf)));
1375         if (ndbcol == 0)
1376         {
1377           push_warning_printf(thd, Sql_condition::SL_WARNING,
1378                               ER_CANNOT_ADD_FOREIGN,
1379                               "Parent table %s has no column %s in NDB",
1380                               parent_tab.get_table()->getName(), tmpbuf);
1381           DBUG_RETURN(err_default);
1382         }
1383         parentcols[pos++]= ndbcol;
1384       }
1385       parentcols[pos]= 0; // NULL terminate
1386     }
1387 
1388     bool parent_primary_key= FALSE;
1389     const NDBINDEX* parent_index= find_matching_index(dict,
1390                                                       parent_tab.get_table(),
1391                                                       parentcols,
1392                                                       parent_primary_key);
1393 
1394     db_guard.restore(); // restore db
1395 
1396     if (!parent_primary_key && parent_index == 0)
1397     {
1398       my_error(ER_FK_NO_INDEX_PARENT, MYF(0),
1399                fk->name.str ? fk->name.str : "",
1400                parent_tab.get_table()->getName());
1401       DBUG_RETURN(err_default);
1402     }
1403 
1404     {
1405       /**
1406        * Check that columns match...this happens to be same
1407        *   condition as the one for SPJ...
1408        */
1409       for (unsigned i = 0; parentcols[i] != 0; i++)
1410       {
1411         if (parentcols[i]->isBindable(* childcols[i]) == -1)
1412         {
1413           push_warning_printf(thd, Sql_condition::SL_WARNING,
1414                               ER_CANNOT_ADD_FOREIGN,
1415                               "Parent column %s.%s is incompatible with child column %s.%s in NDB",
1416                               parent_tab.get_table()->getName(),
1417                               parentcols[i]->getName(),
1418                               child_tab.get_table()->getName(),
1419                               childcols[i]->getName());
1420           DBUG_RETURN(err_default);
1421         }
1422       }
1423     }
1424 
1425     NdbDictionary::ForeignKey ndbfk;
1426     char fk_name[FN_REFLEN];
1427     if (!isnull(fk->name))
1428     {
1429       my_snprintf(fk_name, sizeof(fk_name), "%s",
1430                   lex2str(fk->name, tmpbuf, sizeof(tmpbuf)));
1431     }
1432     else
1433     {
1434       my_snprintf(fk_name, sizeof(fk_name), "FK_%u_%u",
1435                   parent_index ?
1436                   parent_index->getObjectId() :
1437                   parent_tab.get_table()->getObjectId(),
1438                   child_index ?
1439                   child_index->getObjectId() :
1440                   child_tab.get_table()->getObjectId());
1441     }
1442     if (lower_case_table_names)
1443       ndb_fk_casedn(fk_name);
1444     ndbfk.setName(fk_name);
1445     ndbfk.setParent(* parent_tab.get_table(), parent_index, parentcols);
1446     ndbfk.setChild(* child_tab.get_table(), child_index, childcols);
1447 
1448     switch((fk_option)fk->delete_opt){
1449     case FK_OPTION_UNDEF:
1450     case FK_OPTION_NO_ACTION:
1451       ndbfk.setOnDeleteAction(NdbDictionary::ForeignKey::NoAction);
1452       break;
1453     case FK_OPTION_RESTRICT:
1454       ndbfk.setOnDeleteAction(NdbDictionary::ForeignKey::Restrict);
1455       break;
1456     case FK_OPTION_CASCADE:
1457       ndbfk.setOnDeleteAction(NdbDictionary::ForeignKey::Cascade);
1458       break;
1459     case FK_OPTION_SET_NULL:
1460       ndbfk.setOnDeleteAction(NdbDictionary::ForeignKey::SetNull);
1461       break;
1462     case FK_OPTION_DEFAULT:
1463       ndbfk.setOnDeleteAction(NdbDictionary::ForeignKey::SetDefault);
1464       break;
1465     default:
1466       assert(false);
1467       ndbfk.setOnDeleteAction(NdbDictionary::ForeignKey::NoAction);
1468     }
1469 
1470     switch((fk_option)fk->update_opt){
1471     case FK_OPTION_UNDEF:
1472     case FK_OPTION_NO_ACTION:
1473       ndbfk.setOnUpdateAction(NdbDictionary::ForeignKey::NoAction);
1474       break;
1475     case FK_OPTION_RESTRICT:
1476       ndbfk.setOnUpdateAction(NdbDictionary::ForeignKey::Restrict);
1477       break;
1478     case FK_OPTION_CASCADE:
1479       ndbfk.setOnUpdateAction(NdbDictionary::ForeignKey::Cascade);
1480       break;
1481     case FK_OPTION_SET_NULL:
1482       ndbfk.setOnUpdateAction(NdbDictionary::ForeignKey::SetNull);
1483       break;
1484     case FK_OPTION_DEFAULT:
1485       ndbfk.setOnUpdateAction(NdbDictionary::ForeignKey::SetDefault);
1486       break;
1487     default:
1488       assert(false);
1489       ndbfk.setOnUpdateAction(NdbDictionary::ForeignKey::NoAction);
1490     }
1491 
1492     int flags = 0;
1493     if (thd_test_options(thd, OPTION_NO_FOREIGN_KEY_CHECKS))
1494     {
1495       flags |= NdbDictionary::Dictionary::CreateFK_NoVerify;
1496     }
1497     NdbDictionary::ObjectId objid;
1498     int err= dict->createForeignKey(ndbfk, &objid, flags);
1499 
1500     if (child_index)
1501     {
1502       dict->removeIndexGlobal(* child_index, 0);
1503     }
1504 
1505     if (parent_index)
1506     {
1507       dict->removeIndexGlobal(* parent_index, 0);
1508     }
1509 
1510     if (err)
1511     {
1512       ERR_RETURN(dict->getNdbError());
1513     }
1514   }
1515 
1516   ndb_fk_util_resolve_mock_tables(thd, ndb->getDictionary(),
1517                                   m_dbname, m_tabname);
1518 
1519   DBUG_RETURN(0);
1520 }
1521 
1522 bool
is_fk_defined_on_table_or_index(uint index)1523 ha_ndbcluster::is_fk_defined_on_table_or_index(uint index)
1524 {
1525   /**
1526    * This doesnt seem implemented in Innodb either...
1527    */
1528   return FALSE;
1529 }
1530 
1531 uint
referenced_by_foreign_key()1532 ha_ndbcluster::referenced_by_foreign_key()
1533 {
1534   DBUG_ENTER("ha_ndbcluster::referenced_by_foreign_key");
1535 
1536   Ndb_fk_data *data= m_fk_data;
1537   if (data == 0)
1538   {
1539     assert(false);
1540     DBUG_RETURN(0);
1541   }
1542 
1543   DBUG_PRINT("info", ("count FKs total %u child %u parent %u",
1544                       data->list.elements, data->cnt_child, data->cnt_parent));
1545   DBUG_RETURN(data->cnt_parent != 0);
1546 }
1547 
1548 uint
is_child_or_parent_of_fk()1549 ha_ndbcluster::is_child_or_parent_of_fk()
1550 {
1551   DBUG_ENTER("ha_ndbcluster::is_child_or_parent_of_fk");
1552 
1553   Ndb_fk_data *data= m_fk_data;
1554   if (data == 0)
1555   {
1556     assert(false);
1557     DBUG_RETURN(0);
1558   }
1559 
1560   DBUG_PRINT("info", ("count FKs total %u child %u parent %u",
1561                       data->list.elements, data->cnt_child, data->cnt_parent));
1562   DBUG_RETURN(data->list.elements != 0);
1563 }
1564 
1565 bool
can_switch_engines()1566 ha_ndbcluster::can_switch_engines()
1567 {
1568   DBUG_ENTER("ha_ndbcluster::can_switch_engines");
1569 
1570   if (is_child_or_parent_of_fk())
1571     DBUG_RETURN(0);
1572 
1573   DBUG_RETURN(1);
1574 }
1575 
1576 static
1577 const char *
fk_split_name(char dst[],const char * src,bool index)1578 fk_split_name(char dst[], const char * src, bool index)
1579 {
1580   DBUG_PRINT("info", ("fk_split_name: %s index=%d", src, index));
1581 
1582   /**
1583    * Split a fully qualified (ndb) name into db and name
1584    *
1585    * Store result in dst
1586    */
1587   char * dstptr = dst;
1588   const char * save = src;
1589   while (src[0] != 0 && src[0] != '/')
1590   {
1591     * dstptr = * src;
1592     dstptr++;
1593     src++;
1594   }
1595 
1596   if (src[0] == 0)
1597   {
1598     /**
1599      * No '/' found
1600      *  set db to ''
1601      *  and return pointer to name
1602      *
1603      * This is for compability with create_fk/drop_fk tools...
1604      */
1605     dst[0] = 0;
1606     strcpy(dst + 1, save);
1607     DBUG_PRINT("info", ("fk_split_name: %s,%s", dst, dst + 1));
1608     return dst + 1;
1609   }
1610 
1611   assert(src[0] == '/');
1612   src++;
1613   * dstptr = 0;
1614   dstptr++;
1615 
1616   // Skip over catalog (not implemented)
1617   while (src[0] != '/')
1618   {
1619     src++;
1620   }
1621 
1622   assert(src[0] == '/');
1623   src++;
1624 
1625   /**
1626    * Indexes contains an extra /
1627    */
1628   if (index)
1629   {
1630     while (src[0] != '/')
1631     {
1632       src++;
1633     }
1634     assert(src[0] == '/');
1635     src++;
1636   }
1637   strcpy(dstptr, src);
1638   DBUG_PRINT("info", ("fk_split_name: %s,%s", dst, dstptr));
1639   return dstptr;
1640 }
1641 
1642 struct Ndb_mem_root_guard {
Ndb_mem_root_guardNdb_mem_root_guard1643   Ndb_mem_root_guard(MEM_ROOT *new_root) {
1644     root_ptr= my_thread_get_THR_MALLOC();
1645     assert(root_ptr != 0);
1646     old_root= *root_ptr;
1647     *root_ptr= new_root;
1648   }
~Ndb_mem_root_guardNdb_mem_root_guard1649   ~Ndb_mem_root_guard() {
1650     *root_ptr= old_root;
1651   }
1652 private:
1653   MEM_ROOT **root_ptr;
1654   MEM_ROOT *old_root;
1655 };
1656 
1657 int
get_fk_data(THD * thd,Ndb * ndb)1658 ha_ndbcluster::get_fk_data(THD *thd, Ndb *ndb)
1659 {
1660   DBUG_ENTER("ha_ndbcluster::get_fk_data");
1661 
1662   MEM_ROOT *mem_root= &m_fk_mem_root;
1663   Ndb_mem_root_guard mem_root_guard(mem_root);
1664 
1665   free_root(mem_root, 0);
1666   m_fk_data= 0;
1667   init_alloc_root(PSI_INSTRUMENT_ME, mem_root, fk_root_block_size, 0);
1668 
1669   NdbError err_OOM;
1670   err_OOM.code= 4000; // should we check OOM errors at all?
1671   NdbError err_API;
1672   err_API.code= 4011; // API internal should not happen
1673 
1674   Ndb_fk_data *data= new (mem_root) Ndb_fk_data;
1675   if (data == 0)
1676     ERR_RETURN(err_OOM);
1677   data->cnt_child= 0;
1678   data->cnt_parent= 0;
1679 
1680   DBUG_PRINT("info", ("%s.%s: list dependent objects",
1681                       m_dbname, m_tabname));
1682   int res;
1683   NDBDICT *dict= ndb->getDictionary();
1684   NDBDICT::List obj_list;
1685   res= dict->listDependentObjects(obj_list, *m_table);
1686   if (res != 0)
1687     ERR_RETURN(dict->getNdbError());
1688   DBUG_PRINT("info", ("found %u dependent objects", obj_list.count));
1689 
1690   for (unsigned i = 0; i < obj_list.count; i++)
1691   {
1692     const NDBDICT::List::Element &e= obj_list.elements[i];
1693     if (obj_list.elements[i].type != NdbDictionary::Object::ForeignKey)
1694     {
1695       DBUG_PRINT("info", ("skip non-FK %s type %d", e.name, e.type));
1696       continue;
1697     }
1698     DBUG_PRINT("info", ("found FK %s", e.name));
1699 
1700     NdbDictionary::ForeignKey fk;
1701     res= dict->getForeignKey(fk, e.name);
1702     if (res != 0)
1703       ERR_RETURN(dict->getNdbError());
1704 
1705     Ndb_fk_item *item= new (mem_root) Ndb_fk_item;
1706     if (item == 0)
1707       ERR_RETURN(err_OOM);
1708     FOREIGN_KEY_INFO &f_key_info= item->f_key_info;
1709 
1710     {
1711       char fk_full_name[FN_LEN + 1];
1712       const char * name = fk_split_name(fk_full_name, fk.getName());
1713       f_key_info.foreign_id = thd_make_lex_string(thd, 0, name,
1714                                                   (uint)strlen(name), 1);
1715     }
1716 
1717     {
1718       char child_db_and_name[FN_LEN + 1];
1719       const char * child_name = fk_split_name(child_db_and_name,
1720                                               fk.getChildTable());
1721 
1722       /* Dependent (child) database name */
1723       f_key_info.foreign_db =
1724         thd_make_lex_string(thd, 0, child_db_and_name,
1725                             (uint)strlen(child_db_and_name),
1726                             1);
1727       /* Dependent (child) table name */
1728       f_key_info.foreign_table =
1729         thd_make_lex_string(thd, 0, child_name,
1730                             (uint)strlen(child_name),
1731                             1);
1732 
1733       Ndb_db_guard db_guard(ndb);
1734       setDbName(ndb, child_db_and_name);
1735       Ndb_table_guard child_tab(dict, child_name);
1736       if (child_tab.get_table() == 0)
1737       {
1738         assert(false);
1739         ERR_RETURN(dict->getNdbError());
1740       }
1741 
1742       for (unsigned i = 0; i < fk.getChildColumnCount(); i++)
1743       {
1744         const NdbDictionary::Column * col =
1745           child_tab.get_table()->getColumn(fk.getChildColumnNo(i));
1746         if (col == 0)
1747           ERR_RETURN(err_API);
1748         LEX_STRING * name =
1749           thd_make_lex_string(thd, 0, col->getName(),
1750                               (uint)strlen(col->getName()), 1);
1751         f_key_info.foreign_fields.push_back(name);
1752       }
1753     }
1754 
1755     {
1756       char parent_db_and_name[FN_LEN + 1];
1757       const char * parent_name = fk_split_name(parent_db_and_name,
1758                                                fk.getParentTable());
1759 
1760       /* Referenced (parent) database name */
1761       f_key_info.referenced_db =
1762         thd_make_lex_string(thd, 0, parent_db_and_name,
1763                             (uint)strlen(parent_db_and_name),
1764                             1);
1765       /* Referenced (parent) table name */
1766       f_key_info.referenced_table =
1767         thd_make_lex_string(thd, 0, parent_name,
1768                             (uint)strlen(parent_name),
1769                             1);
1770 
1771       Ndb_db_guard db_guard(ndb);
1772       setDbName(ndb, parent_db_and_name);
1773       Ndb_table_guard parent_tab(dict, parent_name);
1774       if (parent_tab.get_table() == 0)
1775       {
1776         assert(false);
1777         ERR_RETURN(dict->getNdbError());
1778       }
1779 
1780       for (unsigned i = 0; i < fk.getParentColumnCount(); i++)
1781       {
1782         const NdbDictionary::Column * col =
1783           parent_tab.get_table()->getColumn(fk.getParentColumnNo(i));
1784         if (col == 0)
1785           ERR_RETURN(err_API);
1786         LEX_STRING * name =
1787           thd_make_lex_string(thd, 0, col->getName(),
1788                               (uint)strlen(col->getName()), 1);
1789         f_key_info.referenced_fields.push_back(name);
1790       }
1791 
1792     }
1793 
1794     {
1795       const char *update_method = "";
1796       switch (item->update_action= fk.getOnUpdateAction()){
1797       case NdbDictionary::ForeignKey::NoAction:
1798         update_method = "NO ACTION";
1799         break;
1800       case NdbDictionary::ForeignKey::Restrict:
1801         update_method = "RESTRICT";
1802         break;
1803       case NdbDictionary::ForeignKey::Cascade:
1804         update_method = "CASCADE";
1805         break;
1806       case NdbDictionary::ForeignKey::SetNull:
1807         update_method = "SET NULL";
1808         break;
1809       case NdbDictionary::ForeignKey::SetDefault:
1810         update_method = "SET DEFAULT";
1811         break;
1812       }
1813       f_key_info.update_method =
1814         thd_make_lex_string(thd, 0, update_method,
1815                             (uint)strlen(update_method),
1816                             1);
1817     }
1818 
1819     {
1820       const char *delete_method = "";
1821       switch (item->delete_action= fk.getOnDeleteAction()){
1822       case NdbDictionary::ForeignKey::NoAction:
1823         delete_method = "NO ACTION";
1824         break;
1825       case NdbDictionary::ForeignKey::Restrict:
1826         delete_method = "RESTRICT";
1827         break;
1828       case NdbDictionary::ForeignKey::Cascade:
1829         delete_method = "CASCADE";
1830         break;
1831       case NdbDictionary::ForeignKey::SetNull:
1832         delete_method = "SET NULL";
1833         break;
1834       case NdbDictionary::ForeignKey::SetDefault:
1835         delete_method = "SET DEFAULT";
1836         break;
1837       }
1838       f_key_info.delete_method =
1839         thd_make_lex_string(thd, 0, delete_method,
1840                             (uint)strlen(delete_method),
1841                             1);
1842     }
1843 
1844     if (fk.getParentIndex() != 0)
1845     {
1846       // sys/def/10/xb1$unique
1847       char db_and_name[FN_LEN + 1];
1848       const char * name=fk_split_name(db_and_name, fk.getParentIndex(), true);
1849       f_key_info.referenced_key_name =
1850         thd_make_lex_string(thd, 0, name,
1851                             (uint)strlen(name),
1852                             1);
1853     }
1854     else
1855     {
1856       const char* name= "PRIMARY";
1857       f_key_info.referenced_key_name =
1858         thd_make_lex_string(thd, 0, name,
1859                             (uint)strlen(name),
1860                             1);
1861     }
1862 
1863     item->is_child=
1864       strcmp(m_dbname, f_key_info.foreign_db->str) == 0 &&
1865       strcmp(m_tabname, f_key_info.foreign_table->str) == 0;
1866 
1867     item->is_parent=
1868       strcmp(m_dbname, f_key_info.referenced_db->str) == 0 &&
1869       strcmp(m_tabname, f_key_info.referenced_table->str) == 0;
1870 
1871     data->cnt_child+= item->is_child;
1872     data->cnt_parent+= item->is_parent;
1873 
1874     res= data->list.push_back(item);
1875     if (res != 0)
1876       ERR_RETURN(err_OOM);
1877   }
1878 
1879   DBUG_PRINT("info", ("count FKs total %u child %u parent %u",
1880                       data->list.elements, data->cnt_child, data->cnt_parent));
1881 
1882   m_fk_data= data;
1883   DBUG_RETURN(0);
1884 }
1885 
1886 void
release_fk_data(THD * thd)1887 ha_ndbcluster::release_fk_data(THD *thd)
1888 {
1889   DBUG_ENTER("ha_ndbcluster::release_fk_data");
1890 
1891   Ndb_fk_data *data= m_fk_data;
1892   if (data != 0)
1893   {
1894     DBUG_PRINT("info", ("count FKs total %u child %u parent %u",
1895                         data->list.elements, data->cnt_child, data->cnt_parent));
1896   }
1897 
1898   MEM_ROOT *mem_root= &m_fk_mem_root;
1899   free_root(mem_root, 0);
1900   m_fk_data= 0;
1901 
1902   DBUG_VOID_RETURN;
1903 }
1904 
1905 int
get_child_or_parent_fk_list(THD * thd,List<FOREIGN_KEY_INFO> * f_key_list,bool is_child,bool is_parent)1906 ha_ndbcluster::get_child_or_parent_fk_list(THD *thd,
1907                                            List<FOREIGN_KEY_INFO> * f_key_list,
1908                                            bool is_child, bool is_parent)
1909 {
1910   DBUG_ENTER("ha_ndbcluster::get_child_or_parent_fk_list");
1911   DBUG_PRINT("info", ("table %s.%s", m_dbname, m_tabname));
1912 
1913   Ndb_fk_data *data= m_fk_data;
1914   if (data == 0)
1915   {
1916     assert(false);
1917     DBUG_RETURN(0);
1918   }
1919 
1920   DBUG_PRINT("info", ("count FKs total %u child %u parent %u",
1921                       data->list.elements, data->cnt_child, data->cnt_parent));
1922 
1923   Ndb_fk_item *item= 0;
1924   List_iterator<Ndb_fk_item> iter(data->list);
1925   while ((item= iter++))
1926   {
1927     FOREIGN_KEY_INFO &f_key_info= item->f_key_info;
1928     DBUG_PRINT("info", ("FK %s ref %s -> %s is_child %d is_parent %d",
1929                         f_key_info.foreign_id->str,
1930                         f_key_info.foreign_table->str,
1931                         f_key_info.referenced_table->str,
1932                         item->is_child, item->is_parent));
1933     if (is_child && !item->is_child)
1934       continue;
1935     if (is_parent && !item->is_parent)
1936       continue;
1937 
1938     DBUG_PRINT("info", ("add %s to list", f_key_info.foreign_id->str));
1939     f_key_list->push_back(&f_key_info);
1940   }
1941 
1942   DBUG_RETURN(0);
1943 }
1944 
1945 int
get_foreign_key_list(THD * thd,List<FOREIGN_KEY_INFO> * f_key_list)1946 ha_ndbcluster::get_foreign_key_list(THD *thd,
1947                                     List<FOREIGN_KEY_INFO> * f_key_list)
1948 {
1949   DBUG_ENTER("ha_ndbcluster::get_foreign_key_list");
1950   int res= get_child_or_parent_fk_list(thd, f_key_list, true, false);
1951   DBUG_PRINT("info", ("count FKs child %u", f_key_list->elements));
1952   DBUG_RETURN(res);
1953 }
1954 
1955 int
get_parent_foreign_key_list(THD * thd,List<FOREIGN_KEY_INFO> * f_key_list)1956 ha_ndbcluster::get_parent_foreign_key_list(THD *thd,
1957                                            List<FOREIGN_KEY_INFO> * f_key_list)
1958 {
1959   DBUG_ENTER("ha_ndbcluster::get_parent_foreign_key_list");
1960   int res= get_child_or_parent_fk_list(thd, f_key_list, false, true);
1961   DBUG_PRINT("info", ("count FKs parent %u", f_key_list->elements));
1962   DBUG_RETURN(res);
1963 }
1964 
1965 static
1966 int
cmp_fk_name(const void * _e0,const void * _e1)1967 cmp_fk_name(const void * _e0, const void * _e1)
1968 {
1969   const NDBDICT::List::Element * e0 = (NDBDICT::List::Element*)_e0;
1970   const NDBDICT::List::Element * e1 = (NDBDICT::List::Element*)_e1;
1971   int res;
1972   if ((res= strcmp(e0->name, e1->name)) != 0)
1973     return res;
1974 
1975   if ((res= strcmp(e0->database, e1->database)) != 0)
1976     return res;
1977 
1978   if ((res= strcmp(e0->schema, e1->schema)) != 0)
1979     return res;
1980 
1981   return e0->id - e1->id;
1982 }
1983 
1984 char*
get_foreign_key_create_info()1985 ha_ndbcluster::get_foreign_key_create_info()
1986 {
1987   DBUG_ENTER("ha_ndbcluster::get_foreign_key_create_info");
1988 
1989   /**
1990    * List foreigns for this table
1991    */
1992   if (m_table == 0)
1993   {
1994     DBUG_RETURN(0);
1995   }
1996 
1997   if (table == 0)
1998   {
1999     DBUG_RETURN(0);
2000   }
2001 
2002   THD* thd = table->in_use;
2003   if (thd == 0)
2004   {
2005     DBUG_RETURN(0);
2006   }
2007 
2008   Ndb *ndb= get_ndb(thd);
2009   if (ndb == 0)
2010   {
2011     DBUG_RETURN(0);
2012   }
2013 
2014   NDBDICT *dict= ndb->getDictionary();
2015   NDBDICT::List obj_list;
2016 
2017   dict->listDependentObjects(obj_list, *m_table);
2018   /**
2019    * listDependentObjects will return FK's in order that they
2020    *   are stored in hash-table in Dbdict (i.e random)
2021    *
2022    * sort them to make MTR and similar happy
2023    */
2024   my_qsort(obj_list.elements, obj_list.count, sizeof(obj_list.elements[0]),
2025            cmp_fk_name);
2026   String fk_string;
2027   for (unsigned i = 0; i < obj_list.count; i++)
2028   {
2029     if (obj_list.elements[i].type != NdbDictionary::Object::ForeignKey)
2030       continue;
2031 
2032     NdbDictionary::ForeignKey fk;
2033     int res= dict->getForeignKey(fk, obj_list.elements[i].name);
2034     if (res != 0)
2035     {
2036       // Push warning??
2037       DBUG_RETURN(0);
2038     }
2039 
2040     NdbError err;
2041     const int noinvalidate= 0;
2042     const NDBTAB * childtab= 0;
2043     const NDBTAB * parenttab= 0;
2044 
2045     char parent_db_and_name[FN_LEN + 1];
2046     {
2047       const char * name = fk_split_name(parent_db_and_name,fk.getParentTable());
2048       setDbName(ndb, parent_db_and_name);
2049       parenttab= dict->getTableGlobal(name);
2050       if (parenttab == 0)
2051       {
2052         err= dict->getNdbError();
2053         goto errout;
2054       }
2055     }
2056 
2057     char child_db_and_name[FN_LEN + 1];
2058     {
2059       const char * name = fk_split_name(child_db_and_name, fk.getChildTable());
2060       setDbName(ndb, child_db_and_name);
2061       childtab= dict->getTableGlobal(name);
2062       if (childtab == 0)
2063       {
2064         err= dict->getNdbError();
2065         goto errout;
2066       }
2067     }
2068 
2069     if (! (strcmp(child_db_and_name, m_dbname) == 0 &&
2070            strcmp(childtab->getName(), m_tabname) == 0))
2071     {
2072       /**
2073        * this was on parent table (fk are shown on child table in SQL)
2074        */
2075       assert(strcmp(parent_db_and_name, m_dbname) == 0);
2076       assert(strcmp(parenttab->getName(), m_tabname) == 0);
2077       continue;
2078     }
2079 
2080     fk_string.append(",");
2081     fk_string.append("\n ");
2082     fk_string.append(" CONSTRAINT `");
2083     {
2084       char db_and_name[FN_LEN+1];
2085       const char * name = fk_split_name(db_and_name, fk.getName());
2086       fk_string.append(name);
2087     }
2088     fk_string.append("` FOREIGN KEY (");
2089 
2090     {
2091       bool first= true;
2092       for (unsigned j = 0; j < fk.getChildColumnCount(); j++)
2093       {
2094         unsigned no = fk.getChildColumnNo(j);
2095         if (!first)
2096         {
2097           fk_string.append(",");
2098         }
2099         fk_string.append("`");
2100         fk_string.append(childtab->getColumn(no)->getName());
2101         fk_string.append("`");
2102         first= false;
2103       }
2104     }
2105 
2106     fk_string.append(") REFERENCES `");
2107     if (strcmp(parent_db_and_name, child_db_and_name) != 0)
2108     {
2109       fk_string.append(parent_db_and_name);
2110       fk_string.append("`.`");
2111     }
2112 
2113 
2114     const char* real_parent_name;
2115     if (ndb_show_foreign_key_mock_tables(thd) == false &&
2116         Fk_util::split_mock_name(parenttab->getName(),
2117                                  NULL, NULL, &real_parent_name))
2118     {
2119       DBUG_PRINT("info", ("real_parent_name: %s", real_parent_name));
2120       fk_string.append(real_parent_name);
2121     }
2122     else
2123     {
2124       fk_string.append(parenttab->getName());
2125     }
2126 
2127     fk_string.append("` (");
2128 
2129     {
2130       bool first= true;
2131       for (unsigned j = 0; j < fk.getParentColumnCount(); j++)
2132       {
2133         unsigned no = fk.getParentColumnNo(j);
2134         if (!first)
2135         {
2136           fk_string.append(",");
2137         }
2138         fk_string.append("`");
2139         fk_string.append(parenttab->getColumn(no)->getName());
2140         fk_string.append("`");
2141         first= false;
2142       }
2143     }
2144     fk_string.append(")");
2145 
2146     switch(fk.getOnDeleteAction()){
2147     case NdbDictionary::ForeignKey::NoAction:
2148       fk_string.append(" ON DELETE NO ACTION");
2149       break;
2150     case NdbDictionary::ForeignKey::Restrict:
2151       fk_string.append(" ON DELETE RESTRICT");
2152       break;
2153     case NdbDictionary::ForeignKey::Cascade:
2154       fk_string.append(" ON DELETE CASCADE");
2155       break;
2156     case NdbDictionary::ForeignKey::SetNull:
2157       fk_string.append(" ON DELETE SET NULL");
2158       break;
2159     case NdbDictionary::ForeignKey::SetDefault:
2160       fk_string.append(" ON DELETE SET DEFAULT");
2161       break;
2162     }
2163 
2164     switch(fk.getOnUpdateAction()){
2165     case NdbDictionary::ForeignKey::NoAction:
2166       fk_string.append(" ON UPDATE NO ACTION");
2167       break;
2168     case NdbDictionary::ForeignKey::Restrict:
2169       fk_string.append(" ON UPDATE RESTRICT");
2170       break;
2171     case NdbDictionary::ForeignKey::Cascade:
2172       fk_string.append(" ON UPDATE CASCADE");
2173       break;
2174     case NdbDictionary::ForeignKey::SetNull:
2175       fk_string.append(" ON UPDATE SET NULL");
2176       break;
2177     case NdbDictionary::ForeignKey::SetDefault:
2178       fk_string.append(" ON UPDATE SET DEFAULT");
2179       break;
2180     }
2181 errout:
2182     if (childtab)
2183     {
2184       dict->removeTableGlobal(* childtab, noinvalidate);
2185     }
2186 
2187     if (parenttab)
2188     {
2189       dict->removeTableGlobal(* parenttab, noinvalidate);
2190     }
2191 
2192     if (err.code != 0)
2193     {
2194       push_warning_printf(thd, Sql_condition::SL_WARNING,
2195                           ER_ILLEGAL_HA_CREATE_OPTION,
2196                           "Failed to retreive FK information: %d:%s",
2197                           err.code,
2198                           err.message);
2199 
2200       DBUG_RETURN(0); // How to report error ??
2201     }
2202   }
2203 
2204   DBUG_RETURN(strdup(fk_string.c_ptr()));
2205 }
2206 
2207 void
free_foreign_key_create_info(char * str)2208 ha_ndbcluster::free_foreign_key_create_info(char* str)
2209 {
2210   if (str != 0)
2211   {
2212     free(str);
2213   }
2214 }
2215 
2216 int
copy_fk_for_offline_alter(THD * thd,Ndb * ndb,NDBTAB * _dsttab)2217 ha_ndbcluster::copy_fk_for_offline_alter(THD * thd, Ndb* ndb, NDBTAB* _dsttab)
2218 {
2219   DBUG_ENTER("ha_ndbcluster::copy_fk_for_offline_alter");
2220   if (thd->lex == 0)
2221   {
2222     assert(false);
2223     DBUG_RETURN(0);
2224   }
2225 
2226   Ndb_db_guard db_guard(ndb);
2227   const char * src_db = thd->lex->select_lex->table_list.first->db;
2228   const char * src_tab = thd->lex->select_lex->table_list.first->table_name;
2229 
2230   if (src_db == 0 || src_tab == 0)
2231   {
2232     assert(false);
2233     DBUG_RETURN(0);
2234   }
2235 
2236   assert(thd->lex != 0);
2237   NDBDICT* dict = ndb->getDictionary();
2238   setDbName(ndb, src_db);
2239   Ndb_table_guard srctab(dict, src_tab);
2240   if (srctab.get_table() == 0)
2241   {
2242     /**
2243      * when doign alter table engine=ndb this can happen
2244      */
2245     DBUG_RETURN(0);
2246   }
2247 
2248   db_guard.restore();
2249   Ndb_table_guard dsttab(dict, _dsttab->getName());
2250   if (dsttab.get_table() == 0)
2251   {
2252     ERR_RETURN(dict->getNdbError());
2253   }
2254 
2255   setDbName(ndb, src_db);
2256   NDBDICT::List obj_list;
2257   if (dict->listDependentObjects(obj_list, *srctab.get_table()) != 0)
2258   {
2259     ERR_RETURN(dict->getNdbError());
2260   }
2261 
2262   // check if fk to drop exists
2263   {
2264     Alter_drop * drop_item= 0;
2265     List_iterator<Alter_drop> drop_iterator(thd->lex->alter_info.drop_list);
2266     while ((drop_item=drop_iterator++))
2267     {
2268       if (drop_item->type != Alter_drop::FOREIGN_KEY)
2269         continue;
2270       bool found= false;
2271       for (unsigned i = 0; i < obj_list.count; i++)
2272       {
2273         char db_and_name[FN_LEN + 1];
2274         const char * name= fk_split_name(db_and_name,obj_list.elements[i].name);
2275         if (ndb_fk_casecmp(drop_item->name, name) != 0)
2276           continue;
2277 
2278         NdbDictionary::ForeignKey fk;
2279         if (dict->getForeignKey(fk, obj_list.elements[i].name) != 0)
2280         {
2281           ERR_RETURN(dict->getNdbError());
2282         }
2283 
2284         char child_db_and_name[FN_LEN + 1];
2285         const char* child_name = fk_split_name(child_db_and_name,
2286                                                fk.getChildTable());
2287         if (strcmp(child_db_and_name, src_db) == 0 &&
2288             strcmp(child_name, src_tab) == 0)
2289         {
2290           found= true;
2291           break;
2292         }
2293       }
2294       if (!found)
2295       {
2296         // FK not found
2297         my_error(ER_CANT_DROP_FIELD_OR_KEY, MYF(0), drop_item->name);
2298         DBUG_RETURN(ER_CANT_DROP_FIELD_OR_KEY);
2299       }
2300     }
2301   }
2302 
2303   for (unsigned i = 0; i < obj_list.count; i++)
2304   {
2305     if (obj_list.elements[i].type == NdbDictionary::Object::ForeignKey)
2306     {
2307       NdbDictionary::ForeignKey fk;
2308       if (dict->getForeignKey(fk, obj_list.elements[i].name) != 0)
2309       {
2310         ERR_RETURN(dict->getNdbError());
2311       }
2312 
2313       {
2314         /**
2315          * Check if it should be copied
2316          */
2317         char db_and_name[FN_LEN + 1];
2318         const char * name= fk_split_name(db_and_name,obj_list.elements[i].name);
2319 
2320         bool found= false;
2321         Alter_drop * drop_item= 0;
2322         List_iterator<Alter_drop> drop_iterator(thd->lex->alter_info.drop_list);
2323         while ((drop_item=drop_iterator++))
2324         {
2325           if (drop_item->type != Alter_drop::FOREIGN_KEY)
2326             continue;
2327           if (ndb_fk_casecmp(drop_item->name, name) != 0)
2328             continue;
2329 
2330           char child_db_and_name[FN_LEN + 1];
2331           const char* child_name = fk_split_name(child_db_and_name,
2332                                                  fk.getChildTable());
2333           if (strcmp(child_db_and_name, src_db) == 0 &&
2334               strcmp(child_name, src_tab) == 0)
2335           {
2336             found= true;
2337             break;
2338           }
2339         }
2340         if (found)
2341         {
2342           /**
2343            * Item is on drop list...
2344            *   don't copy it
2345            */
2346           continue;
2347         }
2348       }
2349 
2350       unsigned parentObjectId= 0;
2351       unsigned childObjectId= 0;
2352 
2353       {
2354         char db_and_name[FN_LEN + 1];
2355         const char * name= fk_split_name(db_and_name, fk.getParentTable());
2356         setDbName(ndb, db_and_name);
2357         Ndb_table_guard org_parent(dict, name);
2358         if (org_parent.get_table() == 0)
2359         {
2360           ERR_RETURN(dict->getNdbError());
2361         }
2362         parentObjectId= org_parent.get_table()->getObjectId();
2363       }
2364 
2365       {
2366         char db_and_name[FN_LEN + 1];
2367         const char * name= fk_split_name(db_and_name, fk.getChildTable());
2368         setDbName(ndb, db_and_name);
2369         Ndb_table_guard org_child(dict, name);
2370         if (org_child.get_table() == 0)
2371         {
2372           ERR_RETURN(dict->getNdbError());
2373         }
2374         childObjectId= org_child.get_table()->getObjectId();
2375       }
2376 
2377       /**
2378        * flags for CreateForeignKey
2379        */
2380       int flags = 0;
2381 
2382       char db_and_name[FN_LEN + 1];
2383       const char * name= fk_split_name(db_and_name, fk.getParentTable());
2384       if (strcmp(name, src_tab) == 0 &&
2385           strcmp(db_and_name, src_db) == 0)
2386       {
2387         /**
2388          * We used to be parent...
2389          */
2390         const NDBCOL * cols[NDB_MAX_ATTRIBUTES_IN_INDEX + 1];
2391         for (unsigned j= 0; j < fk.getParentColumnCount(); j++)
2392         {
2393           unsigned no= fk.getParentColumnNo(j);
2394           const NDBCOL * orgcol = srctab.get_table()->getColumn(no);
2395           cols[j]= dsttab.get_table()->getColumn(orgcol->getName());
2396         }
2397         cols[fk.getParentColumnCount()]= 0;
2398         parentObjectId= dsttab.get_table()->getObjectId();
2399         if (fk.getParentIndex() != 0)
2400         {
2401           name = fk_split_name(db_and_name, fk.getParentIndex(), true);
2402           setDbName(ndb, db_and_name);
2403           const NDBINDEX * idx = dict->getIndexGlobal(name,*dsttab.get_table());
2404           if (idx == 0)
2405           {
2406             printf("%u %s - %u/%u get_index(%s)\n",
2407                    __LINE__, fk.getName(),
2408                    parentObjectId,
2409                    childObjectId,
2410                    name); fflush(stdout);
2411             ERR_RETURN(dict->getNdbError());
2412           }
2413           fk.setParent(* dsttab.get_table(), idx, cols);
2414           dict->removeIndexGlobal(* idx, 0);
2415         }
2416         else
2417         {
2418           fk.setParent(* dsttab.get_table(), 0, cols);
2419         }
2420 
2421 
2422         /**
2423          * We're parent, and this is offline alter table
2424          *   then we can't verify that FK cause the new parent will
2425          *   be populated later during copy data between tables
2426          *
2427          * However, iff FK is consistent when this alter starts,
2428          *   it should remain consistent since mysql does not
2429          *   allow the alter to modify the columns referenced
2430          */
2431         flags |= NdbDictionary::Dictionary::CreateFK_NoVerify;
2432       }
2433       else
2434       {
2435         name = fk_split_name(db_and_name, fk.getChildTable());
2436         assert(strcmp(name, src_tab) == 0 &&
2437                strcmp(db_and_name, src_db) == 0);
2438         const NDBCOL * cols[NDB_MAX_ATTRIBUTES_IN_INDEX + 1];
2439         for (unsigned j= 0; j < fk.getChildColumnCount(); j++)
2440         {
2441           unsigned no= fk.getChildColumnNo(j);
2442           const NDBCOL * orgcol = srctab.get_table()->getColumn(no);
2443           cols[j]= dsttab.get_table()->getColumn(orgcol->getName());
2444         }
2445         cols[fk.getChildColumnCount()]= 0;
2446         childObjectId= dsttab.get_table()->getObjectId();
2447         if (fk.getChildIndex() != 0)
2448         {
2449           name = fk_split_name(db_and_name, fk.getChildIndex(), true);
2450           setDbName(ndb, db_and_name);
2451           const NDBINDEX * idx = dict->getIndexGlobal(name,*dsttab.get_table());
2452           if (idx == 0)
2453           {
2454             printf("%u %s - %u/%u get_index(%s)\n",
2455                    __LINE__, fk.getName(),
2456                    parentObjectId,
2457                    childObjectId,
2458                    name); fflush(stdout);
2459             ERR_RETURN(dict->getNdbError());
2460           }
2461           fk.setChild(* dsttab.get_table(), idx, cols);
2462           dict->removeIndexGlobal(* idx, 0);
2463         }
2464         else
2465         {
2466           fk.setChild(* dsttab.get_table(), 0, cols);
2467         }
2468       }
2469 
2470       char new_name[FN_LEN + 1];
2471       name= fk_split_name(db_and_name, fk.getName());
2472       my_snprintf(new_name, sizeof(new_name), "%s",
2473                   name);
2474       fk.setName(new_name);
2475       setDbName(ndb, db_and_name);
2476 
2477       if (thd_test_options(thd, OPTION_NO_FOREIGN_KEY_CHECKS))
2478       {
2479         flags |= NdbDictionary::Dictionary::CreateFK_NoVerify;
2480       }
2481       NdbDictionary::ObjectId objid;
2482       if (dict->createForeignKey(fk, &objid, flags) != 0)
2483       {
2484         ERR_RETURN(dict->getNdbError());
2485       }
2486     }
2487   }
2488   DBUG_RETURN(0);
2489 }
2490 
2491 int
drop_fk_for_online_alter(THD * thd,Ndb * ndb,NDBDICT * dict,const NDBTAB * tab)2492 ha_ndbcluster::drop_fk_for_online_alter(THD * thd, Ndb* ndb, NDBDICT * dict,
2493                                         const NDBTAB* tab)
2494 {
2495   DBUG_ENTER("ha_ndbcluster::drop_fk_for_online_alter");
2496   if (thd->lex == 0)
2497   {
2498     assert(false);
2499     DBUG_RETURN(0);
2500   }
2501 
2502   Ndb_table_guard srctab(dict, tab->getName());
2503   if (srctab.get_table() == 0)
2504   {
2505     assert(false); // Why ??
2506     DBUG_RETURN(0);
2507   }
2508 
2509   NDBDICT::List obj_list;
2510   if (dict->listDependentObjects(obj_list, *srctab.get_table()) != 0)
2511   {
2512     ERR_RETURN(dict->getNdbError());
2513   }
2514 
2515   Alter_drop * drop_item= 0;
2516   List_iterator<Alter_drop> drop_iterator(thd->lex->alter_info.drop_list);
2517   while ((drop_item=drop_iterator++))
2518   {
2519     if (drop_item->type != Alter_drop::FOREIGN_KEY)
2520       continue;
2521 
2522     bool found= false;
2523     for (unsigned i = 0; i < obj_list.count; i++)
2524     {
2525       if (obj_list.elements[i].type != NdbDictionary::Object::ForeignKey)
2526       {
2527         continue;
2528       }
2529 
2530       char db_and_name[FN_LEN + 1];
2531       const char * name= fk_split_name(db_and_name,obj_list.elements[i].name);
2532 
2533       if (ndb_fk_casecmp(drop_item->name, name) != 0)
2534         continue;
2535 
2536       NdbDictionary::ForeignKey fk;
2537       if (dict->getForeignKey(fk, obj_list.elements[i].name) != 0)
2538       {
2539         ERR_RETURN(dict->getNdbError());
2540       }
2541 
2542       char child_db_and_name[FN_LEN + 1];
2543       const char* child_name = fk_split_name(child_db_and_name,
2544                                              fk.getChildTable());
2545       if (strcmp(child_db_and_name, ndb->getDatabaseName()) == 0 &&
2546           strcmp(child_name, tab->getName()) == 0)
2547       {
2548         found= true;
2549         Fk_util fk_util(thd);
2550         if (!fk_util.drop_fk(ndb, dict, obj_list.elements[i].name))
2551         {
2552           ERR_RETURN(dict->getNdbError());
2553         }
2554         break;
2555       }
2556     }
2557     if (!found)
2558     {
2559       // FK not found
2560       my_error(ER_CANT_DROP_FIELD_OR_KEY, MYF(0), drop_item->name);
2561       DBUG_RETURN(ER_CANT_DROP_FIELD_OR_KEY);
2562     }
2563   }
2564   DBUG_RETURN(0);
2565 }
2566 
2567 
2568 /**
2569   Save all fk data into a fk_list
2570   - Build list of foreign keys for which the given table is child
2571 
2572   @retval
2573     0     ok
2574   @retval
2575    != 0   failure in saving the fk data
2576 */
2577 
2578 int
get_fk_data_for_truncate(NDBDICT * dict,const NDBTAB * table,Ndb_fk_list & fk_list)2579 ha_ndbcluster::get_fk_data_for_truncate(NDBDICT* dict, const NDBTAB* table,
2580                                         Ndb_fk_list& fk_list)
2581 {
2582   DBUG_ENTER("ha_ndbcluster::get_fk_data_for_truncate");
2583 
2584   NDBDICT::List obj_list;
2585   if (dict->listDependentObjects(obj_list, *table) != 0)
2586   {
2587     ERR_RETURN(dict->getNdbError());
2588   }
2589   for (unsigned i = 0; i < obj_list.count; i++)
2590   {
2591     DBUG_PRINT("debug", ("DependentObject %d : %s, Type : %d", i,
2592                           obj_list.elements[i].name,
2593                           obj_list.elements[i].type));
2594     if (obj_list.elements[i].type != NdbDictionary::Object::ForeignKey)
2595       continue;
2596 
2597     /* obj is an fk. Fetch it */
2598     NDBFK fk;
2599     if (dict->getForeignKey(fk, obj_list.elements[i].name) != 0)
2600     {
2601       ERR_RETURN(dict->getNdbError());
2602     }
2603     DBUG_PRINT("debug", ("Retrieving FK : %s", fk.getName()));
2604 
2605     fk_list.push_back(new NdbDictionary::ForeignKey(fk));
2606     DBUG_PRINT("info", ("Foreign Key added to list : %s", fk.getName()));
2607   }
2608 
2609   DBUG_RETURN(0);
2610 }
2611 
2612 
2613 /**
2614   Restore foreign keys into the child table from fk_list
2615   - for all foreign keys in the given fk list, re-assign child object ids
2616     to reflect the newly created child table/indexes
2617   - create the fk in the child table
2618 
2619   @retval
2620     0     ok
2621   @retval
2622    != 0   failure in recreating the fk data
2623 */
2624 
2625 int
recreate_fk_for_truncate(THD * thd,Ndb * ndb,const char * tab_name,Ndb_fk_list & fk_list)2626 ha_ndbcluster::recreate_fk_for_truncate(THD* thd, Ndb* ndb, const char* tab_name,
2627                                         Ndb_fk_list& fk_list)
2628 {
2629   DBUG_ENTER("ha_ndbcluster::create_fk_for_truncate");
2630 
2631   int flags = 0;
2632   const int err_default= HA_ERR_CANNOT_ADD_FOREIGN;
2633 
2634   NDBDICT* dict = ndb->getDictionary();
2635 
2636   /* fetch child table */
2637   Ndb_table_guard child_tab(dict, tab_name);
2638   if (child_tab.get_table() == 0)
2639   {
2640     push_warning_printf(thd, Sql_condition::SL_WARNING,
2641                         ER_CANNOT_ADD_FOREIGN,
2642                         "INTERNAL ERROR: Could not find created child table '%s'",
2643                         tab_name);
2644     // Internal error, should be able to load the just created child table
2645     assert(child_tab.get_table());
2646     DBUG_RETURN(err_default);
2647   }
2648 
2649   NDBFK* fk;
2650   List_iterator<NDBFK> fk_iterator(fk_list);
2651   while ((fk= fk_iterator++))
2652   {
2653     DBUG_PRINT("info",("Parsing foreign key : %s", fk->getName()));
2654 
2655     /* Get child table columns and index */
2656     const NDBCOL * child_cols[NDB_MAX_ATTRIBUTES_IN_INDEX + 1];
2657     {
2658       unsigned pos= 0;
2659       const NDBTAB* tab= child_tab.get_table();
2660       for(unsigned i= 0; i < fk->getChildColumnCount(); i++)
2661       {
2662         const NDBCOL * ndbcol= tab->getColumn(fk->getChildColumnNo(i));
2663         if (ndbcol == 0)
2664         {
2665           push_warning_printf(thd, Sql_condition::SL_WARNING,
2666                               ER_CANNOT_ADD_FOREIGN,
2667                               "Child table %s has no column referred by the FK %s",
2668                               tab->getName(), fk->getName());
2669           assert(ndbcol);
2670           DBUG_RETURN(err_default);
2671         }
2672         child_cols[pos++]= ndbcol;
2673       }
2674       child_cols[pos]= 0;
2675     }
2676 
2677     bool child_primary_key= FALSE;
2678     const NDBINDEX* child_index= find_matching_index(dict,
2679                                                      child_tab.get_table(),
2680                                                      child_cols,
2681                                                      child_primary_key);
2682 
2683     if (!child_primary_key && child_index == 0)
2684     {
2685       my_error(ER_FK_NO_INDEX_CHILD, MYF(0), fk->getName(),
2686                child_tab.get_table()->getName());
2687       DBUG_RETURN(err_default);
2688     }
2689 
2690     /* update the fk's child references */
2691     fk->setChild(* child_tab.get_table(), child_index, child_cols);
2692 
2693     /*
2694      the name of "fk" seems to be different when you read it up
2695      compared to when you create it. (Probably a historical artifact)
2696      So update fk's name
2697     */
2698     {
2699       char name[FN_REFLEN+1];
2700       unsigned parent_id, child_id;
2701       if (sscanf(fk->getName(), "%u/%u/%s",
2702                  &parent_id, &child_id, name) != 3)
2703       {
2704         push_warning_printf(thd, Sql_condition::SL_WARNING,
2705                             ER_CANNOT_ADD_FOREIGN,
2706                             "Skip, failed to parse name of fk: %s",
2707                             fk->getName());
2708         DBUG_RETURN(err_default);
2709       }
2710 
2711       char fk_name[FN_REFLEN+1];
2712       my_snprintf(fk_name, sizeof(fk_name), "%s",
2713                   name);
2714       DBUG_PRINT("info", ("Setting new fk name: %s", fk_name));
2715       fk->setName(fk_name);
2716     }
2717 
2718     if (thd_test_options(thd, OPTION_NO_FOREIGN_KEY_CHECKS))
2719     {
2720       flags |= NdbDictionary::Dictionary::CreateFK_NoVerify;
2721     }
2722 
2723     NdbDictionary::ObjectId objid;
2724     int err= dict->createForeignKey(*fk, &objid, flags);
2725 
2726     if (child_index)
2727     {
2728       dict->removeIndexGlobal(* child_index, 0);
2729     }
2730 
2731     if (err)
2732     {
2733       ERR_RETURN(dict->getNdbError());
2734     }
2735   }
2736   DBUG_RETURN(0);
2737 }
2738