1 /*
2 Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "storage/ndb/plugin/ndb_dd_client.h"
26
27 #include <assert.h>
28 #include <iostream>
29
30 #include "my_dbug.h"
31 #include "sql/auth/auth_common.h" // check_readonly()
32 #include "sql/dd/cache/dictionary_client.h"
33 #include "sql/dd/dd.h"
34 #include "sql/dd/dd_table.h"
35 #include "sql/dd/properties.h"
36 #include "sql/dd/types/schema.h"
37 #include "sql/dd/types/table.h"
38 #include "sql/query_options.h" // OPTION_AUTOCOMMIT
39 #include "sql/sql_class.h" // THD
40 #include "sql/sql_table.h"
41 #include "sql/sql_trigger.h" // remove_all_triggers_from_perfschema
42 #include "sql/system_variables.h"
43 #include "sql/transaction.h" // trans_*
44 #include "storage/ndb/plugin/ndb_dd.h" // ndb_dd_fs_name_case
45 #include "storage/ndb/plugin/ndb_dd_disk_data.h"
46 #include "storage/ndb/plugin/ndb_dd_schema.h"
47 #include "storage/ndb/plugin/ndb_dd_sdi.h"
48 #include "storage/ndb/plugin/ndb_dd_table.h"
49 #include "storage/ndb/plugin/ndb_dd_upgrade_table.h"
50 #include "storage/ndb/plugin/ndb_fk_util.h"
51 #include "storage/ndb/plugin/ndb_log.h"
52 #include "storage/ndb/plugin/ndb_schema_dist_table.h"
53 #include "storage/ndb/plugin/ndb_tdc.h"
54 #include "storage/ndb/plugin/ndb_thd.h"
55
Ndb_dd_client(THD * thd)56 Ndb_dd_client::Ndb_dd_client(THD *thd)
57 : m_thd(thd),
58 m_client(thd->dd_client()),
59 m_save_mdl_locks(thd->mdl_context.mdl_savepoint()) {
60 DBUG_TRACE;
61 disable_autocommit();
62
63 // Create dictionary client auto releaser, stored as
64 // opaque pointer in order to avoid including all of
65 // Dictionary_client in the ndb_dd_client header file
66 m_auto_releaser =
67 (void *)new dd::cache::Dictionary_client::Auto_releaser(m_client);
68 }
69
~Ndb_dd_client()70 Ndb_dd_client::~Ndb_dd_client() {
71 DBUG_TRACE;
72 // Automatically restore the option_bits in THD if they have
73 // been modified
74 if (m_save_option_bits) m_thd->variables.option_bits = m_save_option_bits;
75
76 if (m_auto_rollback) {
77 // Automatically rollback unless commit has been called
78 if (!m_comitted) rollback();
79 }
80
81 // Release MDL locks
82 mdl_locks_release();
83
84 // Free the dictionary client auto releaser
85 dd::cache::Dictionary_client::Auto_releaser *ar =
86 (dd::cache::Dictionary_client::Auto_releaser *)m_auto_releaser;
87 delete ar;
88 }
89
mdl_lock_table(const char * schema_name,const char * table_name)90 bool Ndb_dd_client::mdl_lock_table(const char *schema_name,
91 const char *table_name) {
92 MDL_request_list mdl_requests;
93 MDL_request schema_request;
94 MDL_request mdl_request;
95 MDL_REQUEST_INIT(&schema_request, MDL_key::SCHEMA, schema_name, "",
96 MDL_INTENTION_EXCLUSIVE, MDL_EXPLICIT);
97 MDL_REQUEST_INIT(&mdl_request, MDL_key::TABLE, schema_name, table_name,
98 MDL_SHARED, MDL_EXPLICIT);
99
100 mdl_requests.push_front(&schema_request);
101 mdl_requests.push_front(&mdl_request);
102
103 if (m_thd->mdl_context.acquire_locks(&mdl_requests,
104 m_thd->variables.lock_wait_timeout)) {
105 return false;
106 }
107
108 // Remember tickets of the acquired mdl locks
109 m_acquired_mdl_tickets.push_back(schema_request.ticket);
110 m_acquired_mdl_tickets.push_back(mdl_request.ticket);
111
112 return true;
113 }
114
mdl_lock_schema_exclusive(const char * schema_name,bool custom_lock_wait,ulong lock_wait_timeout)115 bool Ndb_dd_client::mdl_lock_schema_exclusive(const char *schema_name,
116 bool custom_lock_wait,
117 ulong lock_wait_timeout) {
118 MDL_request_list mdl_requests;
119 MDL_request schema_request;
120 MDL_request backup_lock_request;
121 MDL_request grl_request;
122
123 // If protection against GRL can't be acquired, err out early.
124 if (m_thd->global_read_lock.can_acquire_protection()) {
125 return false;
126 }
127
128 MDL_REQUEST_INIT(&schema_request, MDL_key::SCHEMA, schema_name, "",
129 MDL_EXCLUSIVE, MDL_EXPLICIT);
130 MDL_REQUEST_INIT(&backup_lock_request, MDL_key::BACKUP_LOCK, "", "",
131 MDL_INTENTION_EXCLUSIVE, MDL_EXPLICIT);
132 MDL_REQUEST_INIT(&grl_request, MDL_key::GLOBAL, "", "",
133 MDL_INTENTION_EXCLUSIVE, MDL_EXPLICIT);
134
135 mdl_requests.push_front(&schema_request);
136 mdl_requests.push_front(&backup_lock_request);
137 mdl_requests.push_front(&grl_request);
138
139 if (!custom_lock_wait) {
140 lock_wait_timeout = m_thd->variables.lock_wait_timeout;
141 }
142
143 if (m_thd->mdl_context.acquire_locks(&mdl_requests, lock_wait_timeout)) {
144 return false;
145 }
146
147 // Remember tickets of the acquired mdl locks
148 m_acquired_mdl_tickets.push_back(schema_request.ticket);
149 m_acquired_mdl_tickets.push_back(backup_lock_request.ticket);
150 m_acquired_mdl_tickets.push_back(grl_request.ticket);
151
152 return true;
153 }
154
mdl_lock_schema(const char * schema_name)155 bool Ndb_dd_client::mdl_lock_schema(const char *schema_name) {
156 MDL_request_list mdl_requests;
157 MDL_request schema_request;
158
159 MDL_REQUEST_INIT(&schema_request, MDL_key::SCHEMA, schema_name, "",
160 MDL_INTENTION_EXCLUSIVE, MDL_EXPLICIT);
161 mdl_requests.push_front(&schema_request);
162
163 if (m_thd->mdl_context.acquire_locks(&mdl_requests,
164 m_thd->variables.lock_wait_timeout)) {
165 return false;
166 }
167
168 /*
169 Now when we have protection against concurrent change of read_only
170 option we can safely re-check its value.
171 */
172 if (check_readonly(m_thd, true)) return false;
173
174 // Remember ticket(s) of the acquired mdl lock
175 m_acquired_mdl_tickets.push_back(schema_request.ticket);
176
177 return true;
178 }
179
mdl_lock_logfile_group_exclusive(const char * logfile_group_name,bool custom_lock_wait,ulong lock_wait_timeout)180 bool Ndb_dd_client::mdl_lock_logfile_group_exclusive(
181 const char *logfile_group_name, bool custom_lock_wait,
182 ulong lock_wait_timeout) {
183 MDL_request_list mdl_requests;
184 MDL_request logfile_group_request;
185 MDL_request backup_lock_request;
186 MDL_request grl_request;
187
188 // If protection against GRL can't be acquired, err out early.
189 if (m_thd->global_read_lock.can_acquire_protection()) {
190 return false;
191 }
192
193 MDL_REQUEST_INIT(&logfile_group_request, MDL_key::TABLESPACE, "",
194 logfile_group_name, MDL_EXCLUSIVE, MDL_EXPLICIT);
195 MDL_REQUEST_INIT(&backup_lock_request, MDL_key::BACKUP_LOCK, "", "",
196 MDL_INTENTION_EXCLUSIVE, MDL_EXPLICIT);
197 MDL_REQUEST_INIT(&grl_request, MDL_key::GLOBAL, "", "",
198 MDL_INTENTION_EXCLUSIVE, MDL_EXPLICIT);
199
200 mdl_requests.push_front(&logfile_group_request);
201 mdl_requests.push_front(&backup_lock_request);
202 mdl_requests.push_front(&grl_request);
203
204 if (!custom_lock_wait) {
205 lock_wait_timeout = m_thd->variables.lock_wait_timeout;
206 }
207
208 if (m_thd->mdl_context.acquire_locks(&mdl_requests, lock_wait_timeout)) {
209 return false;
210 }
211
212 /*
213 Now when we have protection against concurrent change of read_only
214 option we can safely re-check its value.
215 */
216 if (check_readonly(m_thd, true)) return false;
217
218 // Remember tickets of the acquired mdl locks
219 m_acquired_mdl_tickets.push_back(logfile_group_request.ticket);
220 m_acquired_mdl_tickets.push_back(backup_lock_request.ticket);
221 m_acquired_mdl_tickets.push_back(grl_request.ticket);
222
223 return true;
224 }
225
mdl_lock_logfile_group(const char * logfile_group_name,bool intention_exclusive)226 bool Ndb_dd_client::mdl_lock_logfile_group(const char *logfile_group_name,
227 bool intention_exclusive) {
228 MDL_request_list mdl_requests;
229 MDL_request logfile_group_request;
230
231 enum_mdl_type mdl_type =
232 intention_exclusive ? MDL_INTENTION_EXCLUSIVE : MDL_SHARED_READ;
233 MDL_REQUEST_INIT(&logfile_group_request, MDL_key::TABLESPACE, "",
234 logfile_group_name, mdl_type, MDL_EXPLICIT);
235
236 mdl_requests.push_front(&logfile_group_request);
237
238 if (m_thd->mdl_context.acquire_locks(&mdl_requests,
239 m_thd->variables.lock_wait_timeout)) {
240 return false;
241 }
242
243 // Remember tickets of the acquired mdl locks
244 m_acquired_mdl_tickets.push_back(logfile_group_request.ticket);
245
246 return true;
247 }
248
mdl_lock_tablespace_exclusive(const char * tablespace_name,bool custom_lock_wait,ulong lock_wait_timeout)249 bool Ndb_dd_client::mdl_lock_tablespace_exclusive(const char *tablespace_name,
250 bool custom_lock_wait,
251 ulong lock_wait_timeout) {
252 MDL_request_list mdl_requests;
253 MDL_request tablespace_request;
254 MDL_request backup_lock_request;
255 MDL_request grl_request;
256
257 // If protection against GRL can't be acquired, err out early.
258 if (m_thd->global_read_lock.can_acquire_protection()) {
259 return false;
260 }
261
262 MDL_REQUEST_INIT(&tablespace_request, MDL_key::TABLESPACE, "",
263 tablespace_name, MDL_EXCLUSIVE, MDL_EXPLICIT);
264 MDL_REQUEST_INIT(&backup_lock_request, MDL_key::BACKUP_LOCK, "", "",
265 MDL_INTENTION_EXCLUSIVE, MDL_EXPLICIT);
266 MDL_REQUEST_INIT(&grl_request, MDL_key::GLOBAL, "", "",
267 MDL_INTENTION_EXCLUSIVE, MDL_EXPLICIT);
268
269 mdl_requests.push_front(&tablespace_request);
270 mdl_requests.push_front(&backup_lock_request);
271 mdl_requests.push_front(&grl_request);
272
273 if (!custom_lock_wait) {
274 lock_wait_timeout = m_thd->variables.lock_wait_timeout;
275 }
276
277 if (m_thd->mdl_context.acquire_locks(&mdl_requests, lock_wait_timeout)) {
278 return false;
279 }
280
281 /*
282 Now when we have protection against concurrent change of read_only
283 option we can safely re-check its value.
284 */
285 if (check_readonly(m_thd, true)) return false;
286
287 // Remember tickets of the acquired mdl locks
288 m_acquired_mdl_tickets.push_back(tablespace_request.ticket);
289 m_acquired_mdl_tickets.push_back(backup_lock_request.ticket);
290 m_acquired_mdl_tickets.push_back(grl_request.ticket);
291
292 return true;
293 }
294
mdl_lock_tablespace(const char * tablespace_name,bool intention_exclusive)295 bool Ndb_dd_client::mdl_lock_tablespace(const char *tablespace_name,
296 bool intention_exclusive) {
297 MDL_request_list mdl_requests;
298 MDL_request tablespace_request;
299
300 enum_mdl_type mdl_type =
301 intention_exclusive ? MDL_INTENTION_EXCLUSIVE : MDL_SHARED_READ;
302 MDL_REQUEST_INIT(&tablespace_request, MDL_key::TABLESPACE, "",
303 tablespace_name, mdl_type, MDL_EXPLICIT);
304
305 mdl_requests.push_front(&tablespace_request);
306
307 if (m_thd->mdl_context.acquire_locks(&mdl_requests,
308 m_thd->variables.lock_wait_timeout)) {
309 return false;
310 }
311
312 // Remember tickets of the acquired mdl locks
313 m_acquired_mdl_tickets.push_back(tablespace_request.ticket);
314
315 return true;
316 }
317
mdl_locks_acquire_exclusive(const char * schema_name,const char * table_name,bool custom_lock_wait,ulong lock_wait_timeout)318 bool Ndb_dd_client::mdl_locks_acquire_exclusive(const char *schema_name,
319 const char *table_name,
320 bool custom_lock_wait,
321 ulong lock_wait_timeout) {
322 MDL_request_list mdl_requests;
323 MDL_request schema_request;
324 MDL_request mdl_request;
325 MDL_request backup_lock_request;
326 MDL_request grl_request;
327
328 // If we cannot acquire protection against GRL, err out early.
329 if (m_thd->global_read_lock.can_acquire_protection()) return false;
330
331 MDL_REQUEST_INIT(&schema_request, MDL_key::SCHEMA, schema_name, "",
332 MDL_INTENTION_EXCLUSIVE, MDL_EXPLICIT);
333 MDL_REQUEST_INIT(&mdl_request, MDL_key::TABLE, schema_name, table_name,
334 MDL_EXCLUSIVE, MDL_EXPLICIT);
335 MDL_REQUEST_INIT(&backup_lock_request, MDL_key::BACKUP_LOCK, "", "",
336 MDL_INTENTION_EXCLUSIVE, MDL_EXPLICIT);
337 MDL_REQUEST_INIT(&grl_request, MDL_key::GLOBAL, "", "",
338 MDL_INTENTION_EXCLUSIVE, MDL_EXPLICIT);
339
340 mdl_requests.push_front(&schema_request);
341 mdl_requests.push_front(&mdl_request);
342 mdl_requests.push_front(&backup_lock_request);
343 mdl_requests.push_front(&grl_request);
344
345 if (!custom_lock_wait) {
346 lock_wait_timeout = m_thd->variables.lock_wait_timeout;
347 }
348
349 if (m_thd->mdl_context.acquire_locks(&mdl_requests, lock_wait_timeout)) {
350 return false;
351 }
352
353 /*
354 Now when we have protection against concurrent change of read_only
355 option we can safely re-check its value.
356 */
357 if (check_readonly(m_thd, true)) return false;
358
359 // Remember tickets of the acquired mdl locks
360 m_acquired_mdl_tickets.push_back(schema_request.ticket);
361 m_acquired_mdl_tickets.push_back(mdl_request.ticket);
362 m_acquired_mdl_tickets.push_back(backup_lock_request.ticket);
363 m_acquired_mdl_tickets.push_back(grl_request.ticket);
364
365 return true;
366 }
367
mdl_locks_release()368 void Ndb_dd_client::mdl_locks_release() {
369 // Release MDL locks acquired in EXPLICIT scope
370 for (MDL_ticket *ticket : m_acquired_mdl_tickets) {
371 m_thd->mdl_context.release_lock(ticket);
372 }
373 // Release new MDL locks acquired in TRANSACTIONAL and STATEMENT scope
374 m_thd->mdl_context.rollback_to_savepoint(m_save_mdl_locks);
375 }
376
disable_autocommit()377 void Ndb_dd_client::disable_autocommit() {
378 /*
379 Implementation details from which storage the DD uses leaks out
380 and the user of these functions magically need to turn auto commit
381 off.
382
383 I.e as in sql_table.cc, execute_ddl_log_recovery()
384 'Prevent InnoDB from automatically committing InnoDB transaction
385 each time data-dictionary tables are closed after being updated.'
386 */
387
388 // Don't allow empty bits as zero is used as indicator
389 // to restore the saved bits
390 assert(m_thd->variables.option_bits);
391 m_save_option_bits = m_thd->variables.option_bits;
392
393 m_thd->variables.option_bits &= ~OPTION_AUTOCOMMIT;
394 m_thd->variables.option_bits |= OPTION_NOT_AUTOCOMMIT;
395 }
396
commit()397 void Ndb_dd_client::commit() {
398 trans_commit_stmt(m_thd);
399 trans_commit(m_thd);
400 m_comitted = true;
401 }
402
rollback()403 void Ndb_dd_client::rollback() {
404 trans_rollback_stmt(m_thd);
405 trans_rollback(m_thd);
406 }
407
get_engine(const char * schema_name,const char * table_name,dd::String_type * engine)408 bool Ndb_dd_client::get_engine(const char *schema_name, const char *table_name,
409 dd::String_type *engine) {
410 const dd::Table *existing = nullptr;
411 if (m_client->acquire(schema_name, table_name, &existing)) {
412 return false;
413 }
414
415 if (existing == nullptr) {
416 // Table does not exist in DD
417 return false;
418 }
419
420 *engine = existing->engine();
421
422 return true;
423 }
424
rename_table(const char * old_schema_name,const char * old_table_name,const char * new_schema_name,const char * new_table_name,int new_table_id,int new_table_version,Ndb_referenced_tables_invalidator * invalidator)425 bool Ndb_dd_client::rename_table(
426 const char *old_schema_name, const char *old_table_name,
427 const char *new_schema_name, const char *new_table_name, int new_table_id,
428 int new_table_version, Ndb_referenced_tables_invalidator *invalidator) {
429 // Read new schema from DD
430 const dd::Schema *new_schema = nullptr;
431 if (m_client->acquire(new_schema_name, &new_schema)) {
432 return false;
433 }
434 if (new_schema == nullptr) {
435 // Database does not exist, unexpected
436 DBUG_ASSERT(false);
437 return false;
438 }
439
440 // Read table from DD
441 dd::Table *to_table_def = nullptr;
442 if (m_client->acquire_for_modification(old_schema_name, old_table_name,
443 &to_table_def))
444 return false;
445
446 if (invalidator != nullptr &&
447 !invalidator->fetch_referenced_tables_to_invalidate(
448 old_schema_name, old_table_name, to_table_def, true)) {
449 return false;
450 }
451
452 // Collect and lock all the tables referencing this table,
453 // referenced by this table and the foreign key names.
454 // Note : This re-attempts to lock the referenced tables that were already
455 // locked by fetch_referenced_tables_to_invalidate(). This will be
456 // fixed when Bug#30500825 gets fixed.
457 if (collect_and_lock_fk_tables_for_rename_table(
458 m_thd, old_schema_name, old_table_name, to_table_def, new_schema_name,
459 new_table_name, ndbcluster_hton, nullptr)) {
460 return false;
461 }
462
463 // Set schema id and table name
464 to_table_def->set_schema_id(new_schema->id());
465 to_table_def->set_name(new_table_name);
466
467 ndb_dd_table_set_object_id_and_version(to_table_def, new_table_id,
468 new_table_version);
469
470 // Rename foreign keys
471 if (dd::rename_foreign_keys(m_thd, old_schema_name, old_table_name,
472 ndbcluster_hton, new_schema_name, to_table_def)) {
473 // Failed to rename foreign keys or commit/rollback, unexpected
474 DBUG_ASSERT(false);
475 return false;
476 }
477
478 // Adjust parent table for self-referencing foreign keys.
479 dd::Table::Foreign_key_collection *foreign_keys =
480 to_table_def->foreign_keys();
481 for (dd::Foreign_key *fk : *foreign_keys) {
482 if (strcmp(fk->referenced_table_schema_name().c_str(), old_schema_name) ==
483 0 &&
484 strcmp(fk->referenced_table_name().c_str(), old_table_name) == 0) {
485 fk->set_referenced_table_schema_name(new_schema_name);
486 fk->set_referenced_table_name(new_table_name);
487 }
488 }
489
490 // Save table in DD
491 if (m_client->update(to_table_def)) {
492 // Failed to save, unexpected
493 DBUG_ASSERT(false);
494 return false;
495 }
496
497 // Update the foreign key information of tables referencing this table in DD
498 if (adjust_fks_for_rename_table(m_thd, old_schema_name, old_table_name,
499 new_schema_name, new_table_name,
500 ndbcluster_hton)) {
501 return false;
502 }
503
504 return true;
505 }
506
remove_table(const char * schema_name,const char * table_name,Ndb_referenced_tables_invalidator * invalidator)507 bool Ndb_dd_client::remove_table(const char *schema_name,
508 const char *table_name,
509 Ndb_referenced_tables_invalidator *invalidator)
510
511 {
512 DBUG_TRACE;
513 DBUG_PRINT("enter",
514 ("schema_name: '%s', table_name: '%s'", schema_name, table_name));
515
516 const dd::Table *existing = nullptr;
517 if (m_client->acquire(schema_name, table_name, &existing)) {
518 return false;
519 }
520
521 if (existing == nullptr) {
522 // Table does not exist
523 return true;
524 }
525
526 if (invalidator != nullptr &&
527 !invalidator->fetch_referenced_tables_to_invalidate(
528 schema_name, table_name, existing, true)) {
529 return false;
530 }
531
532 #ifdef HAVE_PSI_SP_INTERFACE
533 // Remove statistics, table is not using trigger(s) anymore
534 remove_all_triggers_from_perfschema(schema_name, *existing);
535 #endif
536
537 DBUG_PRINT("info", ("removing existing table"));
538 if (m_client->drop(existing)) {
539 // Failed to remove existing
540 DBUG_ASSERT(false); // Catch in debug, unexpected error
541 return false;
542 }
543
544 return true;
545 }
546
deserialize_table(const dd::sdi_t & sdi,dd::Table * table_def)547 bool Ndb_dd_client::deserialize_table(const dd::sdi_t &sdi,
548 dd::Table *table_def) {
549 if (ndb_dd_sdi_deserialize(m_thd, sdi, table_def)) {
550 return false;
551 }
552 return true;
553 }
554
store_table(dd::Table * install_table) const555 bool Ndb_dd_client::store_table(dd::Table *install_table) const {
556 DBUG_TRACE;
557
558 if (!m_client->store(install_table)) {
559 return true; // OK
560 }
561 return false;
562 }
563
store_table(dd::Table * install_table,int ndb_table_id)564 bool Ndb_dd_client::store_table(dd::Table *install_table, int ndb_table_id) {
565 DBUG_TRACE;
566
567 if (!m_client->store(install_table)) {
568 return true; // OK
569 }
570
571 DBUG_PRINT("error", ("Failed to store table, error: '%d %s'",
572 m_thd->get_stmt_da()->mysql_errno(),
573 m_thd->get_stmt_da()->message_text()));
574
575 if (m_thd->get_stmt_da()->mysql_errno() == ER_DUP_ENTRY) {
576 // Try to handle the failure which may occur when the DD already
577 // have a table definition from an old NDB table which used the
578 // same table id but with a different name.
579 // This may happen when the MySQL Server reconnects to the cluster
580 // and synchronizes its DD with NDB dictionary. Of course it indicates
581 // that the DD is out of synch with the dictionary in NDB but that's
582 // normal when the MySQL Server haven't taken part in DDL operations.
583 // And as usual NDB is the master for all NDB tables.
584
585 // Remove the current ER_DUP_ENTRY error, subsequent failures
586 // will set a new error
587 m_thd->clear_error();
588
589 // Find old table using the NDB tables id
590 dd::Table *old_table_def;
591 if (m_client->acquire_uncached_table_by_se_private_id(
592 "ndbcluster", ndb_table_id, &old_table_def)) {
593 // There was no old table
594 return false;
595 }
596
597 // Double check that old table is in NDB
598 if (old_table_def->engine() != "ndbcluster") {
599 DBUG_ASSERT(false);
600 return false;
601 }
602
603 // Lookup schema name of old table
604 dd::Schema *old_schema;
605 if (m_client->acquire_uncached(old_table_def->schema_id(), &old_schema)) {
606 return false;
607 }
608
609 if (old_schema == nullptr) {
610 DBUG_ASSERT(false); // Database does not exist
611 return false;
612 }
613
614 const char *old_schema_name = old_schema->name().c_str();
615 const char *old_table_name = old_table_def->name().c_str();
616 DBUG_PRINT("info", ("Found old table '%s.%s', will try to remove it",
617 old_schema_name, old_table_name));
618
619 // Take exclusive locks on old table
620 if (!mdl_locks_acquire_exclusive(old_schema_name, old_table_name)) {
621 // Failed to MDL lock old table
622 return false;
623 }
624
625 if (!remove_table(old_schema_name, old_table_name)) {
626 // Failed to remove old table from DD
627 return false;
628 }
629
630 // Try to store the new table again
631 if (m_client->store(install_table)) {
632 DBUG_PRINT("error", ("Failed to store table, error: '%d %s'",
633 m_thd->get_stmt_da()->mysql_errno(),
634 m_thd->get_stmt_da()->message_text()));
635 return false;
636 }
637
638 // Removed old table and stored the new, return OK
639 DBUG_ASSERT(!m_thd->is_error());
640 return true;
641 }
642
643 return false;
644 }
645
install_table(const char * schema_name,const char * table_name,const dd::sdi_t & sdi,int ndb_table_id,int ndb_table_version,size_t ndb_num_partitions,const std::string & tablespace_name,bool force_overwrite,Ndb_referenced_tables_invalidator * invalidator)646 bool Ndb_dd_client::install_table(
647 const char *schema_name, const char *table_name, const dd::sdi_t &sdi,
648 int ndb_table_id, int ndb_table_version, size_t ndb_num_partitions,
649 const std::string &tablespace_name, bool force_overwrite,
650 Ndb_referenced_tables_invalidator *invalidator) {
651 const dd::Schema *schema = nullptr;
652
653 if (m_client->acquire(schema_name, &schema)) {
654 return false;
655 }
656 if (schema == nullptr) {
657 DBUG_ASSERT(false); // Database does not exist
658 return false;
659 }
660
661 std::unique_ptr<dd::Table> install_table{dd::create_object<dd::Table>()};
662 if (ndb_dd_sdi_deserialize(m_thd, sdi, install_table.get())) {
663 return false;
664 }
665
666 // Verify that table_name in the unpacked table definition
667 // matches the table name to install
668 DBUG_ASSERT(ndb_dd_fs_name_case(install_table->name()) == table_name);
669
670 // Verify that table defintion unpacked from NDB
671 // does not have any se_private fields set, those will be set
672 // from the NDB table metadata
673 DBUG_ASSERT(install_table->se_private_id() == dd::INVALID_OBJECT_ID);
674 DBUG_ASSERT(install_table->se_private_data().raw_string() == "");
675
676 // Assign the id of the schema to the table_object
677 install_table->set_schema_id(schema->id());
678
679 // Assign NDB id and version of the table
680 ndb_dd_table_set_object_id_and_version(install_table.get(), ndb_table_id,
681 ndb_table_version);
682
683 // Check if the DD table object has the correct number of partitions.
684 // Correct the number of partitions in the DD table object in case of
685 // a mismatch
686 const bool check_partition_count_result = ndb_dd_table_check_partition_count(
687 install_table.get(), ndb_num_partitions);
688 if (!check_partition_count_result) {
689 ndb_dd_table_fix_partition_count(install_table.get(), ndb_num_partitions);
690 }
691
692 // Set the tablespace id if applicable
693 if (!tablespace_name.empty()) {
694 dd::Object_id tablespace_id;
695 if (!lookup_tablespace_id(tablespace_name.c_str(), &tablespace_id)) {
696 return false;
697 }
698 ndb_dd_table_set_tablespace_id(install_table.get(), tablespace_id);
699 }
700
701 const dd::Table *existing = nullptr;
702 if (m_client->acquire(schema_name, table_name, &existing)) {
703 return false;
704 }
705
706 if (invalidator != nullptr &&
707 !invalidator->fetch_referenced_tables_to_invalidate(
708 schema_name, table_name, existing)) {
709 return false;
710 }
711
712 if (existing != nullptr) {
713 // Get id and version of existing table
714 int object_id, object_version;
715 if (!ndb_dd_table_get_object_id_and_version(existing, object_id,
716 object_version)) {
717 DBUG_PRINT("error", ("Could not extract object_id and object_version "
718 "from table definition"));
719 DBUG_ASSERT(false);
720 return false;
721 }
722
723 // Check that id and version of the existing table in DD
724 // matches NDB, otherwise it's a programming error
725 // not to request "force_overwrite"
726 if (ndb_table_id == object_id && ndb_table_version == object_version) {
727 // Table is already installed, with same id and version
728 // return sucess
729 return true;
730 }
731
732 // Table already exists
733 if (!force_overwrite) {
734 // Don't overwrite existing table
735 DBUG_ASSERT(false);
736 return false;
737 }
738
739 // Continue and remove the old table before
740 // installing the new
741 DBUG_PRINT("info", ("dropping existing table"));
742 if (m_client->drop(existing)) {
743 // Failed to drop existing
744 DBUG_ASSERT(false); // Catch in debug, unexpected error
745 return false;
746 }
747 }
748
749 if (!store_table(install_table.get(), ndb_table_id)) {
750 ndb_log_error("Failed to store table: '%s.%s'", schema_name, table_name);
751 ndb_log_error_dump("sdi for new table: %s",
752 ndb_dd_sdi_prettify(sdi).c_str());
753 if (existing) {
754 const dd::sdi_t existing_sdi =
755 ndb_dd_sdi_serialize(m_thd, *existing, dd::String_type(schema_name));
756 ndb_log_error_dump("sdi for existing table: %s",
757 ndb_dd_sdi_prettify(existing_sdi).c_str());
758 }
759 DBUG_ABORT();
760 return false;
761 }
762
763 return true; // OK
764 }
765
migrate_table(const char * schema_name,const char * table_name,const unsigned char * frm_data,unsigned int unpacked_len,bool force_overwrite)766 bool Ndb_dd_client::migrate_table(const char *schema_name,
767 const char *table_name,
768 const unsigned char *frm_data,
769 unsigned int unpacked_len,
770 bool force_overwrite) {
771 if (force_overwrite) {
772 // Remove the old table before migrating
773 DBUG_PRINT("info", ("dropping existing table"));
774 if (!remove_table(schema_name, table_name)) {
775 return false;
776 }
777
778 commit();
779 }
780
781 const bool migrate_result = dd::ndb_upgrade::migrate_table_to_dd(
782 m_thd, this, schema_name, table_name, frm_data, unpacked_len);
783
784 return migrate_result;
785 }
786
get_table(const char * schema_name,const char * table_name,const dd::Table ** table_def)787 bool Ndb_dd_client::get_table(const char *schema_name, const char *table_name,
788 const dd::Table **table_def) {
789 if (m_client->acquire(schema_name, table_name, table_def)) {
790 my_error(ER_NO_SUCH_TABLE, MYF(0), schema_name, table_name);
791 return false;
792 }
793 return true;
794 }
795
table_exists(const char * schema_name,const char * table_name,bool & exists)796 bool Ndb_dd_client::table_exists(const char *schema_name,
797 const char *table_name, bool &exists) {
798 const dd::Table *table;
799 if (m_client->acquire(schema_name, table_name, &table)) {
800 // Failed to acquire the requested table
801 return false;
802 }
803
804 if (table == nullptr) {
805 // The table doesn't exist
806 exists = false;
807 return true;
808 }
809
810 // The table exists
811 exists = true;
812 return true;
813 }
814
set_tablespace_id_in_table(const char * schema_name,const char * table_name,dd::Object_id tablespace_id)815 bool Ndb_dd_client::set_tablespace_id_in_table(const char *schema_name,
816 const char *table_name,
817 dd::Object_id tablespace_id) {
818 dd::Table *table_def = nullptr;
819 if (m_client->acquire_for_modification(schema_name, table_name, &table_def)) {
820 return false;
821 }
822 if (table_def == nullptr) {
823 DBUG_ASSERT(false);
824 return false;
825 }
826
827 ndb_dd_table_set_tablespace_id(table_def, tablespace_id);
828
829 if (m_client->update(table_def)) {
830 return false;
831 }
832 return true;
833 }
834
set_object_id_and_version_in_table(const char * schema_name,const char * table_name,int object_id,int object_version)835 bool Ndb_dd_client::set_object_id_and_version_in_table(const char *schema_name,
836 const char *table_name,
837 int object_id,
838 int object_version) {
839 DBUG_TRACE;
840
841 /* Acquire the table */
842 dd::Table *table_def = nullptr;
843 if (m_client->acquire_for_modification(schema_name, table_name, &table_def)) {
844 DBUG_PRINT("error", ("Failed to load the table from DD"));
845 return false;
846 }
847
848 /* Update id and version */
849 ndb_dd_table_set_object_id_and_version(table_def, object_id, object_version);
850
851 /* Update it to DD */
852 if (m_client->update(table_def)) {
853 return false;
854 }
855
856 return true;
857 }
858
fetch_all_schemas(std::map<std::string,const dd::Schema * > & schemas)859 bool Ndb_dd_client::fetch_all_schemas(
860 std::map<std::string, const dd::Schema *> &schemas) {
861 DBUG_TRACE;
862
863 std::vector<const dd::Schema *> schemas_list;
864 if (m_client->fetch_global_components(&schemas_list)) {
865 DBUG_PRINT("error", ("Failed to fetch all schemas"));
866 return false;
867 }
868
869 for (const dd::Schema *schema : schemas_list) {
870 // Convert the schema name to lower case on platforms that have
871 // lower_case_table_names set to 2
872 const std::string schema_name = ndb_dd_fs_name_case(schema->name());
873 schemas.insert(std::make_pair(schema_name.c_str(), schema));
874 }
875 return true;
876 }
877
fetch_schema_names(std::vector<std::string> * names)878 bool Ndb_dd_client::fetch_schema_names(std::vector<std::string> *names) {
879 DBUG_TRACE;
880
881 std::vector<const dd::Schema *> schemas;
882 if (m_client->fetch_global_components(&schemas)) {
883 return false;
884 }
885
886 for (const dd::Schema *schema : schemas) {
887 // Convert the schema name to lower case on platforms that have
888 // lower_case_table_names set to 2
889 const std::string schema_name = ndb_dd_fs_name_case(schema->name());
890 names->push_back(schema_name.c_str());
891 }
892 return true;
893 }
894
get_ndb_table_names_in_schema(const char * schema_name,std::unordered_set<std::string> * names)895 bool Ndb_dd_client::get_ndb_table_names_in_schema(
896 const char *schema_name, std::unordered_set<std::string> *names) {
897 DBUG_TRACE;
898
899 const dd::Schema *schema;
900 if (m_client->acquire(schema_name, &schema)) {
901 // Failed to open the requested Schema object
902 return false;
903 }
904
905 if (schema == nullptr) {
906 // Database does not exist
907 return false;
908 }
909
910 std::vector<dd::String_type> table_names;
911 if (m_client->fetch_schema_table_names_by_engine(schema, "ndbcluster",
912 &table_names)) {
913 return false;
914 }
915
916 for (const auto &name : table_names) {
917 if (!mdl_lock_table(schema_name, name.c_str())) {
918 // Failed to MDL lock table
919 return false;
920 }
921
922 // Convert the table name to lower case on platforms that have
923 // lower_case_table_names set to 2
924 const std::string table_name = ndb_dd_fs_name_case(name);
925 names->insert(table_name);
926 }
927 return true;
928 }
929
get_table_names_in_schema(const char * schema_name,std::unordered_set<std::string> * ndb_tables,std::unordered_set<std::string> * local_tables)930 bool Ndb_dd_client::get_table_names_in_schema(
931 const char *schema_name, std::unordered_set<std::string> *ndb_tables,
932 std::unordered_set<std::string> *local_tables) {
933 DBUG_TRACE;
934
935 const dd::Schema *schema;
936 if (m_client->acquire(schema_name, &schema)) {
937 // Failed to open the requested Schema object
938 return false;
939 }
940
941 if (schema == nullptr) {
942 // Database does not exist
943 return false;
944 }
945
946 // Fetch NDB table names
947 std::vector<dd::String_type> ndb_table_names;
948 if (m_client->fetch_schema_table_names_by_engine(schema, "ndbcluster",
949 &ndb_table_names)) {
950 return false;
951 }
952 for (const auto &name : ndb_table_names) {
953 // Lock the table in DD
954 if (!mdl_lock_table(schema_name, name.c_str())) {
955 // Failed to acquire MDL
956 return false;
957 }
958 // Convert the table name to lower case on platforms that have
959 // lower_case_table_names set to 2
960 const std::string table_name = ndb_dd_fs_name_case(name);
961 ndb_tables->insert(table_name);
962 }
963
964 // Fetch all table names
965 std::vector<dd::String_type> all_table_names;
966 if (m_client->fetch_schema_table_names_not_hidden_by_se(schema,
967 &all_table_names)) {
968 return false;
969 }
970 for (const auto &name : all_table_names) {
971 // Convert the table name to lower case on platforms that have
972 // lower_case_table_names set to 2
973 const std::string table_name = ndb_dd_fs_name_case(name);
974 if (ndb_tables->find(table_name) != ndb_tables->end()) {
975 // Skip NDB table
976 continue;
977 }
978 // Lock the table in DD
979 if (!mdl_lock_table(schema_name, name.c_str())) {
980 // Failed to acquire MDL
981 return false;
982 }
983 local_tables->insert(table_name);
984 }
985 return true;
986 }
987
988 /*
989 Check given schema for local tables(i.e not in NDB)
990
991 @param schema_name Name of the schema to check for tables
992 @param [out] found_local_tables Return parameter indicating if the schema
993 contained local tables or not.
994
995 @return false Failure
996 @return true Success.
997 */
998
have_local_tables_in_schema(const char * schema_name,bool * found_local_tables)999 bool Ndb_dd_client::have_local_tables_in_schema(const char *schema_name,
1000 bool *found_local_tables) {
1001 DBUG_TRACE;
1002
1003 const dd::Schema *schema;
1004 if (m_client->acquire(schema_name, &schema)) {
1005 // Failed to open the requested schema
1006 return false;
1007 }
1008
1009 if (schema == nullptr) {
1010 // The schema didn't exist, thus it can't have any local tables
1011 *found_local_tables = false;
1012 return true;
1013 }
1014
1015 // Fetch all table names
1016 std::vector<dd::String_type> all_table_names;
1017 if (m_client->fetch_schema_table_names_not_hidden_by_se(schema,
1018 &all_table_names)) {
1019 return false;
1020 }
1021 // Fetch NDB table names
1022 std::vector<dd::String_type> ndb_table_names;
1023 if (m_client->fetch_schema_table_names_by_engine(schema, "ndbcluster",
1024 &ndb_table_names)) {
1025 return false;
1026 }
1027
1028 *found_local_tables = all_table_names.size() > ndb_table_names.size();
1029
1030 return true;
1031 }
1032
is_local_table(const char * schema_name,const char * table_name,bool & local_table)1033 bool Ndb_dd_client::is_local_table(const char *schema_name,
1034 const char *table_name, bool &local_table) {
1035 const dd::Table *table;
1036 if (m_client->acquire(schema_name, table_name, &table)) {
1037 // Failed to acquire the requested table
1038 return false;
1039 }
1040 if (table == nullptr) {
1041 // The table doesn't exist
1042 DBUG_ASSERT(false);
1043 return false;
1044 }
1045 local_table = table->engine() != "ndbcluster";
1046 return true;
1047 }
1048
get_schema(const char * schema_name,const dd::Schema ** schema_def) const1049 bool Ndb_dd_client::get_schema(const char *schema_name,
1050 const dd::Schema **schema_def) const {
1051 if (m_client->acquire(schema_name, schema_def)) {
1052 // Error is reported by the dictionary subsystem.
1053 return false;
1054 }
1055 return true;
1056 }
1057
schema_exists(const char * schema_name,bool * schema_exists)1058 bool Ndb_dd_client::schema_exists(const char *schema_name,
1059 bool *schema_exists) {
1060 DBUG_TRACE;
1061
1062 const dd::Schema *schema;
1063 if (m_client->acquire(schema_name, &schema)) {
1064 // Failed to open the requested schema
1065 return false;
1066 }
1067
1068 if (schema == nullptr) {
1069 // The schema didn't exist
1070 *schema_exists = false;
1071 return true;
1072 }
1073
1074 // The schema exists
1075 *schema_exists = true;
1076 return true;
1077 }
1078
update_schema_version(const char * schema_name,unsigned int counter,unsigned int node_id)1079 bool Ndb_dd_client::update_schema_version(const char *schema_name,
1080 unsigned int counter,
1081 unsigned int node_id) {
1082 DBUG_TRACE;
1083 DBUG_PRINT("enter", ("Schema : %s, counter : %u, node_id : %u", schema_name,
1084 counter, node_id));
1085
1086 DBUG_ASSERT(m_thd->mdl_context.owns_equal_or_stronger_lock(
1087 MDL_key::SCHEMA, schema_name, "", MDL_EXCLUSIVE));
1088
1089 dd::Schema *schema;
1090
1091 if (m_client->acquire_for_modification(schema_name, &schema) ||
1092 schema == nullptr) {
1093 DBUG_PRINT("error", ("Failed to fetch the Schema object"));
1094 return false;
1095 }
1096
1097 // Set the values
1098 ndb_dd_schema_set_counter_and_nodeid(schema, counter, node_id);
1099
1100 // Update Schema in DD
1101 if (m_client->update(schema)) {
1102 DBUG_PRINT("error", ("Failed to update the Schema in DD"));
1103 return false;
1104 }
1105
1106 return true;
1107 }
1108
lookup_tablespace_id(const char * tablespace_name,dd::Object_id * tablespace_id)1109 bool Ndb_dd_client::lookup_tablespace_id(const char *tablespace_name,
1110 dd::Object_id *tablespace_id) {
1111 DBUG_TRACE;
1112 DBUG_PRINT("enter", ("tablespace_name: %s", tablespace_name));
1113
1114 DBUG_ASSERT(m_thd->mdl_context.owns_equal_or_stronger_lock(
1115 MDL_key::TABLESPACE, "", tablespace_name, MDL_INTENTION_EXCLUSIVE));
1116
1117 // Acquire tablespace.
1118 const dd::Tablespace *ts_obj = NULL;
1119 if (m_client->acquire(tablespace_name, &ts_obj)) {
1120 // acquire() always fails with an error being reported.
1121 return false;
1122 }
1123
1124 if (!ts_obj) {
1125 my_error(ER_TABLESPACE_MISSING_WITH_NAME, MYF(0), tablespace_name);
1126 return false;
1127 }
1128
1129 *tablespace_id = ts_obj->id();
1130 DBUG_PRINT("exit", ("tablespace_id: %llu", *tablespace_id));
1131
1132 return true;
1133 }
1134
get_tablespace(const char * tablespace_name,const dd::Tablespace ** tablespace_def)1135 bool Ndb_dd_client::get_tablespace(const char *tablespace_name,
1136 const dd::Tablespace **tablespace_def) {
1137 if (m_client->acquire(tablespace_name, tablespace_def)) {
1138 return false;
1139 }
1140 return true;
1141 }
1142
tablespace_exists(const char * tablespace_name,bool & exists)1143 bool Ndb_dd_client::tablespace_exists(const char *tablespace_name,
1144 bool &exists) {
1145 const dd::Tablespace *tablespace;
1146 if (m_client->acquire(tablespace_name, &tablespace)) {
1147 // Failed to acquire the requested tablespace
1148 return false;
1149 }
1150
1151 if (tablespace == nullptr) {
1152 // The tablespace doesn't exist
1153 exists = false;
1154 return true;
1155 }
1156
1157 // The tablespace exists
1158 exists = true;
1159 return true;
1160 }
1161
fetch_ndb_tablespace_names(std::unordered_set<std::string> & names)1162 bool Ndb_dd_client::fetch_ndb_tablespace_names(
1163 std::unordered_set<std::string> &names) {
1164 DBUG_TRACE;
1165
1166 std::vector<const dd::Tablespace *> tablespaces;
1167 if (m_client->fetch_global_components<dd::Tablespace>(&tablespaces)) {
1168 return false;
1169 }
1170
1171 for (const dd::Tablespace *tablespace : tablespaces) {
1172 if (tablespace->engine() != "ndbcluster") {
1173 // Skip non-NDB objects
1174 continue;
1175 }
1176
1177 // Find out type of object
1178 object_type type;
1179
1180 ndb_dd_disk_data_get_object_type(tablespace->se_private_data(), type);
1181
1182 if (type != object_type::TABLESPACE) {
1183 // Skip logfile groups
1184 continue;
1185 }
1186
1187 // Acquire lock in DD
1188 if (!mdl_lock_tablespace(tablespace->name().c_str(),
1189 false /* intention_exclusive */)) {
1190 // Failed to acquire MDL lock
1191 return false;
1192 }
1193
1194 names.insert(tablespace->name().c_str());
1195 }
1196 return true;
1197 }
1198
install_tablespace(const char * tablespace_name,const std::vector<std::string> & data_file_names,int tablespace_id,int tablespace_version,bool force_overwrite)1199 bool Ndb_dd_client::install_tablespace(
1200 const char *tablespace_name,
1201 const std::vector<std::string> &data_file_names, int tablespace_id,
1202 int tablespace_version, bool force_overwrite) {
1203 DBUG_TRACE;
1204
1205 bool exists;
1206 if (!tablespace_exists(tablespace_name, exists)) {
1207 // Could not detect if the tablespace exists or not
1208 return false;
1209 }
1210
1211 if (exists) {
1212 if (force_overwrite) {
1213 if (!drop_tablespace(tablespace_name)) {
1214 // Failed to drop tablespace
1215 return false;
1216 }
1217 } else {
1218 // Error since tablespace exists but force_overwrite not set by caller
1219 // No point continuing since the subsequent store() will fail
1220 return false;
1221 }
1222 }
1223
1224 std::unique_ptr<dd::Tablespace> tablespace(
1225 dd::create_object<dd::Tablespace>());
1226
1227 // Set name
1228 tablespace->set_name(tablespace_name);
1229
1230 // Engine type
1231 tablespace->set_engine("ndbcluster");
1232
1233 // Add data files
1234 for (const auto &data_file_name : data_file_names) {
1235 ndb_dd_disk_data_add_file(tablespace.get(), data_file_name.c_str());
1236 }
1237
1238 // Assign id and version
1239 ndb_dd_disk_data_set_object_id_and_version(tablespace.get(), tablespace_id,
1240 tablespace_version);
1241
1242 // Assign object type as tablespace
1243 ndb_dd_disk_data_set_object_type(tablespace.get()->se_private_data(),
1244 object_type::TABLESPACE);
1245
1246 // Write changes to dictionary.
1247 if (m_client->store(tablespace.get())) {
1248 return false;
1249 }
1250
1251 return true;
1252 }
1253
drop_tablespace(const char * tablespace_name,bool fail_if_not_exists)1254 bool Ndb_dd_client::drop_tablespace(const char *tablespace_name,
1255 bool fail_if_not_exists)
1256
1257 {
1258 DBUG_TRACE;
1259
1260 const dd::Tablespace *existing = nullptr;
1261 if (m_client->acquire(tablespace_name, &existing)) {
1262 return false;
1263 }
1264
1265 if (existing == nullptr) {
1266 // Tablespace does not exist
1267 if (fail_if_not_exists) {
1268 return false;
1269 }
1270 return true;
1271 }
1272
1273 if (m_client->drop(existing)) {
1274 return false;
1275 }
1276
1277 return true;
1278 }
1279
get_logfile_group(const char * logfile_group_name,const dd::Tablespace ** logfile_group_def)1280 bool Ndb_dd_client::get_logfile_group(
1281 const char *logfile_group_name, const dd::Tablespace **logfile_group_def) {
1282 if (m_client->acquire(logfile_group_name, logfile_group_def)) {
1283 return false;
1284 }
1285 return true;
1286 }
1287
logfile_group_exists(const char * logfile_group_name,bool & exists)1288 bool Ndb_dd_client::logfile_group_exists(const char *logfile_group_name,
1289 bool &exists) {
1290 const dd::Tablespace *logfile_group;
1291 if (m_client->acquire(logfile_group_name, &logfile_group)) {
1292 // Failed to acquire the requested logfile group
1293 return false;
1294 }
1295
1296 if (logfile_group == nullptr) {
1297 // The logfile group doesn't exist
1298 exists = false;
1299 return true;
1300 }
1301
1302 // The logfile group exists
1303 exists = true;
1304 return true;
1305 }
1306
fetch_ndb_logfile_group_names(std::unordered_set<std::string> & names)1307 bool Ndb_dd_client::fetch_ndb_logfile_group_names(
1308 std::unordered_set<std::string> &names) {
1309 DBUG_TRACE;
1310
1311 std::vector<const dd::Tablespace *> tablespaces;
1312 if (m_client->fetch_global_components<dd::Tablespace>(&tablespaces)) {
1313 return false;
1314 }
1315
1316 for (const dd::Tablespace *tablespace : tablespaces) {
1317 if (tablespace->engine() != "ndbcluster") {
1318 // Skip non-NDB objects
1319 continue;
1320 }
1321
1322 // Find out type of object
1323 object_type type;
1324
1325 ndb_dd_disk_data_get_object_type(tablespace->se_private_data(), type);
1326
1327 if (type != object_type::LOGFILE_GROUP) {
1328 // Skip tablespaces
1329 continue;
1330 }
1331
1332 // Acquire lock in DD
1333 if (!mdl_lock_logfile_group(tablespace->name().c_str(),
1334 false /* intention_exclusive */)) {
1335 // Failed to acquire MDL lock
1336 return false;
1337 }
1338
1339 names.insert(tablespace->name().c_str());
1340 }
1341 return true;
1342 }
1343
install_logfile_group(const char * logfile_group_name,const std::vector<std::string> & undo_file_names,int logfile_group_id,int logfile_group_version,bool force_overwrite)1344 bool Ndb_dd_client::install_logfile_group(
1345 const char *logfile_group_name,
1346 const std::vector<std::string> &undo_file_names, int logfile_group_id,
1347 int logfile_group_version, bool force_overwrite) {
1348 DBUG_TRACE;
1349
1350 /*
1351 * Logfile groups are stored as tablespaces in the DD.
1352 * This is acceptable since the only reason for storing
1353 * them in the DD is to ensure that INFORMATION_SCHEMA
1354 * is aware of their presence. Thus, rather than
1355 * extending DD, we use tablespaces since they resemble
1356 * logfile groups in terms of metadata structure
1357 */
1358
1359 bool exists;
1360 if (!logfile_group_exists(logfile_group_name, exists)) {
1361 // Could not detect if the logfile group exists or not
1362 return false;
1363 }
1364
1365 if (exists) {
1366 if (force_overwrite) {
1367 if (!drop_logfile_group(logfile_group_name)) {
1368 // Failed to drop logfile group
1369 return false;
1370 }
1371 } else {
1372 // Error since logfile group exists but force_overwrite not set to true by
1373 // caller. No point continuing since the subsequent store() will fail
1374 return false;
1375 }
1376 }
1377
1378 std::unique_ptr<dd::Tablespace> logfile_group(
1379 dd::create_object<dd::Tablespace>());
1380
1381 // Set name
1382 logfile_group->set_name(logfile_group_name);
1383
1384 // Engine type
1385 logfile_group->set_engine("ndbcluster");
1386
1387 // Add undofiles
1388 for (const auto &undo_file_name : undo_file_names) {
1389 ndb_dd_disk_data_add_file(logfile_group.get(), undo_file_name.c_str());
1390 }
1391
1392 // Assign id and version
1393 ndb_dd_disk_data_set_object_id_and_version(
1394 logfile_group.get(), logfile_group_id, logfile_group_version);
1395
1396 // Assign object type as logfile group
1397 ndb_dd_disk_data_set_object_type(logfile_group.get()->se_private_data(),
1398 object_type::LOGFILE_GROUP);
1399
1400 // Write changes to dictionary.
1401 if (m_client->store(logfile_group.get())) {
1402 return false;
1403 }
1404
1405 return true;
1406 }
1407
install_undo_file(const char * logfile_group_name,const char * undo_file_name)1408 bool Ndb_dd_client::install_undo_file(const char *logfile_group_name,
1409 const char *undo_file_name) {
1410 DBUG_TRACE;
1411
1412 // Read logfile group from DD
1413 dd::Tablespace *new_logfile_group_def = nullptr;
1414 if (m_client->acquire_for_modification(logfile_group_name,
1415 &new_logfile_group_def))
1416 return false;
1417
1418 if (!new_logfile_group_def) return false;
1419
1420 ndb_dd_disk_data_add_file(new_logfile_group_def, undo_file_name);
1421
1422 // Write changes to dictionary.
1423 if (m_client->update(new_logfile_group_def)) {
1424 return false;
1425 }
1426
1427 return true;
1428 }
1429
drop_logfile_group(const char * logfile_group_name,bool fail_if_not_exists)1430 bool Ndb_dd_client::drop_logfile_group(const char *logfile_group_name,
1431 bool fail_if_not_exists) {
1432 DBUG_TRACE;
1433
1434 /*
1435 * Logfile groups are stored as tablespaces in the DD.
1436 * This is acceptable since the only reason for storing
1437 * them in the DD is to ensure that INFORMATION_SCHEMA
1438 * is aware of their presence. Thus, rather than
1439 * extending DD, we use tablespaces since they resemble
1440 * logfile groups in terms of metadata structure
1441 */
1442
1443 const dd::Tablespace *existing = nullptr;
1444 if (m_client->acquire(logfile_group_name, &existing)) {
1445 return false;
1446 }
1447
1448 if (existing == nullptr) {
1449 // Logfile group does not exist
1450 if (fail_if_not_exists) {
1451 return false;
1452 }
1453 return true;
1454 }
1455
1456 if (m_client->drop(existing)) {
1457 return false;
1458 }
1459
1460 return true;
1461 }
1462
get_schema_uuid(dd::String_type * value) const1463 bool Ndb_dd_client::get_schema_uuid(dd::String_type *value) const {
1464 DBUG_TRACE;
1465
1466 // Schema UUID will be stored in ndb_schema table definition in DD
1467 const dd::Table *table = nullptr;
1468 if (m_client->acquire(Ndb_schema_dist_table::DB_NAME.c_str(),
1469 Ndb_schema_dist_table::TABLE_NAME.c_str(), &table)) {
1470 DBUG_ASSERT(false);
1471 return false;
1472 }
1473
1474 if (table == nullptr) {
1475 // Table doesn't exists. This is OK as it might happen
1476 // if the function is called before ndb_schema is created.
1477 return true;
1478 }
1479
1480 if (!ndb_dd_table_get_schema_uuid(table, value)) {
1481 // Table has invalid Schema UUID
1482 return false;
1483 }
1484
1485 return true;
1486 }
1487
update_schema_uuid(const char * value) const1488 bool Ndb_dd_client::update_schema_uuid(const char *value) const {
1489 DBUG_TRACE;
1490
1491 // Schema UUID should be updated in ndb_schema table definition in DD
1492 dd::Table *table = nullptr;
1493 if (m_client->acquire_for_modification(
1494 Ndb_schema_dist_table::DB_NAME.c_str(),
1495 Ndb_schema_dist_table::TABLE_NAME.c_str(), &table)) {
1496 DBUG_ASSERT(false);
1497 return false;
1498 }
1499
1500 if (table == nullptr) {
1501 // Table does not exist in DD
1502 DBUG_ASSERT(false);
1503 return false;
1504 }
1505
1506 // Set the schema uuid value to the table object
1507 ndb_dd_table_set_schema_uuid(table, value);
1508
1509 if (m_client->update(table)) {
1510 DBUG_ASSERT(false);
1511 return false;
1512 }
1513
1514 return true;
1515 }
1516
1517 /**
1518 Lock and add the given referenced table to the set of
1519 referenced tables maintained by the invalidator.
1520
1521 @param schema_name Schema name of the table.
1522 @param table_name Name of the table.
1523
1524 @return true On success.
1525 @return false Unable to lock the table to the list.
1526 */
add_and_lock_referenced_table(const char * schema_name,const char * table_name)1527 bool Ndb_referenced_tables_invalidator::add_and_lock_referenced_table(
1528 const char *schema_name, const char *table_name) {
1529 auto result =
1530 m_referenced_tables.insert(std::make_pair(schema_name, table_name));
1531 if (result.second) {
1532 // New parent added to invalidator. Lock it down
1533 DBUG_PRINT("info", ("Locking '%s.%s'", schema_name, table_name));
1534 if (!m_dd_client.mdl_locks_acquire_exclusive(schema_name, table_name)) {
1535 DBUG_PRINT("error", ("Unable to acquire lock to parent table '%s.%s'",
1536 schema_name, table_name));
1537 return false;
1538 }
1539 }
1540 return true;
1541 }
1542
1543 /**
1544 Fetch the list of referenced tables to add from the local Data Dictionary
1545 if available and also from the NDB Dictionary if available. Then lock
1546 them and add them to the unique list maintained by the invalidator.
1547
1548 @param schema_name Schema name of the table.
1549 @param table_name Name of the table.
1550 @param table_def Table object from the DD
1551 @param skip_ndb_dict_fetch Bool value. If true, skip fetching the
1552 referenced tables from NDB. Default value is
1553 false. NDB Dictionary fetch has to be skipped
1554 if the DDL being distributed would have dropped
1555 the table in NDB dictionary already (like drop
1556 table) or if reading the NDB dictionary is
1557 redundant as the DDL won't be adding/dropping
1558 any FKs(like rename table).
1559
1560 @return true On success.
1561 @return false Fetching failed.
1562 */
fetch_referenced_tables_to_invalidate(const char * schema_name,const char * table_name,const dd::Table * table_def,bool skip_ndb_dict_fetch)1563 bool Ndb_referenced_tables_invalidator::fetch_referenced_tables_to_invalidate(
1564 const char *schema_name, const char *table_name, const dd::Table *table_def,
1565 bool skip_ndb_dict_fetch) {
1566 DBUG_TRACE;
1567
1568 DBUG_PRINT("info",
1569 ("Collecting parent tables of '%s.%s' that are to be invalidated",
1570 schema_name, table_name));
1571
1572 if (table_def != nullptr) {
1573 /* Table exists in DD already. Lock and add the parents */
1574 for (const dd::Foreign_key *fk : table_def->foreign_keys()) {
1575 const char *parent_db = fk->referenced_table_schema_name().c_str();
1576 const char *parent_table = fk->referenced_table_name().c_str();
1577 if (strcmp(parent_db, schema_name) == 0 &&
1578 strcmp(parent_table, table_name) == 0) {
1579 // Given table is the parent of this FK. Skip adding.
1580 continue;
1581 }
1582 if (!add_and_lock_referenced_table(parent_db, parent_table)) {
1583 return false;
1584 }
1585 }
1586 }
1587
1588 if (!skip_ndb_dict_fetch) {
1589 std::set<std::pair<std::string, std::string>> referenced_tables;
1590
1591 /* fetch the foreign key definitions from NDB dictionary */
1592 if (!fetch_referenced_tables_from_ndb_dictionary(
1593 m_thd, schema_name, table_name, referenced_tables)) {
1594 return false;
1595 }
1596
1597 /* lock and add any missing parents */
1598 for (auto const &parent_name : referenced_tables) {
1599 if (!add_and_lock_referenced_table(parent_name.first.c_str(),
1600 parent_name.second.c_str())) {
1601 return false;
1602 }
1603 }
1604 }
1605
1606 return true;
1607 }
1608
1609 /**
1610 Invalidate all the tables in the referenced_tables set by closing
1611 any cached instances in the table definition cache and invalidating
1612 the same from the local DD.
1613
1614 @return true On success.
1615 @return false Invalidation failed.
1616 */
invalidate() const1617 bool Ndb_referenced_tables_invalidator::invalidate() const {
1618 DBUG_TRACE;
1619 for (auto parent_it : m_referenced_tables) {
1620 // Invalidate Table and Table Definition Caches too.
1621 const char *schema_name = parent_it.first.c_str();
1622 const char *table_name = parent_it.second.c_str();
1623 DBUG_PRINT("info",
1624 ("Invalidating parent table '%s.%s'", schema_name, table_name));
1625 if (ndb_tdc_close_cached_table(m_thd, schema_name, table_name) ||
1626 m_thd->dd_client()->invalidate(schema_name, table_name) != 0) {
1627 DBUG_PRINT("error", ("Unable to invalidate table '%s.%s'", schema_name,
1628 table_name));
1629 return false;
1630 }
1631 }
1632 return true;
1633 }
1634