1 /*
2    Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "storage/ndb/plugin/ndb_dd_client.h"
26 
27 #include <assert.h>
28 #include <iostream>
29 
30 #include "my_dbug.h"
31 #include "sql/auth/auth_common.h"  // check_readonly()
32 #include "sql/dd/cache/dictionary_client.h"
33 #include "sql/dd/dd.h"
34 #include "sql/dd/dd_table.h"
35 #include "sql/dd/properties.h"
36 #include "sql/dd/types/schema.h"
37 #include "sql/dd/types/table.h"
38 #include "sql/query_options.h"  // OPTION_AUTOCOMMIT
39 #include "sql/sql_class.h"      // THD
40 #include "sql/sql_table.h"
41 #include "sql/sql_trigger.h"  // remove_all_triggers_from_perfschema
42 #include "sql/system_variables.h"
43 #include "sql/transaction.h"            // trans_*
44 #include "storage/ndb/plugin/ndb_dd.h"  // ndb_dd_fs_name_case
45 #include "storage/ndb/plugin/ndb_dd_disk_data.h"
46 #include "storage/ndb/plugin/ndb_dd_schema.h"
47 #include "storage/ndb/plugin/ndb_dd_sdi.h"
48 #include "storage/ndb/plugin/ndb_dd_table.h"
49 #include "storage/ndb/plugin/ndb_dd_upgrade_table.h"
50 #include "storage/ndb/plugin/ndb_fk_util.h"
51 #include "storage/ndb/plugin/ndb_log.h"
52 #include "storage/ndb/plugin/ndb_schema_dist_table.h"
53 #include "storage/ndb/plugin/ndb_tdc.h"
54 #include "storage/ndb/plugin/ndb_thd.h"
55 
Ndb_dd_client(THD * thd)56 Ndb_dd_client::Ndb_dd_client(THD *thd)
57     : m_thd(thd),
58       m_client(thd->dd_client()),
59       m_save_mdl_locks(thd->mdl_context.mdl_savepoint()) {
60   DBUG_TRACE;
61   disable_autocommit();
62 
63   // Create dictionary client auto releaser, stored as
64   // opaque pointer in order to avoid including all of
65   // Dictionary_client in the ndb_dd_client header file
66   m_auto_releaser =
67       (void *)new dd::cache::Dictionary_client::Auto_releaser(m_client);
68 }
69 
~Ndb_dd_client()70 Ndb_dd_client::~Ndb_dd_client() {
71   DBUG_TRACE;
72   // Automatically restore the option_bits in THD if they have
73   // been modified
74   if (m_save_option_bits) m_thd->variables.option_bits = m_save_option_bits;
75 
76   if (m_auto_rollback) {
77     // Automatically rollback unless commit has been called
78     if (!m_comitted) rollback();
79   }
80 
81   // Release MDL locks
82   mdl_locks_release();
83 
84   // Free the dictionary client auto releaser
85   dd::cache::Dictionary_client::Auto_releaser *ar =
86       (dd::cache::Dictionary_client::Auto_releaser *)m_auto_releaser;
87   delete ar;
88 }
89 
mdl_lock_table(const char * schema_name,const char * table_name)90 bool Ndb_dd_client::mdl_lock_table(const char *schema_name,
91                                    const char *table_name) {
92   MDL_request_list mdl_requests;
93   MDL_request schema_request;
94   MDL_request mdl_request;
95   MDL_REQUEST_INIT(&schema_request, MDL_key::SCHEMA, schema_name, "",
96                    MDL_INTENTION_EXCLUSIVE, MDL_EXPLICIT);
97   MDL_REQUEST_INIT(&mdl_request, MDL_key::TABLE, schema_name, table_name,
98                    MDL_SHARED, MDL_EXPLICIT);
99 
100   mdl_requests.push_front(&schema_request);
101   mdl_requests.push_front(&mdl_request);
102 
103   if (m_thd->mdl_context.acquire_locks(&mdl_requests,
104                                        m_thd->variables.lock_wait_timeout)) {
105     return false;
106   }
107 
108   // Remember tickets of the acquired mdl locks
109   m_acquired_mdl_tickets.push_back(schema_request.ticket);
110   m_acquired_mdl_tickets.push_back(mdl_request.ticket);
111 
112   return true;
113 }
114 
mdl_lock_schema_exclusive(const char * schema_name,bool custom_lock_wait,ulong lock_wait_timeout)115 bool Ndb_dd_client::mdl_lock_schema_exclusive(const char *schema_name,
116                                               bool custom_lock_wait,
117                                               ulong lock_wait_timeout) {
118   MDL_request_list mdl_requests;
119   MDL_request schema_request;
120   MDL_request backup_lock_request;
121   MDL_request grl_request;
122 
123   // If protection against GRL can't be acquired, err out early.
124   if (m_thd->global_read_lock.can_acquire_protection()) {
125     return false;
126   }
127 
128   MDL_REQUEST_INIT(&schema_request, MDL_key::SCHEMA, schema_name, "",
129                    MDL_EXCLUSIVE, MDL_EXPLICIT);
130   MDL_REQUEST_INIT(&backup_lock_request, MDL_key::BACKUP_LOCK, "", "",
131                    MDL_INTENTION_EXCLUSIVE, MDL_EXPLICIT);
132   MDL_REQUEST_INIT(&grl_request, MDL_key::GLOBAL, "", "",
133                    MDL_INTENTION_EXCLUSIVE, MDL_EXPLICIT);
134 
135   mdl_requests.push_front(&schema_request);
136   mdl_requests.push_front(&backup_lock_request);
137   mdl_requests.push_front(&grl_request);
138 
139   if (!custom_lock_wait) {
140     lock_wait_timeout = m_thd->variables.lock_wait_timeout;
141   }
142 
143   if (m_thd->mdl_context.acquire_locks(&mdl_requests, lock_wait_timeout)) {
144     return false;
145   }
146 
147   // Remember tickets of the acquired mdl locks
148   m_acquired_mdl_tickets.push_back(schema_request.ticket);
149   m_acquired_mdl_tickets.push_back(backup_lock_request.ticket);
150   m_acquired_mdl_tickets.push_back(grl_request.ticket);
151 
152   return true;
153 }
154 
mdl_lock_schema(const char * schema_name)155 bool Ndb_dd_client::mdl_lock_schema(const char *schema_name) {
156   MDL_request_list mdl_requests;
157   MDL_request schema_request;
158 
159   MDL_REQUEST_INIT(&schema_request, MDL_key::SCHEMA, schema_name, "",
160                    MDL_INTENTION_EXCLUSIVE, MDL_EXPLICIT);
161   mdl_requests.push_front(&schema_request);
162 
163   if (m_thd->mdl_context.acquire_locks(&mdl_requests,
164                                        m_thd->variables.lock_wait_timeout)) {
165     return false;
166   }
167 
168   /*
169     Now when we have protection against concurrent change of read_only
170     option we can safely re-check its value.
171   */
172   if (check_readonly(m_thd, true)) return false;
173 
174   // Remember ticket(s) of the acquired mdl lock
175   m_acquired_mdl_tickets.push_back(schema_request.ticket);
176 
177   return true;
178 }
179 
mdl_lock_logfile_group_exclusive(const char * logfile_group_name,bool custom_lock_wait,ulong lock_wait_timeout)180 bool Ndb_dd_client::mdl_lock_logfile_group_exclusive(
181     const char *logfile_group_name, bool custom_lock_wait,
182     ulong lock_wait_timeout) {
183   MDL_request_list mdl_requests;
184   MDL_request logfile_group_request;
185   MDL_request backup_lock_request;
186   MDL_request grl_request;
187 
188   // If protection against GRL can't be acquired, err out early.
189   if (m_thd->global_read_lock.can_acquire_protection()) {
190     return false;
191   }
192 
193   MDL_REQUEST_INIT(&logfile_group_request, MDL_key::TABLESPACE, "",
194                    logfile_group_name, MDL_EXCLUSIVE, MDL_EXPLICIT);
195   MDL_REQUEST_INIT(&backup_lock_request, MDL_key::BACKUP_LOCK, "", "",
196                    MDL_INTENTION_EXCLUSIVE, MDL_EXPLICIT);
197   MDL_REQUEST_INIT(&grl_request, MDL_key::GLOBAL, "", "",
198                    MDL_INTENTION_EXCLUSIVE, MDL_EXPLICIT);
199 
200   mdl_requests.push_front(&logfile_group_request);
201   mdl_requests.push_front(&backup_lock_request);
202   mdl_requests.push_front(&grl_request);
203 
204   if (!custom_lock_wait) {
205     lock_wait_timeout = m_thd->variables.lock_wait_timeout;
206   }
207 
208   if (m_thd->mdl_context.acquire_locks(&mdl_requests, lock_wait_timeout)) {
209     return false;
210   }
211 
212   /*
213     Now when we have protection against concurrent change of read_only
214     option we can safely re-check its value.
215   */
216   if (check_readonly(m_thd, true)) return false;
217 
218   // Remember tickets of the acquired mdl locks
219   m_acquired_mdl_tickets.push_back(logfile_group_request.ticket);
220   m_acquired_mdl_tickets.push_back(backup_lock_request.ticket);
221   m_acquired_mdl_tickets.push_back(grl_request.ticket);
222 
223   return true;
224 }
225 
mdl_lock_logfile_group(const char * logfile_group_name,bool intention_exclusive)226 bool Ndb_dd_client::mdl_lock_logfile_group(const char *logfile_group_name,
227                                            bool intention_exclusive) {
228   MDL_request_list mdl_requests;
229   MDL_request logfile_group_request;
230 
231   enum_mdl_type mdl_type =
232       intention_exclusive ? MDL_INTENTION_EXCLUSIVE : MDL_SHARED_READ;
233   MDL_REQUEST_INIT(&logfile_group_request, MDL_key::TABLESPACE, "",
234                    logfile_group_name, mdl_type, MDL_EXPLICIT);
235 
236   mdl_requests.push_front(&logfile_group_request);
237 
238   if (m_thd->mdl_context.acquire_locks(&mdl_requests,
239                                        m_thd->variables.lock_wait_timeout)) {
240     return false;
241   }
242 
243   // Remember tickets of the acquired mdl locks
244   m_acquired_mdl_tickets.push_back(logfile_group_request.ticket);
245 
246   return true;
247 }
248 
mdl_lock_tablespace_exclusive(const char * tablespace_name,bool custom_lock_wait,ulong lock_wait_timeout)249 bool Ndb_dd_client::mdl_lock_tablespace_exclusive(const char *tablespace_name,
250                                                   bool custom_lock_wait,
251                                                   ulong lock_wait_timeout) {
252   MDL_request_list mdl_requests;
253   MDL_request tablespace_request;
254   MDL_request backup_lock_request;
255   MDL_request grl_request;
256 
257   // If protection against GRL can't be acquired, err out early.
258   if (m_thd->global_read_lock.can_acquire_protection()) {
259     return false;
260   }
261 
262   MDL_REQUEST_INIT(&tablespace_request, MDL_key::TABLESPACE, "",
263                    tablespace_name, MDL_EXCLUSIVE, MDL_EXPLICIT);
264   MDL_REQUEST_INIT(&backup_lock_request, MDL_key::BACKUP_LOCK, "", "",
265                    MDL_INTENTION_EXCLUSIVE, MDL_EXPLICIT);
266   MDL_REQUEST_INIT(&grl_request, MDL_key::GLOBAL, "", "",
267                    MDL_INTENTION_EXCLUSIVE, MDL_EXPLICIT);
268 
269   mdl_requests.push_front(&tablespace_request);
270   mdl_requests.push_front(&backup_lock_request);
271   mdl_requests.push_front(&grl_request);
272 
273   if (!custom_lock_wait) {
274     lock_wait_timeout = m_thd->variables.lock_wait_timeout;
275   }
276 
277   if (m_thd->mdl_context.acquire_locks(&mdl_requests, lock_wait_timeout)) {
278     return false;
279   }
280 
281   /*
282     Now when we have protection against concurrent change of read_only
283     option we can safely re-check its value.
284   */
285   if (check_readonly(m_thd, true)) return false;
286 
287   // Remember tickets of the acquired mdl locks
288   m_acquired_mdl_tickets.push_back(tablespace_request.ticket);
289   m_acquired_mdl_tickets.push_back(backup_lock_request.ticket);
290   m_acquired_mdl_tickets.push_back(grl_request.ticket);
291 
292   return true;
293 }
294 
mdl_lock_tablespace(const char * tablespace_name,bool intention_exclusive)295 bool Ndb_dd_client::mdl_lock_tablespace(const char *tablespace_name,
296                                         bool intention_exclusive) {
297   MDL_request_list mdl_requests;
298   MDL_request tablespace_request;
299 
300   enum_mdl_type mdl_type =
301       intention_exclusive ? MDL_INTENTION_EXCLUSIVE : MDL_SHARED_READ;
302   MDL_REQUEST_INIT(&tablespace_request, MDL_key::TABLESPACE, "",
303                    tablespace_name, mdl_type, MDL_EXPLICIT);
304 
305   mdl_requests.push_front(&tablespace_request);
306 
307   if (m_thd->mdl_context.acquire_locks(&mdl_requests,
308                                        m_thd->variables.lock_wait_timeout)) {
309     return false;
310   }
311 
312   // Remember tickets of the acquired mdl locks
313   m_acquired_mdl_tickets.push_back(tablespace_request.ticket);
314 
315   return true;
316 }
317 
mdl_locks_acquire_exclusive(const char * schema_name,const char * table_name,bool custom_lock_wait,ulong lock_wait_timeout)318 bool Ndb_dd_client::mdl_locks_acquire_exclusive(const char *schema_name,
319                                                 const char *table_name,
320                                                 bool custom_lock_wait,
321                                                 ulong lock_wait_timeout) {
322   MDL_request_list mdl_requests;
323   MDL_request schema_request;
324   MDL_request mdl_request;
325   MDL_request backup_lock_request;
326   MDL_request grl_request;
327 
328   // If we cannot acquire protection against GRL, err out early.
329   if (m_thd->global_read_lock.can_acquire_protection()) return false;
330 
331   MDL_REQUEST_INIT(&schema_request, MDL_key::SCHEMA, schema_name, "",
332                    MDL_INTENTION_EXCLUSIVE, MDL_EXPLICIT);
333   MDL_REQUEST_INIT(&mdl_request, MDL_key::TABLE, schema_name, table_name,
334                    MDL_EXCLUSIVE, MDL_EXPLICIT);
335   MDL_REQUEST_INIT(&backup_lock_request, MDL_key::BACKUP_LOCK, "", "",
336                    MDL_INTENTION_EXCLUSIVE, MDL_EXPLICIT);
337   MDL_REQUEST_INIT(&grl_request, MDL_key::GLOBAL, "", "",
338                    MDL_INTENTION_EXCLUSIVE, MDL_EXPLICIT);
339 
340   mdl_requests.push_front(&schema_request);
341   mdl_requests.push_front(&mdl_request);
342   mdl_requests.push_front(&backup_lock_request);
343   mdl_requests.push_front(&grl_request);
344 
345   if (!custom_lock_wait) {
346     lock_wait_timeout = m_thd->variables.lock_wait_timeout;
347   }
348 
349   if (m_thd->mdl_context.acquire_locks(&mdl_requests, lock_wait_timeout)) {
350     return false;
351   }
352 
353   /*
354     Now when we have protection against concurrent change of read_only
355     option we can safely re-check its value.
356   */
357   if (check_readonly(m_thd, true)) return false;
358 
359   // Remember tickets of the acquired mdl locks
360   m_acquired_mdl_tickets.push_back(schema_request.ticket);
361   m_acquired_mdl_tickets.push_back(mdl_request.ticket);
362   m_acquired_mdl_tickets.push_back(backup_lock_request.ticket);
363   m_acquired_mdl_tickets.push_back(grl_request.ticket);
364 
365   return true;
366 }
367 
mdl_locks_release()368 void Ndb_dd_client::mdl_locks_release() {
369   // Release MDL locks acquired in EXPLICIT scope
370   for (MDL_ticket *ticket : m_acquired_mdl_tickets) {
371     m_thd->mdl_context.release_lock(ticket);
372   }
373   // Release new MDL locks acquired in TRANSACTIONAL and STATEMENT scope
374   m_thd->mdl_context.rollback_to_savepoint(m_save_mdl_locks);
375 }
376 
disable_autocommit()377 void Ndb_dd_client::disable_autocommit() {
378   /*
379     Implementation details from which storage the DD uses leaks out
380     and the user of these functions magically need to turn auto commit
381     off.
382 
383     I.e as in sql_table.cc, execute_ddl_log_recovery()
384      'Prevent InnoDB from automatically committing InnoDB transaction
385       each time data-dictionary tables are closed after being updated.'
386   */
387 
388   // Don't allow empty bits as zero is used as indicator
389   // to restore the saved bits
390   assert(m_thd->variables.option_bits);
391   m_save_option_bits = m_thd->variables.option_bits;
392 
393   m_thd->variables.option_bits &= ~OPTION_AUTOCOMMIT;
394   m_thd->variables.option_bits |= OPTION_NOT_AUTOCOMMIT;
395 }
396 
commit()397 void Ndb_dd_client::commit() {
398   trans_commit_stmt(m_thd);
399   trans_commit(m_thd);
400   m_comitted = true;
401 }
402 
rollback()403 void Ndb_dd_client::rollback() {
404   trans_rollback_stmt(m_thd);
405   trans_rollback(m_thd);
406 }
407 
get_engine(const char * schema_name,const char * table_name,dd::String_type * engine)408 bool Ndb_dd_client::get_engine(const char *schema_name, const char *table_name,
409                                dd::String_type *engine) {
410   const dd::Table *existing = nullptr;
411   if (m_client->acquire(schema_name, table_name, &existing)) {
412     return false;
413   }
414 
415   if (existing == nullptr) {
416     // Table does not exist in DD
417     return false;
418   }
419 
420   *engine = existing->engine();
421 
422   return true;
423 }
424 
rename_table(const char * old_schema_name,const char * old_table_name,const char * new_schema_name,const char * new_table_name,int new_table_id,int new_table_version,Ndb_referenced_tables_invalidator * invalidator)425 bool Ndb_dd_client::rename_table(
426     const char *old_schema_name, const char *old_table_name,
427     const char *new_schema_name, const char *new_table_name, int new_table_id,
428     int new_table_version, Ndb_referenced_tables_invalidator *invalidator) {
429   // Read new schema from DD
430   const dd::Schema *new_schema = nullptr;
431   if (m_client->acquire(new_schema_name, &new_schema)) {
432     return false;
433   }
434   if (new_schema == nullptr) {
435     // Database does not exist, unexpected
436     DBUG_ASSERT(false);
437     return false;
438   }
439 
440   // Read table from DD
441   dd::Table *to_table_def = nullptr;
442   if (m_client->acquire_for_modification(old_schema_name, old_table_name,
443                                          &to_table_def))
444     return false;
445 
446   if (invalidator != nullptr &&
447       !invalidator->fetch_referenced_tables_to_invalidate(
448           old_schema_name, old_table_name, to_table_def, true)) {
449     return false;
450   }
451 
452   // Collect and lock all the tables referencing this table,
453   // referenced by this table and the foreign key names.
454   // Note : This re-attempts to lock the referenced tables that were already
455   //        locked by fetch_referenced_tables_to_invalidate(). This will be
456   //        fixed when Bug#30500825 gets fixed.
457   if (collect_and_lock_fk_tables_for_rename_table(
458           m_thd, old_schema_name, old_table_name, to_table_def, new_schema_name,
459           new_table_name, ndbcluster_hton, nullptr)) {
460     return false;
461   }
462 
463   // Set schema id and table name
464   to_table_def->set_schema_id(new_schema->id());
465   to_table_def->set_name(new_table_name);
466 
467   ndb_dd_table_set_object_id_and_version(to_table_def, new_table_id,
468                                          new_table_version);
469 
470   // Rename foreign keys
471   if (dd::rename_foreign_keys(m_thd, old_schema_name, old_table_name,
472                               ndbcluster_hton, new_schema_name, to_table_def)) {
473     // Failed to rename foreign keys or commit/rollback, unexpected
474     DBUG_ASSERT(false);
475     return false;
476   }
477 
478   // Adjust parent table for self-referencing foreign keys.
479   dd::Table::Foreign_key_collection *foreign_keys =
480       to_table_def->foreign_keys();
481   for (dd::Foreign_key *fk : *foreign_keys) {
482     if (strcmp(fk->referenced_table_schema_name().c_str(), old_schema_name) ==
483             0 &&
484         strcmp(fk->referenced_table_name().c_str(), old_table_name) == 0) {
485       fk->set_referenced_table_schema_name(new_schema_name);
486       fk->set_referenced_table_name(new_table_name);
487     }
488   }
489 
490   // Save table in DD
491   if (m_client->update(to_table_def)) {
492     // Failed to save, unexpected
493     DBUG_ASSERT(false);
494     return false;
495   }
496 
497   // Update the foreign key information of tables referencing this table in DD
498   if (adjust_fks_for_rename_table(m_thd, old_schema_name, old_table_name,
499                                   new_schema_name, new_table_name,
500                                   ndbcluster_hton)) {
501     return false;
502   }
503 
504   return true;
505 }
506 
remove_table(const char * schema_name,const char * table_name,Ndb_referenced_tables_invalidator * invalidator)507 bool Ndb_dd_client::remove_table(const char *schema_name,
508                                  const char *table_name,
509                                  Ndb_referenced_tables_invalidator *invalidator)
510 
511 {
512   DBUG_TRACE;
513   DBUG_PRINT("enter",
514              ("schema_name: '%s', table_name: '%s'", schema_name, table_name));
515 
516   const dd::Table *existing = nullptr;
517   if (m_client->acquire(schema_name, table_name, &existing)) {
518     return false;
519   }
520 
521   if (existing == nullptr) {
522     // Table does not exist
523     return true;
524   }
525 
526   if (invalidator != nullptr &&
527       !invalidator->fetch_referenced_tables_to_invalidate(
528           schema_name, table_name, existing, true)) {
529     return false;
530   }
531 
532 #ifdef HAVE_PSI_SP_INTERFACE
533   // Remove statistics, table is not using trigger(s) anymore
534   remove_all_triggers_from_perfschema(schema_name, *existing);
535 #endif
536 
537   DBUG_PRINT("info", ("removing existing table"));
538   if (m_client->drop(existing)) {
539     // Failed to remove existing
540     DBUG_ASSERT(false);  // Catch in debug, unexpected error
541     return false;
542   }
543 
544   return true;
545 }
546 
deserialize_table(const dd::sdi_t & sdi,dd::Table * table_def)547 bool Ndb_dd_client::deserialize_table(const dd::sdi_t &sdi,
548                                       dd::Table *table_def) {
549   if (ndb_dd_sdi_deserialize(m_thd, sdi, table_def)) {
550     return false;
551   }
552   return true;
553 }
554 
store_table(dd::Table * install_table) const555 bool Ndb_dd_client::store_table(dd::Table *install_table) const {
556   DBUG_TRACE;
557 
558   if (!m_client->store(install_table)) {
559     return true;  // OK
560   }
561   return false;
562 }
563 
store_table(dd::Table * install_table,int ndb_table_id)564 bool Ndb_dd_client::store_table(dd::Table *install_table, int ndb_table_id) {
565   DBUG_TRACE;
566 
567   if (!m_client->store(install_table)) {
568     return true;  // OK
569   }
570 
571   DBUG_PRINT("error", ("Failed to store table, error: '%d %s'",
572                        m_thd->get_stmt_da()->mysql_errno(),
573                        m_thd->get_stmt_da()->message_text()));
574 
575   if (m_thd->get_stmt_da()->mysql_errno() == ER_DUP_ENTRY) {
576     // Try to handle the failure which may occur when the DD already
577     // have a table definition from an old NDB table which used the
578     // same table id but with a different name.
579     // This may happen when the MySQL Server reconnects to the cluster
580     // and synchronizes its DD with NDB dictionary. Of course it indicates
581     // that the DD is out of synch with the dictionary in NDB but that's
582     // normal when the MySQL Server haven't taken part in DDL operations.
583     // And as usual NDB is the master for all NDB tables.
584 
585     // Remove the current ER_DUP_ENTRY error, subsequent failures
586     // will set a new error
587     m_thd->clear_error();
588 
589     // Find old table using the NDB tables id
590     dd::Table *old_table_def;
591     if (m_client->acquire_uncached_table_by_se_private_id(
592             "ndbcluster", ndb_table_id, &old_table_def)) {
593       // There was no old table
594       return false;
595     }
596 
597     // Double check that old table is in NDB
598     if (old_table_def->engine() != "ndbcluster") {
599       DBUG_ASSERT(false);
600       return false;
601     }
602 
603     // Lookup schema name of old table
604     dd::Schema *old_schema;
605     if (m_client->acquire_uncached(old_table_def->schema_id(), &old_schema)) {
606       return false;
607     }
608 
609     if (old_schema == nullptr) {
610       DBUG_ASSERT(false);  // Database does not exist
611       return false;
612     }
613 
614     const char *old_schema_name = old_schema->name().c_str();
615     const char *old_table_name = old_table_def->name().c_str();
616     DBUG_PRINT("info", ("Found old table '%s.%s', will try to remove it",
617                         old_schema_name, old_table_name));
618 
619     // Take exclusive locks on old table
620     if (!mdl_locks_acquire_exclusive(old_schema_name, old_table_name)) {
621       // Failed to MDL lock old table
622       return false;
623     }
624 
625     if (!remove_table(old_schema_name, old_table_name)) {
626       // Failed to remove old table from DD
627       return false;
628     }
629 
630     // Try to store the new table again
631     if (m_client->store(install_table)) {
632       DBUG_PRINT("error", ("Failed to store table, error: '%d %s'",
633                            m_thd->get_stmt_da()->mysql_errno(),
634                            m_thd->get_stmt_da()->message_text()));
635       return false;
636     }
637 
638     // Removed old table and stored the new, return OK
639     DBUG_ASSERT(!m_thd->is_error());
640     return true;
641   }
642 
643   return false;
644 }
645 
install_table(const char * schema_name,const char * table_name,const dd::sdi_t & sdi,int ndb_table_id,int ndb_table_version,size_t ndb_num_partitions,const std::string & tablespace_name,bool force_overwrite,Ndb_referenced_tables_invalidator * invalidator)646 bool Ndb_dd_client::install_table(
647     const char *schema_name, const char *table_name, const dd::sdi_t &sdi,
648     int ndb_table_id, int ndb_table_version, size_t ndb_num_partitions,
649     const std::string &tablespace_name, bool force_overwrite,
650     Ndb_referenced_tables_invalidator *invalidator) {
651   const dd::Schema *schema = nullptr;
652 
653   if (m_client->acquire(schema_name, &schema)) {
654     return false;
655   }
656   if (schema == nullptr) {
657     DBUG_ASSERT(false);  // Database does not exist
658     return false;
659   }
660 
661   std::unique_ptr<dd::Table> install_table{dd::create_object<dd::Table>()};
662   if (ndb_dd_sdi_deserialize(m_thd, sdi, install_table.get())) {
663     return false;
664   }
665 
666   // Verify that table_name in the unpacked table definition
667   // matches the table name to install
668   DBUG_ASSERT(ndb_dd_fs_name_case(install_table->name()) == table_name);
669 
670   // Verify that table defintion unpacked from NDB
671   // does not have any se_private fields set, those will be set
672   // from the NDB table metadata
673   DBUG_ASSERT(install_table->se_private_id() == dd::INVALID_OBJECT_ID);
674   DBUG_ASSERT(install_table->se_private_data().raw_string() == "");
675 
676   // Assign the id of the schema to the table_object
677   install_table->set_schema_id(schema->id());
678 
679   // Assign NDB id and version of the table
680   ndb_dd_table_set_object_id_and_version(install_table.get(), ndb_table_id,
681                                          ndb_table_version);
682 
683   // Check if the DD table object has the correct number of partitions.
684   // Correct the number of partitions in the DD table object in case of
685   // a mismatch
686   const bool check_partition_count_result = ndb_dd_table_check_partition_count(
687       install_table.get(), ndb_num_partitions);
688   if (!check_partition_count_result) {
689     ndb_dd_table_fix_partition_count(install_table.get(), ndb_num_partitions);
690   }
691 
692   // Set the tablespace id if applicable
693   if (!tablespace_name.empty()) {
694     dd::Object_id tablespace_id;
695     if (!lookup_tablespace_id(tablespace_name.c_str(), &tablespace_id)) {
696       return false;
697     }
698     ndb_dd_table_set_tablespace_id(install_table.get(), tablespace_id);
699   }
700 
701   const dd::Table *existing = nullptr;
702   if (m_client->acquire(schema_name, table_name, &existing)) {
703     return false;
704   }
705 
706   if (invalidator != nullptr &&
707       !invalidator->fetch_referenced_tables_to_invalidate(
708           schema_name, table_name, existing)) {
709     return false;
710   }
711 
712   if (existing != nullptr) {
713     // Get id and version of existing table
714     int object_id, object_version;
715     if (!ndb_dd_table_get_object_id_and_version(existing, object_id,
716                                                 object_version)) {
717       DBUG_PRINT("error", ("Could not extract object_id and object_version "
718                            "from table definition"));
719       DBUG_ASSERT(false);
720       return false;
721     }
722 
723     // Check that id and version of the existing table in DD
724     // matches NDB, otherwise it's a programming error
725     // not to request "force_overwrite"
726     if (ndb_table_id == object_id && ndb_table_version == object_version) {
727       // Table is already installed, with same id and version
728       // return sucess
729       return true;
730     }
731 
732     // Table already exists
733     if (!force_overwrite) {
734       // Don't overwrite existing table
735       DBUG_ASSERT(false);
736       return false;
737     }
738 
739     // Continue and remove the old table before
740     // installing the new
741     DBUG_PRINT("info", ("dropping existing table"));
742     if (m_client->drop(existing)) {
743       // Failed to drop existing
744       DBUG_ASSERT(false);  // Catch in debug, unexpected error
745       return false;
746     }
747   }
748 
749   if (!store_table(install_table.get(), ndb_table_id)) {
750     ndb_log_error("Failed to store table: '%s.%s'", schema_name, table_name);
751     ndb_log_error_dump("sdi for new table: %s",
752                        ndb_dd_sdi_prettify(sdi).c_str());
753     if (existing) {
754       const dd::sdi_t existing_sdi =
755           ndb_dd_sdi_serialize(m_thd, *existing, dd::String_type(schema_name));
756       ndb_log_error_dump("sdi for existing table: %s",
757                          ndb_dd_sdi_prettify(existing_sdi).c_str());
758     }
759     DBUG_ABORT();
760     return false;
761   }
762 
763   return true;  // OK
764 }
765 
migrate_table(const char * schema_name,const char * table_name,const unsigned char * frm_data,unsigned int unpacked_len,bool force_overwrite)766 bool Ndb_dd_client::migrate_table(const char *schema_name,
767                                   const char *table_name,
768                                   const unsigned char *frm_data,
769                                   unsigned int unpacked_len,
770                                   bool force_overwrite) {
771   if (force_overwrite) {
772     // Remove the old table before migrating
773     DBUG_PRINT("info", ("dropping existing table"));
774     if (!remove_table(schema_name, table_name)) {
775       return false;
776     }
777 
778     commit();
779   }
780 
781   const bool migrate_result = dd::ndb_upgrade::migrate_table_to_dd(
782       m_thd, this, schema_name, table_name, frm_data, unpacked_len);
783 
784   return migrate_result;
785 }
786 
get_table(const char * schema_name,const char * table_name,const dd::Table ** table_def)787 bool Ndb_dd_client::get_table(const char *schema_name, const char *table_name,
788                               const dd::Table **table_def) {
789   if (m_client->acquire(schema_name, table_name, table_def)) {
790     my_error(ER_NO_SUCH_TABLE, MYF(0), schema_name, table_name);
791     return false;
792   }
793   return true;
794 }
795 
table_exists(const char * schema_name,const char * table_name,bool & exists)796 bool Ndb_dd_client::table_exists(const char *schema_name,
797                                  const char *table_name, bool &exists) {
798   const dd::Table *table;
799   if (m_client->acquire(schema_name, table_name, &table)) {
800     // Failed to acquire the requested table
801     return false;
802   }
803 
804   if (table == nullptr) {
805     // The table doesn't exist
806     exists = false;
807     return true;
808   }
809 
810   // The table exists
811   exists = true;
812   return true;
813 }
814 
set_tablespace_id_in_table(const char * schema_name,const char * table_name,dd::Object_id tablespace_id)815 bool Ndb_dd_client::set_tablespace_id_in_table(const char *schema_name,
816                                                const char *table_name,
817                                                dd::Object_id tablespace_id) {
818   dd::Table *table_def = nullptr;
819   if (m_client->acquire_for_modification(schema_name, table_name, &table_def)) {
820     return false;
821   }
822   if (table_def == nullptr) {
823     DBUG_ASSERT(false);
824     return false;
825   }
826 
827   ndb_dd_table_set_tablespace_id(table_def, tablespace_id);
828 
829   if (m_client->update(table_def)) {
830     return false;
831   }
832   return true;
833 }
834 
set_object_id_and_version_in_table(const char * schema_name,const char * table_name,int object_id,int object_version)835 bool Ndb_dd_client::set_object_id_and_version_in_table(const char *schema_name,
836                                                        const char *table_name,
837                                                        int object_id,
838                                                        int object_version) {
839   DBUG_TRACE;
840 
841   /* Acquire the table */
842   dd::Table *table_def = nullptr;
843   if (m_client->acquire_for_modification(schema_name, table_name, &table_def)) {
844     DBUG_PRINT("error", ("Failed to load the table from DD"));
845     return false;
846   }
847 
848   /* Update id and version */
849   ndb_dd_table_set_object_id_and_version(table_def, object_id, object_version);
850 
851   /* Update it to DD */
852   if (m_client->update(table_def)) {
853     return false;
854   }
855 
856   return true;
857 }
858 
fetch_all_schemas(std::map<std::string,const dd::Schema * > & schemas)859 bool Ndb_dd_client::fetch_all_schemas(
860     std::map<std::string, const dd::Schema *> &schemas) {
861   DBUG_TRACE;
862 
863   std::vector<const dd::Schema *> schemas_list;
864   if (m_client->fetch_global_components(&schemas_list)) {
865     DBUG_PRINT("error", ("Failed to fetch all schemas"));
866     return false;
867   }
868 
869   for (const dd::Schema *schema : schemas_list) {
870     // Convert the schema name to lower case on platforms that have
871     // lower_case_table_names set to 2
872     const std::string schema_name = ndb_dd_fs_name_case(schema->name());
873     schemas.insert(std::make_pair(schema_name.c_str(), schema));
874   }
875   return true;
876 }
877 
fetch_schema_names(std::vector<std::string> * names)878 bool Ndb_dd_client::fetch_schema_names(std::vector<std::string> *names) {
879   DBUG_TRACE;
880 
881   std::vector<const dd::Schema *> schemas;
882   if (m_client->fetch_global_components(&schemas)) {
883     return false;
884   }
885 
886   for (const dd::Schema *schema : schemas) {
887     // Convert the schema name to lower case on platforms that have
888     // lower_case_table_names set to 2
889     const std::string schema_name = ndb_dd_fs_name_case(schema->name());
890     names->push_back(schema_name.c_str());
891   }
892   return true;
893 }
894 
get_ndb_table_names_in_schema(const char * schema_name,std::unordered_set<std::string> * names)895 bool Ndb_dd_client::get_ndb_table_names_in_schema(
896     const char *schema_name, std::unordered_set<std::string> *names) {
897   DBUG_TRACE;
898 
899   const dd::Schema *schema;
900   if (m_client->acquire(schema_name, &schema)) {
901     // Failed to open the requested Schema object
902     return false;
903   }
904 
905   if (schema == nullptr) {
906     // Database does not exist
907     return false;
908   }
909 
910   std::vector<dd::String_type> table_names;
911   if (m_client->fetch_schema_table_names_by_engine(schema, "ndbcluster",
912                                                    &table_names)) {
913     return false;
914   }
915 
916   for (const auto &name : table_names) {
917     if (!mdl_lock_table(schema_name, name.c_str())) {
918       // Failed to MDL lock table
919       return false;
920     }
921 
922     // Convert the table name to lower case on platforms that have
923     // lower_case_table_names set to 2
924     const std::string table_name = ndb_dd_fs_name_case(name);
925     names->insert(table_name);
926   }
927   return true;
928 }
929 
get_table_names_in_schema(const char * schema_name,std::unordered_set<std::string> * ndb_tables,std::unordered_set<std::string> * local_tables)930 bool Ndb_dd_client::get_table_names_in_schema(
931     const char *schema_name, std::unordered_set<std::string> *ndb_tables,
932     std::unordered_set<std::string> *local_tables) {
933   DBUG_TRACE;
934 
935   const dd::Schema *schema;
936   if (m_client->acquire(schema_name, &schema)) {
937     // Failed to open the requested Schema object
938     return false;
939   }
940 
941   if (schema == nullptr) {
942     // Database does not exist
943     return false;
944   }
945 
946   // Fetch NDB table names
947   std::vector<dd::String_type> ndb_table_names;
948   if (m_client->fetch_schema_table_names_by_engine(schema, "ndbcluster",
949                                                    &ndb_table_names)) {
950     return false;
951   }
952   for (const auto &name : ndb_table_names) {
953     // Lock the table in DD
954     if (!mdl_lock_table(schema_name, name.c_str())) {
955       // Failed to acquire MDL
956       return false;
957     }
958     // Convert the table name to lower case on platforms that have
959     // lower_case_table_names set to 2
960     const std::string table_name = ndb_dd_fs_name_case(name);
961     ndb_tables->insert(table_name);
962   }
963 
964   // Fetch all table names
965   std::vector<dd::String_type> all_table_names;
966   if (m_client->fetch_schema_table_names_not_hidden_by_se(schema,
967                                                           &all_table_names)) {
968     return false;
969   }
970   for (const auto &name : all_table_names) {
971     // Convert the table name to lower case on platforms that have
972     // lower_case_table_names set to 2
973     const std::string table_name = ndb_dd_fs_name_case(name);
974     if (ndb_tables->find(table_name) != ndb_tables->end()) {
975       // Skip NDB table
976       continue;
977     }
978     // Lock the table in DD
979     if (!mdl_lock_table(schema_name, name.c_str())) {
980       // Failed to acquire MDL
981       return false;
982     }
983     local_tables->insert(table_name);
984   }
985   return true;
986 }
987 
988 /*
989   Check given schema for local tables(i.e not in NDB)
990 
991   @param        schema_name          Name of the schema to check for tables
992   @param [out]  found_local_tables   Return parameter indicating if the schema
993                                      contained local tables or not.
994 
995   @return       false  Failure
996   @return       true   Success.
997 */
998 
have_local_tables_in_schema(const char * schema_name,bool * found_local_tables)999 bool Ndb_dd_client::have_local_tables_in_schema(const char *schema_name,
1000                                                 bool *found_local_tables) {
1001   DBUG_TRACE;
1002 
1003   const dd::Schema *schema;
1004   if (m_client->acquire(schema_name, &schema)) {
1005     // Failed to open the requested schema
1006     return false;
1007   }
1008 
1009   if (schema == nullptr) {
1010     // The schema didn't exist, thus it can't have any local tables
1011     *found_local_tables = false;
1012     return true;
1013   }
1014 
1015   // Fetch all table names
1016   std::vector<dd::String_type> all_table_names;
1017   if (m_client->fetch_schema_table_names_not_hidden_by_se(schema,
1018                                                           &all_table_names)) {
1019     return false;
1020   }
1021   // Fetch NDB table names
1022   std::vector<dd::String_type> ndb_table_names;
1023   if (m_client->fetch_schema_table_names_by_engine(schema, "ndbcluster",
1024                                                    &ndb_table_names)) {
1025     return false;
1026   }
1027 
1028   *found_local_tables = all_table_names.size() > ndb_table_names.size();
1029 
1030   return true;
1031 }
1032 
is_local_table(const char * schema_name,const char * table_name,bool & local_table)1033 bool Ndb_dd_client::is_local_table(const char *schema_name,
1034                                    const char *table_name, bool &local_table) {
1035   const dd::Table *table;
1036   if (m_client->acquire(schema_name, table_name, &table)) {
1037     // Failed to acquire the requested table
1038     return false;
1039   }
1040   if (table == nullptr) {
1041     // The table doesn't exist
1042     DBUG_ASSERT(false);
1043     return false;
1044   }
1045   local_table = table->engine() != "ndbcluster";
1046   return true;
1047 }
1048 
get_schema(const char * schema_name,const dd::Schema ** schema_def) const1049 bool Ndb_dd_client::get_schema(const char *schema_name,
1050                                const dd::Schema **schema_def) const {
1051   if (m_client->acquire(schema_name, schema_def)) {
1052     // Error is reported by the dictionary subsystem.
1053     return false;
1054   }
1055   return true;
1056 }
1057 
schema_exists(const char * schema_name,bool * schema_exists)1058 bool Ndb_dd_client::schema_exists(const char *schema_name,
1059                                   bool *schema_exists) {
1060   DBUG_TRACE;
1061 
1062   const dd::Schema *schema;
1063   if (m_client->acquire(schema_name, &schema)) {
1064     // Failed to open the requested schema
1065     return false;
1066   }
1067 
1068   if (schema == nullptr) {
1069     // The schema didn't exist
1070     *schema_exists = false;
1071     return true;
1072   }
1073 
1074   // The schema exists
1075   *schema_exists = true;
1076   return true;
1077 }
1078 
update_schema_version(const char * schema_name,unsigned int counter,unsigned int node_id)1079 bool Ndb_dd_client::update_schema_version(const char *schema_name,
1080                                           unsigned int counter,
1081                                           unsigned int node_id) {
1082   DBUG_TRACE;
1083   DBUG_PRINT("enter", ("Schema : %s, counter : %u, node_id : %u", schema_name,
1084                        counter, node_id));
1085 
1086   DBUG_ASSERT(m_thd->mdl_context.owns_equal_or_stronger_lock(
1087       MDL_key::SCHEMA, schema_name, "", MDL_EXCLUSIVE));
1088 
1089   dd::Schema *schema;
1090 
1091   if (m_client->acquire_for_modification(schema_name, &schema) ||
1092       schema == nullptr) {
1093     DBUG_PRINT("error", ("Failed to fetch the Schema object"));
1094     return false;
1095   }
1096 
1097   // Set the values
1098   ndb_dd_schema_set_counter_and_nodeid(schema, counter, node_id);
1099 
1100   // Update Schema in DD
1101   if (m_client->update(schema)) {
1102     DBUG_PRINT("error", ("Failed to update the Schema in DD"));
1103     return false;
1104   }
1105 
1106   return true;
1107 }
1108 
lookup_tablespace_id(const char * tablespace_name,dd::Object_id * tablespace_id)1109 bool Ndb_dd_client::lookup_tablespace_id(const char *tablespace_name,
1110                                          dd::Object_id *tablespace_id) {
1111   DBUG_TRACE;
1112   DBUG_PRINT("enter", ("tablespace_name: %s", tablespace_name));
1113 
1114   DBUG_ASSERT(m_thd->mdl_context.owns_equal_or_stronger_lock(
1115       MDL_key::TABLESPACE, "", tablespace_name, MDL_INTENTION_EXCLUSIVE));
1116 
1117   // Acquire tablespace.
1118   const dd::Tablespace *ts_obj = NULL;
1119   if (m_client->acquire(tablespace_name, &ts_obj)) {
1120     // acquire() always fails with an error being reported.
1121     return false;
1122   }
1123 
1124   if (!ts_obj) {
1125     my_error(ER_TABLESPACE_MISSING_WITH_NAME, MYF(0), tablespace_name);
1126     return false;
1127   }
1128 
1129   *tablespace_id = ts_obj->id();
1130   DBUG_PRINT("exit", ("tablespace_id: %llu", *tablespace_id));
1131 
1132   return true;
1133 }
1134 
get_tablespace(const char * tablespace_name,const dd::Tablespace ** tablespace_def)1135 bool Ndb_dd_client::get_tablespace(const char *tablespace_name,
1136                                    const dd::Tablespace **tablespace_def) {
1137   if (m_client->acquire(tablespace_name, tablespace_def)) {
1138     return false;
1139   }
1140   return true;
1141 }
1142 
tablespace_exists(const char * tablespace_name,bool & exists)1143 bool Ndb_dd_client::tablespace_exists(const char *tablespace_name,
1144                                       bool &exists) {
1145   const dd::Tablespace *tablespace;
1146   if (m_client->acquire(tablespace_name, &tablespace)) {
1147     // Failed to acquire the requested tablespace
1148     return false;
1149   }
1150 
1151   if (tablespace == nullptr) {
1152     // The tablespace doesn't exist
1153     exists = false;
1154     return true;
1155   }
1156 
1157   // The tablespace exists
1158   exists = true;
1159   return true;
1160 }
1161 
fetch_ndb_tablespace_names(std::unordered_set<std::string> & names)1162 bool Ndb_dd_client::fetch_ndb_tablespace_names(
1163     std::unordered_set<std::string> &names) {
1164   DBUG_TRACE;
1165 
1166   std::vector<const dd::Tablespace *> tablespaces;
1167   if (m_client->fetch_global_components<dd::Tablespace>(&tablespaces)) {
1168     return false;
1169   }
1170 
1171   for (const dd::Tablespace *tablespace : tablespaces) {
1172     if (tablespace->engine() != "ndbcluster") {
1173       // Skip non-NDB objects
1174       continue;
1175     }
1176 
1177     // Find out type of object
1178     object_type type;
1179 
1180     ndb_dd_disk_data_get_object_type(tablespace->se_private_data(), type);
1181 
1182     if (type != object_type::TABLESPACE) {
1183       // Skip logfile groups
1184       continue;
1185     }
1186 
1187     // Acquire lock in DD
1188     if (!mdl_lock_tablespace(tablespace->name().c_str(),
1189                              false /* intention_exclusive */)) {
1190       // Failed to acquire MDL lock
1191       return false;
1192     }
1193 
1194     names.insert(tablespace->name().c_str());
1195   }
1196   return true;
1197 }
1198 
install_tablespace(const char * tablespace_name,const std::vector<std::string> & data_file_names,int tablespace_id,int tablespace_version,bool force_overwrite)1199 bool Ndb_dd_client::install_tablespace(
1200     const char *tablespace_name,
1201     const std::vector<std::string> &data_file_names, int tablespace_id,
1202     int tablespace_version, bool force_overwrite) {
1203   DBUG_TRACE;
1204 
1205   bool exists;
1206   if (!tablespace_exists(tablespace_name, exists)) {
1207     // Could not detect if the tablespace exists or not
1208     return false;
1209   }
1210 
1211   if (exists) {
1212     if (force_overwrite) {
1213       if (!drop_tablespace(tablespace_name)) {
1214         // Failed to drop tablespace
1215         return false;
1216       }
1217     } else {
1218       // Error since tablespace exists but force_overwrite not set by caller
1219       // No point continuing since the subsequent store() will fail
1220       return false;
1221     }
1222   }
1223 
1224   std::unique_ptr<dd::Tablespace> tablespace(
1225       dd::create_object<dd::Tablespace>());
1226 
1227   // Set name
1228   tablespace->set_name(tablespace_name);
1229 
1230   // Engine type
1231   tablespace->set_engine("ndbcluster");
1232 
1233   // Add data files
1234   for (const auto &data_file_name : data_file_names) {
1235     ndb_dd_disk_data_add_file(tablespace.get(), data_file_name.c_str());
1236   }
1237 
1238   // Assign id and version
1239   ndb_dd_disk_data_set_object_id_and_version(tablespace.get(), tablespace_id,
1240                                              tablespace_version);
1241 
1242   // Assign object type as tablespace
1243   ndb_dd_disk_data_set_object_type(tablespace.get()->se_private_data(),
1244                                    object_type::TABLESPACE);
1245 
1246   // Write changes to dictionary.
1247   if (m_client->store(tablespace.get())) {
1248     return false;
1249   }
1250 
1251   return true;
1252 }
1253 
drop_tablespace(const char * tablespace_name,bool fail_if_not_exists)1254 bool Ndb_dd_client::drop_tablespace(const char *tablespace_name,
1255                                     bool fail_if_not_exists)
1256 
1257 {
1258   DBUG_TRACE;
1259 
1260   const dd::Tablespace *existing = nullptr;
1261   if (m_client->acquire(tablespace_name, &existing)) {
1262     return false;
1263   }
1264 
1265   if (existing == nullptr) {
1266     // Tablespace does not exist
1267     if (fail_if_not_exists) {
1268       return false;
1269     }
1270     return true;
1271   }
1272 
1273   if (m_client->drop(existing)) {
1274     return false;
1275   }
1276 
1277   return true;
1278 }
1279 
get_logfile_group(const char * logfile_group_name,const dd::Tablespace ** logfile_group_def)1280 bool Ndb_dd_client::get_logfile_group(
1281     const char *logfile_group_name, const dd::Tablespace **logfile_group_def) {
1282   if (m_client->acquire(logfile_group_name, logfile_group_def)) {
1283     return false;
1284   }
1285   return true;
1286 }
1287 
logfile_group_exists(const char * logfile_group_name,bool & exists)1288 bool Ndb_dd_client::logfile_group_exists(const char *logfile_group_name,
1289                                          bool &exists) {
1290   const dd::Tablespace *logfile_group;
1291   if (m_client->acquire(logfile_group_name, &logfile_group)) {
1292     // Failed to acquire the requested logfile group
1293     return false;
1294   }
1295 
1296   if (logfile_group == nullptr) {
1297     // The logfile group doesn't exist
1298     exists = false;
1299     return true;
1300   }
1301 
1302   // The logfile group exists
1303   exists = true;
1304   return true;
1305 }
1306 
fetch_ndb_logfile_group_names(std::unordered_set<std::string> & names)1307 bool Ndb_dd_client::fetch_ndb_logfile_group_names(
1308     std::unordered_set<std::string> &names) {
1309   DBUG_TRACE;
1310 
1311   std::vector<const dd::Tablespace *> tablespaces;
1312   if (m_client->fetch_global_components<dd::Tablespace>(&tablespaces)) {
1313     return false;
1314   }
1315 
1316   for (const dd::Tablespace *tablespace : tablespaces) {
1317     if (tablespace->engine() != "ndbcluster") {
1318       // Skip non-NDB objects
1319       continue;
1320     }
1321 
1322     // Find out type of object
1323     object_type type;
1324 
1325     ndb_dd_disk_data_get_object_type(tablespace->se_private_data(), type);
1326 
1327     if (type != object_type::LOGFILE_GROUP) {
1328       // Skip tablespaces
1329       continue;
1330     }
1331 
1332     // Acquire lock in DD
1333     if (!mdl_lock_logfile_group(tablespace->name().c_str(),
1334                                 false /* intention_exclusive */)) {
1335       // Failed to acquire MDL lock
1336       return false;
1337     }
1338 
1339     names.insert(tablespace->name().c_str());
1340   }
1341   return true;
1342 }
1343 
install_logfile_group(const char * logfile_group_name,const std::vector<std::string> & undo_file_names,int logfile_group_id,int logfile_group_version,bool force_overwrite)1344 bool Ndb_dd_client::install_logfile_group(
1345     const char *logfile_group_name,
1346     const std::vector<std::string> &undo_file_names, int logfile_group_id,
1347     int logfile_group_version, bool force_overwrite) {
1348   DBUG_TRACE;
1349 
1350   /*
1351    * Logfile groups are stored as tablespaces in the DD.
1352    * This is acceptable since the only reason for storing
1353    * them in the DD is to ensure that INFORMATION_SCHEMA
1354    * is aware of their presence. Thus, rather than
1355    * extending DD, we use tablespaces since they resemble
1356    * logfile groups in terms of metadata structure
1357    */
1358 
1359   bool exists;
1360   if (!logfile_group_exists(logfile_group_name, exists)) {
1361     // Could not detect if the logfile group exists or not
1362     return false;
1363   }
1364 
1365   if (exists) {
1366     if (force_overwrite) {
1367       if (!drop_logfile_group(logfile_group_name)) {
1368         // Failed to drop logfile group
1369         return false;
1370       }
1371     } else {
1372       // Error since logfile group exists but force_overwrite not set to true by
1373       // caller. No point continuing since the subsequent store() will fail
1374       return false;
1375     }
1376   }
1377 
1378   std::unique_ptr<dd::Tablespace> logfile_group(
1379       dd::create_object<dd::Tablespace>());
1380 
1381   // Set name
1382   logfile_group->set_name(logfile_group_name);
1383 
1384   // Engine type
1385   logfile_group->set_engine("ndbcluster");
1386 
1387   // Add undofiles
1388   for (const auto &undo_file_name : undo_file_names) {
1389     ndb_dd_disk_data_add_file(logfile_group.get(), undo_file_name.c_str());
1390   }
1391 
1392   // Assign id and version
1393   ndb_dd_disk_data_set_object_id_and_version(
1394       logfile_group.get(), logfile_group_id, logfile_group_version);
1395 
1396   // Assign object type as logfile group
1397   ndb_dd_disk_data_set_object_type(logfile_group.get()->se_private_data(),
1398                                    object_type::LOGFILE_GROUP);
1399 
1400   // Write changes to dictionary.
1401   if (m_client->store(logfile_group.get())) {
1402     return false;
1403   }
1404 
1405   return true;
1406 }
1407 
install_undo_file(const char * logfile_group_name,const char * undo_file_name)1408 bool Ndb_dd_client::install_undo_file(const char *logfile_group_name,
1409                                       const char *undo_file_name) {
1410   DBUG_TRACE;
1411 
1412   // Read logfile group from DD
1413   dd::Tablespace *new_logfile_group_def = nullptr;
1414   if (m_client->acquire_for_modification(logfile_group_name,
1415                                          &new_logfile_group_def))
1416     return false;
1417 
1418   if (!new_logfile_group_def) return false;
1419 
1420   ndb_dd_disk_data_add_file(new_logfile_group_def, undo_file_name);
1421 
1422   // Write changes to dictionary.
1423   if (m_client->update(new_logfile_group_def)) {
1424     return false;
1425   }
1426 
1427   return true;
1428 }
1429 
drop_logfile_group(const char * logfile_group_name,bool fail_if_not_exists)1430 bool Ndb_dd_client::drop_logfile_group(const char *logfile_group_name,
1431                                        bool fail_if_not_exists) {
1432   DBUG_TRACE;
1433 
1434   /*
1435    * Logfile groups are stored as tablespaces in the DD.
1436    * This is acceptable since the only reason for storing
1437    * them in the DD is to ensure that INFORMATION_SCHEMA
1438    * is aware of their presence. Thus, rather than
1439    * extending DD, we use tablespaces since they resemble
1440    * logfile groups in terms of metadata structure
1441    */
1442 
1443   const dd::Tablespace *existing = nullptr;
1444   if (m_client->acquire(logfile_group_name, &existing)) {
1445     return false;
1446   }
1447 
1448   if (existing == nullptr) {
1449     // Logfile group does not exist
1450     if (fail_if_not_exists) {
1451       return false;
1452     }
1453     return true;
1454   }
1455 
1456   if (m_client->drop(existing)) {
1457     return false;
1458   }
1459 
1460   return true;
1461 }
1462 
get_schema_uuid(dd::String_type * value) const1463 bool Ndb_dd_client::get_schema_uuid(dd::String_type *value) const {
1464   DBUG_TRACE;
1465 
1466   // Schema UUID will be stored in ndb_schema table definition in DD
1467   const dd::Table *table = nullptr;
1468   if (m_client->acquire(Ndb_schema_dist_table::DB_NAME.c_str(),
1469                         Ndb_schema_dist_table::TABLE_NAME.c_str(), &table)) {
1470     DBUG_ASSERT(false);
1471     return false;
1472   }
1473 
1474   if (table == nullptr) {
1475     // Table doesn't exists. This is OK as it might happen
1476     // if the function is called before ndb_schema is created.
1477     return true;
1478   }
1479 
1480   if (!ndb_dd_table_get_schema_uuid(table, value)) {
1481     // Table has invalid Schema UUID
1482     return false;
1483   }
1484 
1485   return true;
1486 }
1487 
update_schema_uuid(const char * value) const1488 bool Ndb_dd_client::update_schema_uuid(const char *value) const {
1489   DBUG_TRACE;
1490 
1491   // Schema UUID should be updated in ndb_schema table definition in DD
1492   dd::Table *table = nullptr;
1493   if (m_client->acquire_for_modification(
1494           Ndb_schema_dist_table::DB_NAME.c_str(),
1495           Ndb_schema_dist_table::TABLE_NAME.c_str(), &table)) {
1496     DBUG_ASSERT(false);
1497     return false;
1498   }
1499 
1500   if (table == nullptr) {
1501     // Table does not exist in DD
1502     DBUG_ASSERT(false);
1503     return false;
1504   }
1505 
1506   // Set the schema uuid value to the table object
1507   ndb_dd_table_set_schema_uuid(table, value);
1508 
1509   if (m_client->update(table)) {
1510     DBUG_ASSERT(false);
1511     return false;
1512   }
1513 
1514   return true;
1515 }
1516 
1517 /**
1518   Lock and add the given referenced table to the set of
1519   referenced tables maintained by the invalidator.
1520 
1521   @param schema_name  Schema name of the table.
1522   @param table_name   Name of the table.
1523 
1524   @return true        On success.
1525   @return false       Unable to lock the table to the list.
1526 */
add_and_lock_referenced_table(const char * schema_name,const char * table_name)1527 bool Ndb_referenced_tables_invalidator::add_and_lock_referenced_table(
1528     const char *schema_name, const char *table_name) {
1529   auto result =
1530       m_referenced_tables.insert(std::make_pair(schema_name, table_name));
1531   if (result.second) {
1532     // New parent added to invalidator. Lock it down
1533     DBUG_PRINT("info", ("Locking '%s.%s'", schema_name, table_name));
1534     if (!m_dd_client.mdl_locks_acquire_exclusive(schema_name, table_name)) {
1535       DBUG_PRINT("error", ("Unable to acquire lock to parent table '%s.%s'",
1536                            schema_name, table_name));
1537       return false;
1538     }
1539   }
1540   return true;
1541 }
1542 
1543 /**
1544   Fetch the list of referenced tables to add from the local Data Dictionary
1545   if available and also from the NDB Dictionary if available. Then lock
1546   them and add them to the unique list maintained by the invalidator.
1547 
1548   @param schema_name          Schema name of the table.
1549   @param table_name           Name of the table.
1550   @param table_def            Table object from the DD
1551   @param skip_ndb_dict_fetch  Bool value. If true, skip fetching the
1552                               referenced tables from NDB. Default value is
1553                               false. NDB Dictionary fetch has to be skipped
1554                               if the DDL being distributed would have dropped
1555                               the table in NDB dictionary already (like drop
1556                               table) or if reading the NDB dictionary is
1557                               redundant as the DDL won't  be adding/dropping
1558                               any FKs(like rename table).
1559 
1560   @return true        On success.
1561   @return false       Fetching failed.
1562 */
fetch_referenced_tables_to_invalidate(const char * schema_name,const char * table_name,const dd::Table * table_def,bool skip_ndb_dict_fetch)1563 bool Ndb_referenced_tables_invalidator::fetch_referenced_tables_to_invalidate(
1564     const char *schema_name, const char *table_name, const dd::Table *table_def,
1565     bool skip_ndb_dict_fetch) {
1566   DBUG_TRACE;
1567 
1568   DBUG_PRINT("info",
1569              ("Collecting parent tables of '%s.%s' that are to be invalidated",
1570               schema_name, table_name));
1571 
1572   if (table_def != nullptr) {
1573     /* Table exists in DD already. Lock and add the parents */
1574     for (const dd::Foreign_key *fk : table_def->foreign_keys()) {
1575       const char *parent_db = fk->referenced_table_schema_name().c_str();
1576       const char *parent_table = fk->referenced_table_name().c_str();
1577       if (strcmp(parent_db, schema_name) == 0 &&
1578           strcmp(parent_table, table_name) == 0) {
1579         // Given table is the parent of this FK. Skip adding.
1580         continue;
1581       }
1582       if (!add_and_lock_referenced_table(parent_db, parent_table)) {
1583         return false;
1584       }
1585     }
1586   }
1587 
1588   if (!skip_ndb_dict_fetch) {
1589     std::set<std::pair<std::string, std::string>> referenced_tables;
1590 
1591     /* fetch the foreign key definitions from NDB dictionary */
1592     if (!fetch_referenced_tables_from_ndb_dictionary(
1593             m_thd, schema_name, table_name, referenced_tables)) {
1594       return false;
1595     }
1596 
1597     /* lock and add any missing parents */
1598     for (auto const &parent_name : referenced_tables) {
1599       if (!add_and_lock_referenced_table(parent_name.first.c_str(),
1600                                          parent_name.second.c_str())) {
1601         return false;
1602       }
1603     }
1604   }
1605 
1606   return true;
1607 }
1608 
1609 /**
1610   Invalidate all the tables in the referenced_tables set by closing
1611   any cached instances in the table definition cache and invalidating
1612   the same from the local DD.
1613 
1614   @return true        On success.
1615   @return false       Invalidation failed.
1616 */
invalidate() const1617 bool Ndb_referenced_tables_invalidator::invalidate() const {
1618   DBUG_TRACE;
1619   for (auto parent_it : m_referenced_tables) {
1620     // Invalidate Table and Table Definition Caches too.
1621     const char *schema_name = parent_it.first.c_str();
1622     const char *table_name = parent_it.second.c_str();
1623     DBUG_PRINT("info",
1624                ("Invalidating parent table '%s.%s'", schema_name, table_name));
1625     if (ndb_tdc_close_cached_table(m_thd, schema_name, table_name) ||
1626         m_thd->dd_client()->invalidate(schema_name, table_name) != 0) {
1627       DBUG_PRINT("error", ("Unable to invalidate table '%s.%s'", schema_name,
1628                            table_name));
1629       return false;
1630     }
1631   }
1632   return true;
1633 }
1634