1 /*
2    Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 // Implements the functions declared in ndb_dd.h
26 #include "storage/ndb/plugin/ndb_dd.h"
27 
28 // Using
29 #include "sql/dd/dd.h"
30 #include "sql/dd/impl/types/object_table_definition_impl.h"  // fs_name_case()
31 #include "sql/dd/properties.h"
32 #include "sql/dd/types/index.h"
33 #include "sql/dd/types/partition.h"
34 #include "sql/dd/types/partition_index.h"
35 #include "sql/dd/types/table.h"
36 #include "sql/sql_class.h"
37 #include "sql/table.h"
38 #include "sql/thd_raii.h"
39 #include "sql/transaction.h"
40 #include "storage/ndb/plugin/ndb_dd_client.h"
41 #include "storage/ndb/plugin/ndb_dd_fk.h"
42 #include "storage/ndb/plugin/ndb_dd_sdi.h"
43 #include "storage/ndb/plugin/ndb_dd_table.h"
44 #include "storage/ndb/plugin/ndb_fk_util.h"
45 #include "storage/ndb/plugin/ndb_name_util.h"
46 #include "storage/ndb/plugin/ndb_schema_dist_table.h"
47 
ndb_sdi_serialize(THD * thd,const dd::Table * table_def,const char * schema_name_str,dd::sdi_t & sdi)48 bool ndb_sdi_serialize(THD *thd, const dd::Table *table_def,
49                        const char *schema_name_str, dd::sdi_t &sdi) {
50   const dd::String_type schema_name(schema_name_str);
51   // Require the table to be visible, hidden by SE(like mysql.ndb_schema)
52   // or else have temporary name
53   DBUG_ASSERT(table_def->hidden() == dd::Abstract_table::HT_VISIBLE ||
54               table_def->hidden() == dd::Abstract_table::HT_HIDDEN_SE ||
55               ndb_name_is_temp(table_def->name().c_str()));
56 
57   // Make a copy of the table definition to allow it to
58   // be modified before serialization
59   std::unique_ptr<dd::Table> table_def_clone(table_def->clone());
60 
61   // Check that dd::Table::clone() properly clones the table definition
62   // by comparing the serialized table def before and after clone()
63   DBUG_ASSERT(ndb_dd_sdi_serialize(thd, *table_def, schema_name) ==
64               ndb_dd_sdi_serialize(thd, *table_def_clone, schema_name));
65 
66   // Don't include the se_private_id in the serialized table def.
67   table_def_clone->set_se_private_id(dd::INVALID_OBJECT_ID);
68 
69   // Don't include any se_private_data properties in the
70   // serialized table def.
71   table_def_clone->se_private_data().clear();
72 
73   sdi = ndb_dd_sdi_serialize(thd, *table_def_clone, schema_name);
74   if (sdi.empty()) {
75     return false;  // Failed to serialize
76   }
77   return true;  // OK
78 }
79 
80 /*
81   Workaround for BUG#25657041
82 
83   During inplace alter table, the table has a temporary
84   tablename and is also marked as hidden. Since the temporary
85   name and hidden status is part of the serialized table
86   definition, there's a mismatch down the line when this is
87   stored as extra metadata in the NDB dictionary.
88 
89   The workaround for now involves setting the table as a user
90   visible table and restoring the original table name
91 */
92 
ndb_dd_fix_inplace_alter_table_def(dd::Table * table_def,const char * proper_table_name)93 void ndb_dd_fix_inplace_alter_table_def(dd::Table *table_def,
94                                         const char *proper_table_name) {
95   DBUG_TRACE;
96   DBUG_PRINT("enter", ("table_name: %s", table_def->name().c_str()));
97   DBUG_PRINT("enter", ("proper_table_name: %s", proper_table_name));
98 
99   // Check that the proper_table_name is not a temporary name
100   DBUG_ASSERT(!ndb_name_is_temp(proper_table_name));
101 
102   table_def->set_name(proper_table_name);
103   table_def->set_hidden(dd::Abstract_table::HT_VISIBLE);
104 }
105 
106 /**
107   Update the version of the Schema object in DD. All the DDLs
108   creating/altering a database will be associated with a unique counter
109   value and the node id from which they originated in the ndb_schema table.
110   These two values, the counter and node id, together form the version of
111   the schema and are set in the se_private_data field of the Schema.
112 
113   @param thd                  Thread object
114   @param schema_name          The name of the Schema to be updated.
115   @param counter              The unique counter associated with the DDL that
116                               created/altered the database.
117   @param node_id              The node id in which the DDL originated.
118   @param skip_commit          If set true, function will skip the commit,
119                               disable auto rollback.
120                               If set false, function will commit the changes.
121                               (default)
122   @return true        On success.
123   @return false       On failure
124 */
ndb_dd_update_schema_version(THD * thd,const char * schema_name,unsigned int counter,unsigned int node_id,bool skip_commit)125 bool ndb_dd_update_schema_version(THD *thd, const char *schema_name,
126                                   unsigned int counter, unsigned int node_id,
127                                   bool skip_commit) {
128   DBUG_TRACE;
129   DBUG_PRINT("enter", ("Schema : %s, counter : %u, node_id : %u", schema_name,
130                        counter, node_id));
131 
132   Ndb_dd_client dd_client(thd);
133   /* Convert the schema name to lower case on platforms that have
134      lower_case_table_names set to 2 */
135   const std::string dd_schema_name = ndb_dd_fs_name_case(schema_name);
136 
137   if (!dd_client.mdl_lock_schema_exclusive(dd_schema_name.c_str())) {
138     DBUG_PRINT("error", ("Failed to acquire exclusive lock on schema '%s'",
139                          schema_name));
140     return false;
141   }
142 
143   if (!dd_client.update_schema_version(dd_schema_name.c_str(), counter,
144                                        node_id)) {
145     return false;
146   }
147 
148   if (!skip_commit) {
149     dd_client.commit();
150   } else {
151     dd_client.disable_auto_rollback();
152   }
153 
154   return true;
155 }
156 
ndb_dd_has_local_tables_in_schema(THD * thd,const char * schema_name,bool & tables_exist_in_database)157 bool ndb_dd_has_local_tables_in_schema(THD *thd, const char *schema_name,
158                                        bool &tables_exist_in_database) {
159   DBUG_TRACE;
160   DBUG_PRINT("enter",
161              ("Checking if schema '%s' has local tables", schema_name));
162 
163   Ndb_dd_client dd_client(thd);
164   /* Convert the schema name to lower case on platforms that have
165      lower_case_table_names set to 2 */
166   const std::string dd_schema_name = ndb_dd_fs_name_case(schema_name);
167 
168   /* Lock the schema in DD */
169   if (!dd_client.mdl_lock_schema(dd_schema_name.c_str())) {
170     DBUG_PRINT("error", ("Failed to acquire MDL on schema '%s'", schema_name));
171     return false;
172   }
173 
174   /* Check if there are any local tables */
175   if (!dd_client.have_local_tables_in_schema(dd_schema_name.c_str(),
176                                              &tables_exist_in_database)) {
177     DBUG_PRINT("error", ("Failed to check if the schema '%s' has local tables",
178                          schema_name));
179     return false;
180   }
181 
182   return true;
183 }
184 
ndb_dd_fs_name_case(const dd::String_type & name)185 const std::string ndb_dd_fs_name_case(const dd::String_type &name) {
186   char name_buf[NAME_LEN + 1];
187   const std::string lc_name =
188       dd::Object_table_definition_impl::fs_name_case(name, name_buf);
189   return lc_name;
190 }
191 
ndb_dd_get_schema_uuid(THD * thd,dd::String_type * dd_schema_uuid)192 bool ndb_dd_get_schema_uuid(THD *thd, dd::String_type *dd_schema_uuid) {
193   DBUG_TRACE;
194   Ndb_dd_client dd_client(thd);
195 
196   const char *schema_name = Ndb_schema_dist_table::DB_NAME.c_str();
197   const char *table_name = Ndb_schema_dist_table::TABLE_NAME.c_str();
198 
199   // Lock the table for reading schema uuid
200   if (!dd_client.mdl_lock_table(schema_name, table_name)) {
201     DBUG_PRINT("error",
202                ("Failed to lock `%s.%s` in DD.", schema_name, table_name));
203     return false;
204   }
205 
206   // Retrieve the schema uuid stored in the ndb_schema table in DD
207   if (!dd_client.get_schema_uuid(dd_schema_uuid)) {
208     DBUG_PRINT("error", ("Failed to read schema UUID from DD"));
209     return false;
210   }
211 
212   return true;
213 }
214 
ndb_dd_update_schema_uuid(THD * thd,const std::string & ndb_schema_uuid)215 bool ndb_dd_update_schema_uuid(THD *thd, const std::string &ndb_schema_uuid) {
216   DBUG_TRACE;
217   Ndb_dd_client dd_client(thd);
218 
219   const char *schema_name = Ndb_schema_dist_table::DB_NAME.c_str();
220   const char *table_name = Ndb_schema_dist_table::TABLE_NAME.c_str();
221 
222   // Acquire exclusive locks on the table
223   if (!dd_client.mdl_locks_acquire_exclusive(schema_name, table_name)) {
224     DBUG_PRINT("error", ("Failed to acquire exclusive lock `%s.%s` in DD.",
225                          schema_name, table_name));
226     return false;
227   }
228 
229   // Update the schema UUID in DD
230   if (!dd_client.update_schema_uuid(ndb_schema_uuid.c_str())) {
231     DBUG_PRINT("error", ("Failed to update schema uuid in DD."));
232     return false;
233   }
234 
235   // Commit the change into DD and return
236   dd_client.commit();
237   return true;
238 }
239 
240 /**
241   Extract all the foreign key constraint definitions on the given table from
242   NDB and install them in the DD table.
243 
244   @param[out] dd_table_def    The DD table object on which the foreign keys
245                               are to be defined.
246   @param ndb                  The Ndb object.
247   @param ndb_table            The NDB table object from which the foreign key
248                               definitions are to be extracted.
249 
250   @return true        On success.
251   @return false       On failure
252 */
ndb_dd_upgrade_foreign_keys(dd::Table * dd_table_def,Ndb * ndb,const NdbDictionary::Table * ndb_table)253 bool ndb_dd_upgrade_foreign_keys(dd::Table *dd_table_def, Ndb *ndb,
254                                  const NdbDictionary::Table *ndb_table) {
255   DBUG_TRACE;
256 
257   // Retrieve the foreign key list
258   Ndb_fk_list fk_list;
259   if (!retrieve_foreign_key_list_from_ndb(ndb->getDictionary(), ndb_table,
260                                           &fk_list)) {
261     return false;
262   }
263 
264   // Loop all foreign keys and add them to the dd table object
265   for (const NdbDictionary::ForeignKey &ndb_fk : fk_list) {
266     char child_schema_name[FN_REFLEN + 1];
267     const char *child_table_name =
268         fk_split_name(child_schema_name, ndb_fk.getChildTable());
269     if (strcmp(child_schema_name, ndb->getDatabaseName()) != 0 ||
270         strcmp(child_table_name, ndb_table->getName())) {
271       // The FK is just referencing the table. Skip it.
272       // It will be handled by the table on which it exists.
273       continue;
274     }
275 
276     // Add the foreign key to the DD table
277     dd::Foreign_key *dd_fk_def = dd_table_def->add_foreign_key();
278 
279     // Open the parent table from NDB
280     char parent_schema_name[FN_REFLEN + 1];
281     const char *parent_table_name =
282         fk_split_name(parent_schema_name, ndb_fk.getParentTable());
283     if (strcmp(child_schema_name, parent_schema_name) == 0 &&
284         strcmp(child_table_name, parent_table_name) == 0) {
285       // Self referencing foreign key.
286       // Use the child table as parent and update the foreign key information.
287       if (!ndb_dd_fk_set_values_from_ndb(dd_fk_def, dd_table_def, ndb_fk,
288                                          ndb_table, ndb_table,
289                                          parent_schema_name)) {
290         return false;
291       }
292     } else {
293       Ndb_table_guard ndb_parent_table_guard(ndb, parent_schema_name,
294                                              parent_table_name);
295       const NdbDictionary::Table *ndb_parent_table =
296           ndb_parent_table_guard.get_table();
297       if (ndb_parent_table == nullptr) {
298         DBUG_PRINT("error",
299                    ("Unable to load table '%s.%s' from ndb. Error : %s",
300                     parent_schema_name, parent_table_name,
301                     ndb->getDictionary()->getNdbError().message));
302         return false;
303       }
304 
305       // Update the foreign key information
306       if (!ndb_dd_fk_set_values_from_ndb(dd_fk_def, dd_table_def, ndb_fk,
307                                          ndb_table, ndb_parent_table,
308                                          parent_schema_name)) {
309         return false;
310       }
311     }
312   }
313   return true;
314 }
315