1 /***************************************************************************** 2 3 Copyright (c) 1996, 2016, Oracle and/or its affiliates. All Rights Reserved. 4 Copyright (c) 2016, 2020, MariaDB Corporation. 5 6 This program is free software; you can redistribute it and/or modify it under 7 the terms of the GNU General Public License as published by the Free Software 8 Foundation; version 2 of the License. 9 10 This program is distributed in the hope that it will be useful, but WITHOUT 11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. 13 14 You should have received a copy of the GNU General Public License along with 15 this program; if not, write to the Free Software Foundation, Inc., 16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA 17 18 *****************************************************************************/ 19 20 /**************************************************//** 21 @file dict/dict0load.cc 22 Loads to the memory cache database object definitions 23 from dictionary tables 24 25 Created 4/24/1996 Heikki Tuuri 26 *******************************************************/ 27 28 #include "dict0load.h" 29 30 #include "mysql_version.h" 31 #include "btr0pcur.h" 32 #include "btr0btr.h" 33 #include "dict0boot.h" 34 #include "dict0crea.h" 35 #include "dict0dict.h" 36 #include "dict0mem.h" 37 #include "dict0priv.h" 38 #include "dict0stats.h" 39 #include "fsp0file.h" 40 #include "fts0priv.h" 41 #include "mach0data.h" 42 #include "page0page.h" 43 #include "rem0cmp.h" 44 #include "srv0start.h" 45 #include "srv0srv.h" 46 #include "fts0opt.h" 47 48 /** Following are the InnoDB system tables. The positions in 49 this array are referenced by enum dict_system_table_id. */ 50 static const char* SYSTEM_TABLE_NAME[] = { 51 "SYS_TABLES", 52 "SYS_INDEXES", 53 "SYS_COLUMNS", 54 "SYS_FIELDS", 55 "SYS_FOREIGN", 56 "SYS_FOREIGN_COLS", 57 "SYS_TABLESPACES", 58 "SYS_DATAFILES", 59 "SYS_VIRTUAL" 60 }; 61 62 /** Loads a table definition and also all its index definitions. 63 64 Loads those foreign key constraints whose referenced table is already in 65 dictionary cache. If a foreign key constraint is not loaded, then the 66 referenced table is pushed into the output stack (fk_tables), if it is not 67 NULL. These tables must be subsequently loaded so that all the foreign 68 key constraints are loaded into memory. 69 70 @param[in] name Table name in the db/tablename format 71 @param[in] ignore_err Error to be ignored when loading table 72 and its index definition 73 @param[out] fk_tables Related table names that must also be 74 loaded to ensure that all foreign key 75 constraints are loaded. 76 @return table, NULL if does not exist; if the table is stored in an 77 .ibd file, but the file does not exist, then we set the 78 file_unreadable flag in the table object we return */ 79 static 80 dict_table_t* 81 dict_load_table_one( 82 const table_name_t& name, 83 dict_err_ignore_t ignore_err, 84 dict_names_t& fk_tables); 85 86 /** Load a table definition from a SYS_TABLES record to dict_table_t. 87 Do not load any columns or indexes. 88 @param[in] name Table name 89 @param[in] rec SYS_TABLES record 90 @param[out,own] table table, or NULL 91 @return error message 92 @retval NULL on success */ 93 static const char* dict_load_table_low(const table_name_t& name, 94 const rec_t* rec, dict_table_t** table) 95 MY_ATTRIBUTE((nonnull, warn_unused_result)); 96 97 /** Load an index definition from a SYS_INDEXES record to dict_index_t. 98 If allocate=TRUE, we will create a dict_index_t structure and fill it 99 accordingly. If allocated=FALSE, the dict_index_t will be supplied by 100 the caller and filled with information read from the record. 101 @return error message 102 @retval NULL on success */ 103 static 104 const char* 105 dict_load_index_low( 106 byte* table_id, /*!< in/out: table id (8 bytes), 107 an "in" value if allocate=TRUE 108 and "out" when allocate=FALSE */ 109 mem_heap_t* heap, /*!< in/out: temporary memory heap */ 110 const rec_t* rec, /*!< in: SYS_INDEXES record */ 111 ibool allocate, /*!< in: TRUE=allocate *index, 112 FALSE=fill in a pre-allocated 113 *index */ 114 dict_index_t** index); /*!< out,own: index, or NULL */ 115 116 /** Load a table column definition from a SYS_COLUMNS record to dict_table_t. 117 @return error message 118 @retval NULL on success */ 119 static 120 const char* 121 dict_load_column_low( 122 dict_table_t* table, /*!< in/out: table, could be NULL 123 if we just populate a dict_column_t 124 struct with information from 125 a SYS_COLUMNS record */ 126 mem_heap_t* heap, /*!< in/out: memory heap 127 for temporary storage */ 128 dict_col_t* column, /*!< out: dict_column_t to fill, 129 or NULL if table != NULL */ 130 table_id_t* table_id, /*!< out: table id */ 131 const char** col_name, /*!< out: column name */ 132 const rec_t* rec, /*!< in: SYS_COLUMNS record */ 133 ulint* nth_v_col); /*!< out: if not NULL, this 134 records the "n" of "nth" virtual 135 column */ 136 137 /** Load a virtual column "mapping" (to base columns) information 138 from a SYS_VIRTUAL record 139 @param[in,out] table table 140 @param[in,out] column mapped base column's dict_column_t 141 @param[in,out] table_id table id 142 @param[in,out] pos virtual column position 143 @param[in,out] base_pos base column position 144 @param[in] rec SYS_VIRTUAL record 145 @return error message 146 @retval NULL on success */ 147 static 148 const char* 149 dict_load_virtual_low( 150 dict_table_t* table, 151 dict_col_t** column, 152 table_id_t* table_id, 153 ulint* pos, 154 ulint* base_pos, 155 const rec_t* rec); 156 157 /** Load an index field definition from a SYS_FIELDS record to dict_index_t. 158 @return error message 159 @retval NULL on success */ 160 static 161 const char* 162 dict_load_field_low( 163 byte* index_id, /*!< in/out: index id (8 bytes) 164 an "in" value if index != NULL 165 and "out" if index == NULL */ 166 dict_index_t* index, /*!< in/out: index, could be NULL 167 if we just populate a dict_field_t 168 struct with information from 169 a SYS_FIELDS record */ 170 dict_field_t* sys_field, /*!< out: dict_field_t to be 171 filled */ 172 ulint* pos, /*!< out: Field position */ 173 byte* last_index_id, /*!< in: last index id */ 174 mem_heap_t* heap, /*!< in/out: memory heap 175 for temporary storage */ 176 const rec_t* rec); /*!< in: SYS_FIELDS record */ 177 178 /* If this flag is TRUE, then we will load the cluster index's (and tables') 179 metadata even if it is marked as "corrupted". */ 180 my_bool srv_load_corrupted; 181 182 #ifdef UNIV_DEBUG 183 /****************************************************************//** 184 Compare the name of an index column. 185 @return TRUE if the i'th column of index is 'name'. */ 186 static 187 ibool 188 name_of_col_is( 189 /*===========*/ 190 const dict_table_t* table, /*!< in: table */ 191 const dict_index_t* index, /*!< in: index */ 192 ulint i, /*!< in: index field offset */ 193 const char* name) /*!< in: name to compare to */ 194 { 195 ulint tmp = dict_col_get_no(dict_field_get_col( 196 dict_index_get_nth_field( 197 index, i))); 198 199 return(strcmp(name, dict_table_get_col_name(table, tmp)) == 0); 200 } 201 #endif /* UNIV_DEBUG */ 202 203 /********************************************************************//** 204 Finds the first table name in the given database. 205 @return own: table name, NULL if does not exist; the caller must free 206 the memory in the string! */ 207 char* 208 dict_get_first_table_name_in_db( 209 /*============================*/ 210 const char* name) /*!< in: database name which ends in '/' */ 211 { 212 dict_table_t* sys_tables; 213 btr_pcur_t pcur; 214 dict_index_t* sys_index; 215 dtuple_t* tuple; 216 mem_heap_t* heap; 217 dfield_t* dfield; 218 const rec_t* rec; 219 const byte* field; 220 ulint len; 221 mtr_t mtr; 222 223 ut_ad(mutex_own(&dict_sys->mutex)); 224 225 heap = mem_heap_create(1000); 226 227 mtr_start(&mtr); 228 229 sys_tables = dict_table_get_low("SYS_TABLES"); 230 sys_index = UT_LIST_GET_FIRST(sys_tables->indexes); 231 ut_ad(!dict_table_is_comp(sys_tables)); 232 233 tuple = dtuple_create(heap, 1); 234 dfield = dtuple_get_nth_field(tuple, 0); 235 236 dfield_set_data(dfield, name, ut_strlen(name)); 237 dict_index_copy_types(tuple, sys_index, 1); 238 239 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE, 240 BTR_SEARCH_LEAF, &pcur, &mtr); 241 loop: 242 rec = btr_pcur_get_rec(&pcur); 243 244 if (!btr_pcur_is_on_user_rec(&pcur)) { 245 /* Not found */ 246 247 btr_pcur_close(&pcur); 248 mtr_commit(&mtr); 249 mem_heap_free(heap); 250 251 return(NULL); 252 } 253 254 field = rec_get_nth_field_old( 255 rec, DICT_FLD__SYS_TABLES__NAME, &len); 256 257 if (len < strlen(name) 258 || ut_memcmp(name, field, strlen(name)) != 0) { 259 /* Not found */ 260 261 btr_pcur_close(&pcur); 262 mtr_commit(&mtr); 263 mem_heap_free(heap); 264 265 return(NULL); 266 } 267 268 if (!rec_get_deleted_flag(rec, 0)) { 269 270 /* We found one */ 271 272 char* table_name = mem_strdupl((char*) field, len); 273 274 btr_pcur_close(&pcur); 275 mtr_commit(&mtr); 276 mem_heap_free(heap); 277 278 return(table_name); 279 } 280 281 btr_pcur_move_to_next_user_rec(&pcur, &mtr); 282 283 goto loop; 284 } 285 286 /********************************************************************//** 287 This function gets the next system table record as it scans the table. 288 @return the next record if found, NULL if end of scan */ 289 static 290 const rec_t* 291 dict_getnext_system_low( 292 /*====================*/ 293 btr_pcur_t* pcur, /*!< in/out: persistent cursor to the 294 record*/ 295 mtr_t* mtr) /*!< in: the mini-transaction */ 296 { 297 rec_t* rec = NULL; 298 299 while (!rec || rec_get_deleted_flag(rec, 0)) { 300 btr_pcur_move_to_next_user_rec(pcur, mtr); 301 302 rec = btr_pcur_get_rec(pcur); 303 304 if (!btr_pcur_is_on_user_rec(pcur)) { 305 /* end of index */ 306 btr_pcur_close(pcur); 307 308 return(NULL); 309 } 310 } 311 312 /* Get a record, let's save the position */ 313 btr_pcur_store_position(pcur, mtr); 314 315 return(rec); 316 } 317 318 /********************************************************************//** 319 This function opens a system table, and returns the first record. 320 @return first record of the system table */ 321 const rec_t* 322 dict_startscan_system( 323 /*==================*/ 324 btr_pcur_t* pcur, /*!< out: persistent cursor to 325 the record */ 326 mtr_t* mtr, /*!< in: the mini-transaction */ 327 dict_system_id_t system_id) /*!< in: which system table to open */ 328 { 329 dict_table_t* system_table; 330 dict_index_t* clust_index; 331 const rec_t* rec; 332 333 ut_a(system_id < SYS_NUM_SYSTEM_TABLES); 334 335 system_table = dict_table_get_low(SYSTEM_TABLE_NAME[system_id]); 336 337 clust_index = UT_LIST_GET_FIRST(system_table->indexes); 338 339 btr_pcur_open_at_index_side(true, clust_index, BTR_SEARCH_LEAF, pcur, 340 true, 0, mtr); 341 342 rec = dict_getnext_system_low(pcur, mtr); 343 344 return(rec); 345 } 346 347 /********************************************************************//** 348 This function gets the next system table record as it scans the table. 349 @return the next record if found, NULL if end of scan */ 350 const rec_t* 351 dict_getnext_system( 352 /*================*/ 353 btr_pcur_t* pcur, /*!< in/out: persistent cursor 354 to the record */ 355 mtr_t* mtr) /*!< in: the mini-transaction */ 356 { 357 const rec_t* rec; 358 359 /* Restore the position */ 360 btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr); 361 362 /* Get the next record */ 363 rec = dict_getnext_system_low(pcur, mtr); 364 365 return(rec); 366 } 367 368 /********************************************************************//** 369 This function processes one SYS_TABLES record and populate the dict_table_t 370 struct for the table. 371 @return error message, or NULL on success */ 372 const char* 373 dict_process_sys_tables_rec_and_mtr_commit( 374 /*=======================================*/ 375 mem_heap_t* heap, /*!< in/out: temporary memory heap */ 376 const rec_t* rec, /*!< in: SYS_TABLES record */ 377 dict_table_t** table, /*!< out: dict_table_t to fill */ 378 bool cached, /*!< in: whether to load from cache */ 379 mtr_t* mtr) /*!< in/out: mini-transaction, 380 will be committed */ 381 { 382 ulint len; 383 const char* field; 384 385 field = (const char*) rec_get_nth_field_old( 386 rec, DICT_FLD__SYS_TABLES__NAME, &len); 387 388 ut_a(!rec_get_deleted_flag(rec, 0)); 389 390 ut_ad(mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_S_FIX)); 391 392 /* Get the table name */ 393 table_name_t table_name(mem_heap_strdupl(heap, field, len)); 394 395 if (cached) { 396 /* Commit before load the table again */ 397 mtr_commit(mtr); 398 399 *table = dict_table_get_low(table_name.m_name); 400 return *table ? NULL : "Table not found in cache"; 401 } else { 402 const char* err = dict_load_table_low(table_name, rec, table); 403 mtr_commit(mtr); 404 return err; 405 } 406 } 407 408 /********************************************************************//** 409 This function parses a SYS_INDEXES record and populate a dict_index_t 410 structure with the information from the record. For detail information 411 about SYS_INDEXES fields, please refer to dict_boot() function. 412 @return error message, or NULL on success */ 413 const char* 414 dict_process_sys_indexes_rec( 415 /*=========================*/ 416 mem_heap_t* heap, /*!< in/out: heap memory */ 417 const rec_t* rec, /*!< in: current SYS_INDEXES rec */ 418 dict_index_t* index, /*!< out: index to be filled */ 419 table_id_t* table_id) /*!< out: index table id */ 420 { 421 const char* err_msg; 422 byte* buf; 423 424 buf = static_cast<byte*>(mem_heap_alloc(heap, 8)); 425 426 /* Parse the record, and get "dict_index_t" struct filled */ 427 err_msg = dict_load_index_low(buf, heap, rec, FALSE, &index); 428 429 *table_id = mach_read_from_8(buf); 430 431 return(err_msg); 432 } 433 434 /********************************************************************//** 435 This function parses a SYS_COLUMNS record and populate a dict_column_t 436 structure with the information from the record. 437 @return error message, or NULL on success */ 438 const char* 439 dict_process_sys_columns_rec( 440 /*=========================*/ 441 mem_heap_t* heap, /*!< in/out: heap memory */ 442 const rec_t* rec, /*!< in: current SYS_COLUMNS rec */ 443 dict_col_t* column, /*!< out: dict_col_t to be filled */ 444 table_id_t* table_id, /*!< out: table id */ 445 const char** col_name, /*!< out: column name */ 446 ulint* nth_v_col) /*!< out: if virtual col, this is 447 record's sequence number */ 448 { 449 const char* err_msg; 450 451 /* Parse the record, and get "dict_col_t" struct filled */ 452 err_msg = dict_load_column_low(NULL, heap, column, 453 table_id, col_name, rec, nth_v_col); 454 455 return(err_msg); 456 } 457 458 /** This function parses a SYS_VIRTUAL record and extracts virtual column 459 information 460 @param[in] rec current SYS_COLUMNS rec 461 @param[in,out] table_id table id 462 @param[in,out] pos virtual column position 463 @param[in,out] base_pos base column position 464 @return error message, or NULL on success */ 465 const char* 466 dict_process_sys_virtual_rec( 467 const rec_t* rec, 468 table_id_t* table_id, 469 ulint* pos, 470 ulint* base_pos) 471 { 472 const char* err_msg; 473 474 /* Parse the record, and get "dict_col_t" struct filled */ 475 err_msg = dict_load_virtual_low(NULL, NULL, table_id, 476 pos, base_pos, rec); 477 478 return(err_msg); 479 } 480 481 /********************************************************************//** 482 This function parses a SYS_FIELDS record and populates a dict_field_t 483 structure with the information from the record. 484 @return error message, or NULL on success */ 485 const char* 486 dict_process_sys_fields_rec( 487 /*========================*/ 488 mem_heap_t* heap, /*!< in/out: heap memory */ 489 const rec_t* rec, /*!< in: current SYS_FIELDS rec */ 490 dict_field_t* sys_field, /*!< out: dict_field_t to be 491 filled */ 492 ulint* pos, /*!< out: Field position */ 493 index_id_t* index_id, /*!< out: current index id */ 494 index_id_t last_id) /*!< in: previous index id */ 495 { 496 byte* buf; 497 byte* last_index_id; 498 const char* err_msg; 499 500 buf = static_cast<byte*>(mem_heap_alloc(heap, 8)); 501 502 last_index_id = static_cast<byte*>(mem_heap_alloc(heap, 8)); 503 mach_write_to_8(last_index_id, last_id); 504 505 err_msg = dict_load_field_low(buf, NULL, sys_field, 506 pos, last_index_id, heap, rec); 507 508 *index_id = mach_read_from_8(buf); 509 510 return(err_msg); 511 512 } 513 514 /********************************************************************//** 515 This function parses a SYS_FOREIGN record and populate a dict_foreign_t 516 structure with the information from the record. For detail information 517 about SYS_FOREIGN fields, please refer to dict_load_foreign() function. 518 @return error message, or NULL on success */ 519 const char* 520 dict_process_sys_foreign_rec( 521 /*=========================*/ 522 mem_heap_t* heap, /*!< in/out: heap memory */ 523 const rec_t* rec, /*!< in: current SYS_FOREIGN rec */ 524 dict_foreign_t* foreign) /*!< out: dict_foreign_t struct 525 to be filled */ 526 { 527 ulint len; 528 const byte* field; 529 ulint n_fields_and_type; 530 531 if (rec_get_deleted_flag(rec, 0)) { 532 return("delete-marked record in SYS_FOREIGN"); 533 } 534 535 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_FOREIGN) { 536 return("wrong number of columns in SYS_FOREIGN record"); 537 } 538 539 field = rec_get_nth_field_old( 540 rec, DICT_FLD__SYS_FOREIGN__ID, &len); 541 if (len == 0 || len == UNIV_SQL_NULL) { 542 err_len: 543 return("incorrect column length in SYS_FOREIGN"); 544 } 545 546 /* This receives a dict_foreign_t* that points to a stack variable. 547 So dict_foreign_free(foreign) is not used as elsewhere. 548 Since the heap used here is freed elsewhere, foreign->heap 549 is not assigned. */ 550 foreign->id = mem_heap_strdupl(heap, (const char*) field, len); 551 552 rec_get_nth_field_offs_old( 553 rec, DICT_FLD__SYS_FOREIGN__DB_TRX_ID, &len); 554 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) { 555 goto err_len; 556 } 557 rec_get_nth_field_offs_old( 558 rec, DICT_FLD__SYS_FOREIGN__DB_ROLL_PTR, &len); 559 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) { 560 goto err_len; 561 } 562 563 /* The _lookup versions of the referenced and foreign table names 564 are not assigned since they are not used in this dict_foreign_t */ 565 566 field = rec_get_nth_field_old( 567 rec, DICT_FLD__SYS_FOREIGN__FOR_NAME, &len); 568 if (len == 0 || len == UNIV_SQL_NULL) { 569 goto err_len; 570 } 571 foreign->foreign_table_name = mem_heap_strdupl( 572 heap, (const char*) field, len); 573 574 field = rec_get_nth_field_old( 575 rec, DICT_FLD__SYS_FOREIGN__REF_NAME, &len); 576 if (len == 0 || len == UNIV_SQL_NULL) { 577 goto err_len; 578 } 579 foreign->referenced_table_name = mem_heap_strdupl( 580 heap, (const char*) field, len); 581 582 field = rec_get_nth_field_old( 583 rec, DICT_FLD__SYS_FOREIGN__N_COLS, &len); 584 if (len != 4) { 585 goto err_len; 586 } 587 n_fields_and_type = mach_read_from_4(field); 588 589 foreign->type = (unsigned int) (n_fields_and_type >> 24); 590 foreign->n_fields = (unsigned int) (n_fields_and_type & 0x3FFUL); 591 592 return(NULL); 593 } 594 595 /********************************************************************//** 596 This function parses a SYS_FOREIGN_COLS record and extract necessary 597 information from the record and return to caller. 598 @return error message, or NULL on success */ 599 const char* 600 dict_process_sys_foreign_col_rec( 601 /*=============================*/ 602 mem_heap_t* heap, /*!< in/out: heap memory */ 603 const rec_t* rec, /*!< in: current SYS_FOREIGN_COLS rec */ 604 const char** name, /*!< out: foreign key constraint name */ 605 const char** for_col_name, /*!< out: referencing column name */ 606 const char** ref_col_name, /*!< out: referenced column name 607 in referenced table */ 608 ulint* pos) /*!< out: column position */ 609 { 610 ulint len; 611 const byte* field; 612 613 if (rec_get_deleted_flag(rec, 0)) { 614 return("delete-marked record in SYS_FOREIGN_COLS"); 615 } 616 617 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_FOREIGN_COLS) { 618 return("wrong number of columns in SYS_FOREIGN_COLS record"); 619 } 620 621 field = rec_get_nth_field_old( 622 rec, DICT_FLD__SYS_FOREIGN_COLS__ID, &len); 623 if (len == 0 || len == UNIV_SQL_NULL) { 624 err_len: 625 return("incorrect column length in SYS_FOREIGN_COLS"); 626 } 627 *name = mem_heap_strdupl(heap, (char*) field, len); 628 629 field = rec_get_nth_field_old( 630 rec, DICT_FLD__SYS_FOREIGN_COLS__POS, &len); 631 if (len != 4) { 632 goto err_len; 633 } 634 *pos = mach_read_from_4(field); 635 636 rec_get_nth_field_offs_old( 637 rec, DICT_FLD__SYS_FOREIGN_COLS__DB_TRX_ID, &len); 638 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) { 639 goto err_len; 640 } 641 rec_get_nth_field_offs_old( 642 rec, DICT_FLD__SYS_FOREIGN_COLS__DB_ROLL_PTR, &len); 643 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) { 644 goto err_len; 645 } 646 647 field = rec_get_nth_field_old( 648 rec, DICT_FLD__SYS_FOREIGN_COLS__FOR_COL_NAME, &len); 649 if (len == 0 || len == UNIV_SQL_NULL) { 650 goto err_len; 651 } 652 *for_col_name = mem_heap_strdupl(heap, (char*) field, len); 653 654 field = rec_get_nth_field_old( 655 rec, DICT_FLD__SYS_FOREIGN_COLS__REF_COL_NAME, &len); 656 if (len == 0 || len == UNIV_SQL_NULL) { 657 goto err_len; 658 } 659 *ref_col_name = mem_heap_strdupl(heap, (char*) field, len); 660 661 return(NULL); 662 } 663 664 /********************************************************************//** 665 This function parses a SYS_TABLESPACES record, extracts necessary 666 information from the record and returns to caller. 667 @return error message, or NULL on success */ 668 const char* 669 dict_process_sys_tablespaces( 670 /*=========================*/ 671 mem_heap_t* heap, /*!< in/out: heap memory */ 672 const rec_t* rec, /*!< in: current SYS_TABLESPACES rec */ 673 ulint* space, /*!< out: space id */ 674 const char** name, /*!< out: tablespace name */ 675 ulint* flags) /*!< out: tablespace flags */ 676 { 677 ulint len; 678 const byte* field; 679 680 /* Initialize the output values */ 681 *space = ULINT_UNDEFINED; 682 *name = NULL; 683 *flags = ULINT_UNDEFINED; 684 685 if (rec_get_deleted_flag(rec, 0)) { 686 return("delete-marked record in SYS_TABLESPACES"); 687 } 688 689 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_TABLESPACES) { 690 return("wrong number of columns in SYS_TABLESPACES record"); 691 } 692 693 field = rec_get_nth_field_old( 694 rec, DICT_FLD__SYS_TABLESPACES__SPACE, &len); 695 if (len != DICT_FLD_LEN_SPACE) { 696 err_len: 697 return("incorrect column length in SYS_TABLESPACES"); 698 } 699 *space = mach_read_from_4(field); 700 701 rec_get_nth_field_offs_old( 702 rec, DICT_FLD__SYS_TABLESPACES__DB_TRX_ID, &len); 703 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) { 704 goto err_len; 705 } 706 707 rec_get_nth_field_offs_old( 708 rec, DICT_FLD__SYS_TABLESPACES__DB_ROLL_PTR, &len); 709 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) { 710 goto err_len; 711 } 712 713 field = rec_get_nth_field_old( 714 rec, DICT_FLD__SYS_TABLESPACES__NAME, &len); 715 if (len == 0 || len == UNIV_SQL_NULL) { 716 goto err_len; 717 } 718 *name = mem_heap_strdupl(heap, (char*) field, len); 719 720 field = rec_get_nth_field_old( 721 rec, DICT_FLD__SYS_TABLESPACES__FLAGS, &len); 722 if (len != DICT_FLD_LEN_FLAGS) { 723 goto err_len; 724 } 725 *flags = mach_read_from_4(field); 726 727 return(NULL); 728 } 729 730 /********************************************************************//** 731 This function parses a SYS_DATAFILES record, extracts necessary 732 information from the record and returns it to the caller. 733 @return error message, or NULL on success */ 734 const char* 735 dict_process_sys_datafiles( 736 /*=======================*/ 737 mem_heap_t* heap, /*!< in/out: heap memory */ 738 const rec_t* rec, /*!< in: current SYS_DATAFILES rec */ 739 ulint* space, /*!< out: space id */ 740 const char** path) /*!< out: datafile paths */ 741 { 742 ulint len; 743 const byte* field; 744 745 if (rec_get_deleted_flag(rec, 0)) { 746 return("delete-marked record in SYS_DATAFILES"); 747 } 748 749 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_DATAFILES) { 750 return("wrong number of columns in SYS_DATAFILES record"); 751 } 752 753 field = rec_get_nth_field_old( 754 rec, DICT_FLD__SYS_DATAFILES__SPACE, &len); 755 if (len != DICT_FLD_LEN_SPACE) { 756 err_len: 757 return("incorrect column length in SYS_DATAFILES"); 758 } 759 *space = mach_read_from_4(field); 760 761 rec_get_nth_field_offs_old( 762 rec, DICT_FLD__SYS_DATAFILES__DB_TRX_ID, &len); 763 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) { 764 goto err_len; 765 } 766 767 rec_get_nth_field_offs_old( 768 rec, DICT_FLD__SYS_DATAFILES__DB_ROLL_PTR, &len); 769 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) { 770 goto err_len; 771 } 772 773 field = rec_get_nth_field_old( 774 rec, DICT_FLD__SYS_DATAFILES__PATH, &len); 775 if (len == 0 || len == UNIV_SQL_NULL) { 776 goto err_len; 777 } 778 *path = mem_heap_strdupl(heap, (char*) field, len); 779 780 return(NULL); 781 } 782 783 /** Get the first filepath from SYS_DATAFILES for a given space_id. 784 @param[in] space_id Tablespace ID 785 @return First filepath (caller must invoke ut_free() on it) 786 @retval NULL if no SYS_DATAFILES entry was found. */ 787 static char* 788 dict_get_first_path( 789 ulint space_id) 790 { 791 mtr_t mtr; 792 dict_table_t* sys_datafiles; 793 dict_index_t* sys_index; 794 dtuple_t* tuple; 795 dfield_t* dfield; 796 byte* buf; 797 btr_pcur_t pcur; 798 const rec_t* rec; 799 const byte* field; 800 ulint len; 801 char* filepath = NULL; 802 mem_heap_t* heap = mem_heap_create(1024); 803 804 ut_ad(mutex_own(&dict_sys->mutex)); 805 806 mtr_start(&mtr); 807 808 sys_datafiles = dict_table_get_low("SYS_DATAFILES"); 809 sys_index = UT_LIST_GET_FIRST(sys_datafiles->indexes); 810 811 ut_ad(!dict_table_is_comp(sys_datafiles)); 812 ut_ad(name_of_col_is(sys_datafiles, sys_index, 813 DICT_FLD__SYS_DATAFILES__SPACE, "SPACE")); 814 ut_ad(name_of_col_is(sys_datafiles, sys_index, 815 DICT_FLD__SYS_DATAFILES__PATH, "PATH")); 816 817 tuple = dtuple_create(heap, 1); 818 dfield = dtuple_get_nth_field(tuple, DICT_FLD__SYS_DATAFILES__SPACE); 819 820 buf = static_cast<byte*>(mem_heap_alloc(heap, 4)); 821 mach_write_to_4(buf, space_id); 822 823 dfield_set_data(dfield, buf, 4); 824 dict_index_copy_types(tuple, sys_index, 1); 825 826 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE, 827 BTR_SEARCH_LEAF, &pcur, &mtr); 828 829 rec = btr_pcur_get_rec(&pcur); 830 831 /* Get the filepath from this SYS_DATAFILES record. */ 832 if (btr_pcur_is_on_user_rec(&pcur)) { 833 field = rec_get_nth_field_old( 834 rec, DICT_FLD__SYS_DATAFILES__SPACE, &len); 835 ut_a(len == 4); 836 837 if (space_id == mach_read_from_4(field)) { 838 /* A record for this space ID was found. */ 839 field = rec_get_nth_field_old( 840 rec, DICT_FLD__SYS_DATAFILES__PATH, &len); 841 842 ut_ad(len > 0); 843 ut_ad(len < OS_FILE_MAX_PATH); 844 845 if (len > 0 && len < UNIV_SQL_NULL) { 846 filepath = mem_strdupl( 847 reinterpret_cast<const char*>(field), 848 len); 849 ut_ad(filepath != NULL); 850 851 /* The dictionary may have been written on 852 another OS. */ 853 os_normalize_path(filepath); 854 } 855 } 856 } 857 858 btr_pcur_close(&pcur); 859 mtr_commit(&mtr); 860 mem_heap_free(heap); 861 862 return(filepath); 863 } 864 865 /** Update the record for space_id in SYS_TABLESPACES to this filepath. 866 @param[in] space_id Tablespace ID 867 @param[in] filepath Tablespace filepath 868 @return DB_SUCCESS if OK, dberr_t if the insert failed */ 869 dberr_t 870 dict_update_filepath( 871 ulint space_id, 872 const char* filepath) 873 { 874 if (!srv_sys_tablespaces_open) { 875 /* Startup procedure is not yet ready for updates. */ 876 return(DB_SUCCESS); 877 } 878 879 dberr_t err = DB_SUCCESS; 880 trx_t* trx; 881 882 ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_X)); 883 ut_ad(mutex_own(&dict_sys->mutex)); 884 885 trx = trx_create(); 886 trx->op_info = "update filepath"; 887 trx->dict_operation_lock_mode = RW_X_LATCH; 888 trx_start_for_ddl(trx, TRX_DICT_OP_INDEX); 889 890 pars_info_t* info = pars_info_create(); 891 892 pars_info_add_int4_literal(info, "space", space_id); 893 pars_info_add_str_literal(info, "path", filepath); 894 895 err = que_eval_sql(info, 896 "PROCEDURE UPDATE_FILEPATH () IS\n" 897 "BEGIN\n" 898 "UPDATE SYS_DATAFILES" 899 " SET PATH = :path\n" 900 " WHERE SPACE = :space;\n" 901 "END;\n", FALSE, trx); 902 903 trx_commit_for_mysql(trx); 904 trx->dict_operation_lock_mode = 0; 905 trx->free(); 906 907 if (UNIV_LIKELY(err == DB_SUCCESS)) { 908 /* We just updated SYS_DATAFILES due to the contents in 909 a link file. Make a note that we did this. */ 910 ib::info() << "The InnoDB data dictionary table SYS_DATAFILES" 911 " for tablespace ID " << space_id 912 << " was updated to use file " << filepath << "."; 913 } else { 914 ib::warn() << "Error occurred while updating InnoDB data" 915 " dictionary table SYS_DATAFILES for tablespace ID " 916 << space_id << " to file " << filepath << ": " 917 << err << "."; 918 } 919 920 return(err); 921 } 922 923 /** Replace records in SYS_TABLESPACES and SYS_DATAFILES associated with 924 the given space_id using an independent transaction. 925 @param[in] space_id Tablespace ID 926 @param[in] name Tablespace name 927 @param[in] filepath First filepath 928 @param[in] fsp_flags Tablespace flags 929 @return DB_SUCCESS if OK, dberr_t if the insert failed */ 930 dberr_t 931 dict_replace_tablespace_and_filepath( 932 ulint space_id, 933 const char* name, 934 const char* filepath, 935 ulint fsp_flags) 936 { 937 if (!srv_sys_tablespaces_open) { 938 /* Startup procedure is not yet ready for updates. 939 Return success since this will likely get updated 940 later. */ 941 return(DB_SUCCESS); 942 } 943 944 dberr_t err = DB_SUCCESS; 945 trx_t* trx; 946 947 DBUG_EXECUTE_IF("innodb_fail_to_update_tablespace_dict", 948 return(DB_INTERRUPTED);); 949 950 ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_X)); 951 ut_ad(mutex_own(&dict_sys->mutex)); 952 ut_ad(filepath); 953 954 trx = trx_create(); 955 trx->op_info = "insert tablespace and filepath"; 956 trx->dict_operation_lock_mode = RW_X_LATCH; 957 trx_start_for_ddl(trx, TRX_DICT_OP_INDEX); 958 959 /* A record for this space ID was not found in 960 SYS_DATAFILES. Assume the record is also missing in 961 SYS_TABLESPACES. Insert records into them both. */ 962 err = dict_replace_tablespace_in_dictionary( 963 space_id, name, fsp_flags, filepath, trx); 964 965 trx_commit_for_mysql(trx); 966 trx->dict_operation_lock_mode = 0; 967 trx->free(); 968 969 return(err); 970 } 971 972 /** Check the validity of a SYS_TABLES record 973 Make sure the fields are the right length and that they 974 do not contain invalid contents. 975 @param[in] rec SYS_TABLES record 976 @return error message, or NULL on success */ 977 static 978 const char* 979 dict_sys_tables_rec_check( 980 const rec_t* rec) 981 { 982 const byte* field; 983 ulint len; 984 985 ut_ad(mutex_own(&dict_sys->mutex)); 986 987 if (rec_get_deleted_flag(rec, 0)) { 988 return("delete-marked record in SYS_TABLES"); 989 } 990 991 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_TABLES) { 992 return("wrong number of columns in SYS_TABLES record"); 993 } 994 995 rec_get_nth_field_offs_old( 996 rec, DICT_FLD__SYS_TABLES__NAME, &len); 997 if (len == 0 || len == UNIV_SQL_NULL) { 998 err_len: 999 return("incorrect column length in SYS_TABLES"); 1000 } 1001 rec_get_nth_field_offs_old( 1002 rec, DICT_FLD__SYS_TABLES__DB_TRX_ID, &len); 1003 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) { 1004 goto err_len; 1005 } 1006 rec_get_nth_field_offs_old( 1007 rec, DICT_FLD__SYS_TABLES__DB_ROLL_PTR, &len); 1008 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) { 1009 goto err_len; 1010 } 1011 1012 rec_get_nth_field_offs_old(rec, DICT_FLD__SYS_TABLES__ID, &len); 1013 if (len != 8) { 1014 goto err_len; 1015 } 1016 1017 field = rec_get_nth_field_old( 1018 rec, DICT_FLD__SYS_TABLES__N_COLS, &len); 1019 if (field == NULL || len != 4) { 1020 goto err_len; 1021 } 1022 1023 rec_get_nth_field_offs_old(rec, DICT_FLD__SYS_TABLES__TYPE, &len); 1024 if (len != 4) { 1025 goto err_len; 1026 } 1027 1028 rec_get_nth_field_offs_old( 1029 rec, DICT_FLD__SYS_TABLES__MIX_ID, &len); 1030 if (len != 8) { 1031 goto err_len; 1032 } 1033 1034 field = rec_get_nth_field_old( 1035 rec, DICT_FLD__SYS_TABLES__MIX_LEN, &len); 1036 if (field == NULL || len != 4) { 1037 goto err_len; 1038 } 1039 1040 rec_get_nth_field_offs_old( 1041 rec, DICT_FLD__SYS_TABLES__CLUSTER_ID, &len); 1042 if (len != UNIV_SQL_NULL) { 1043 goto err_len; 1044 } 1045 1046 field = rec_get_nth_field_old( 1047 rec, DICT_FLD__SYS_TABLES__SPACE, &len); 1048 if (field == NULL || len != 4) { 1049 goto err_len; 1050 } 1051 1052 return(NULL); 1053 } 1054 1055 /** Read and return the contents of a SYS_TABLESPACES record. 1056 @param[in] rec A record of SYS_TABLESPACES 1057 @param[out] id Pointer to the space_id for this table 1058 @param[in,out] name Buffer for Tablespace Name of length NAME_LEN 1059 @param[out] flags Pointer to tablespace flags 1060 @return true if the record was read correctly, false if not. */ 1061 bool 1062 dict_sys_tablespaces_rec_read( 1063 const rec_t* rec, 1064 ulint* id, 1065 char* name, 1066 ulint* flags) 1067 { 1068 const byte* field; 1069 ulint len; 1070 1071 field = rec_get_nth_field_old( 1072 rec, DICT_FLD__SYS_TABLESPACES__SPACE, &len); 1073 if (len != DICT_FLD_LEN_SPACE) { 1074 ib::error() << "Wrong field length in SYS_TABLESPACES.SPACE: " 1075 << len; 1076 return(false); 1077 } 1078 *id = mach_read_from_4(field); 1079 1080 field = rec_get_nth_field_old( 1081 rec, DICT_FLD__SYS_TABLESPACES__NAME, &len); 1082 if (len == 0 || len == UNIV_SQL_NULL) { 1083 ib::error() << "Wrong field length in SYS_TABLESPACES.NAME: " 1084 << len; 1085 return(false); 1086 } 1087 strncpy(name, reinterpret_cast<const char*>(field), NAME_LEN); 1088 1089 /* read the 4 byte flags from the TYPE field */ 1090 field = rec_get_nth_field_old( 1091 rec, DICT_FLD__SYS_TABLESPACES__FLAGS, &len); 1092 if (len != 4) { 1093 ib::error() << "Wrong field length in SYS_TABLESPACES.FLAGS: " 1094 << len; 1095 return(false); 1096 } 1097 *flags = mach_read_from_4(field); 1098 1099 return(true); 1100 } 1101 1102 /** Check if SYS_TABLES.TYPE is valid 1103 @param[in] type SYS_TABLES.TYPE 1104 @param[in] not_redundant whether ROW_FORMAT=REDUNDANT is not used 1105 @return whether the SYS_TABLES.TYPE value is valid */ 1106 static 1107 bool 1108 dict_sys_tables_type_valid(ulint type, bool not_redundant) 1109 { 1110 /* The DATA_DIRECTORY flag can be assigned fully independently 1111 of all other persistent table flags. */ 1112 type &= ~DICT_TF_MASK_DATA_DIR; 1113 1114 if (type == 1) { 1115 return(true); /* ROW_FORMAT=REDUNDANT or ROW_FORMAT=COMPACT */ 1116 } 1117 1118 if (!(type & 1)) { 1119 /* For ROW_FORMAT=REDUNDANT and ROW_FORMAT=COMPACT, 1120 SYS_TABLES.TYPE=1. Else, it is the same as 1121 dict_table_t::flags, and the least significant bit 1122 would be set. So, the bit never can be 0. */ 1123 return(false); 1124 } 1125 1126 if (!not_redundant) { 1127 /* SYS_TABLES.TYPE must be 1 or 1|DICT_TF_MASK_NO_ROLLBACK 1128 for ROW_FORMAT=REDUNDANT. */ 1129 return !(type & ~(1U | DICT_TF_MASK_NO_ROLLBACK)); 1130 } 1131 1132 if (type >= 1U << DICT_TF_POS_UNUSED) { 1133 /* Some unknown bits are set. */ 1134 return(false); 1135 } 1136 1137 return(dict_tf_is_valid_not_redundant(type)); 1138 } 1139 1140 /** Convert SYS_TABLES.TYPE to dict_table_t::flags. 1141 @param[in] type SYS_TABLES.TYPE 1142 @param[in] not_redundant whether ROW_FORMAT=REDUNDANT is not used 1143 @return table flags */ 1144 static 1145 ulint 1146 dict_sys_tables_type_to_tf(ulint type, bool not_redundant) 1147 { 1148 ut_ad(dict_sys_tables_type_valid(type, not_redundant)); 1149 ulint flags = not_redundant ? 1 : 0; 1150 1151 /* ZIP_SSIZE, ATOMIC_BLOBS, DATA_DIR, PAGE_COMPRESSION, 1152 PAGE_COMPRESSION_LEVEL are the same. */ 1153 flags |= type & (DICT_TF_MASK_ZIP_SSIZE 1154 | DICT_TF_MASK_ATOMIC_BLOBS 1155 | DICT_TF_MASK_DATA_DIR 1156 | DICT_TF_MASK_PAGE_COMPRESSION 1157 | DICT_TF_MASK_PAGE_COMPRESSION_LEVEL 1158 | DICT_TF_MASK_NO_ROLLBACK); 1159 1160 ut_ad(dict_tf_is_valid(flags)); 1161 return(flags); 1162 } 1163 1164 /** Read and return 5 integer fields from a SYS_TABLES record. 1165 @param[in] rec A record of SYS_TABLES 1166 @param[in] name Table Name, the same as SYS_TABLES.NAME 1167 @param[out] table_id Pointer to the table_id for this table 1168 @param[out] space_id Pointer to the space_id for this table 1169 @param[out] n_cols Pointer to number of columns for this table. 1170 @param[out] flags Pointer to table flags 1171 @param[out] flags2 Pointer to table flags2 1172 @return true if the record was read correctly, false if not. */ 1173 MY_ATTRIBUTE((warn_unused_result)) 1174 static 1175 bool 1176 dict_sys_tables_rec_read( 1177 const rec_t* rec, 1178 const table_name_t& table_name, 1179 table_id_t* table_id, 1180 ulint* space_id, 1181 ulint* n_cols, 1182 ulint* flags, 1183 ulint* flags2) 1184 { 1185 const byte* field; 1186 ulint len; 1187 ulint type; 1188 1189 field = rec_get_nth_field_old( 1190 rec, DICT_FLD__SYS_TABLES__ID, &len); 1191 ut_ad(len == 8); 1192 *table_id = static_cast<table_id_t>(mach_read_from_8(field)); 1193 1194 field = rec_get_nth_field_old( 1195 rec, DICT_FLD__SYS_TABLES__SPACE, &len); 1196 ut_ad(len == 4); 1197 *space_id = mach_read_from_4(field); 1198 1199 /* Read the 4 byte flags from the TYPE field */ 1200 field = rec_get_nth_field_old( 1201 rec, DICT_FLD__SYS_TABLES__TYPE, &len); 1202 ut_a(len == 4); 1203 type = mach_read_from_4(field); 1204 1205 /* Handle MDEV-12873 InnoDB SYS_TABLES.TYPE incompatibility 1206 for PAGE_COMPRESSED=YES in MariaDB 10.2.2 to 10.2.6. 1207 1208 MariaDB 10.2.2 introduced the SHARED_SPACE flag from MySQL 5.7, 1209 shifting the flags PAGE_COMPRESSION, PAGE_COMPRESSION_LEVEL, 1210 ATOMIC_WRITES (repurposed to NO_ROLLBACK in 10.3.1) by one bit. 1211 The SHARED_SPACE flag would always 1212 be written as 0 by MariaDB, because MariaDB does not support 1213 CREATE TABLESPACE or CREATE TABLE...TABLESPACE for InnoDB. 1214 1215 So, instead of the bits AALLLLCxxxxxxx we would have 1216 AALLLLC0xxxxxxx if the table was created with MariaDB 10.2.2 1217 to 10.2.6. (AA=ATOMIC_WRITES, LLLL=PAGE_COMPRESSION_LEVEL, 1218 C=PAGE_COMPRESSED, xxxxxxx=7 bits that were not moved.) 1219 1220 The case LLLLC=00000 is not a problem. The problem is the case 1221 AALLLL10DB00001 where D is the (mostly ignored) DATA_DIRECTORY 1222 flag and B is the ATOMIC_BLOBS flag (1 for ROW_FORMAT=DYNAMIC 1223 and 0 for ROW_FORMAT=COMPACT in this case). Other low-order 1224 bits must be so, because PAGE_COMPRESSED=YES is only allowed 1225 for ROW_FORMAT=DYNAMIC and ROW_FORMAT=COMPACT, not for 1226 ROW_FORMAT=REDUNDANT or ROW_FORMAT=COMPRESSED. 1227 1228 Starting with MariaDB 10.2.4, the flags would be 1229 00LLLL10DB00001, because ATOMIC_WRITES is always written as 0. 1230 1231 We will concentrate on the PAGE_COMPRESSION_LEVEL and 1232 PAGE_COMPRESSED=YES. PAGE_COMPRESSED=NO implies 1233 PAGE_COMPRESSION_LEVEL=0, and in that case all the affected 1234 bits will be 0. For PAGE_COMPRESSED=YES, the values 1..9 are 1235 allowed for PAGE_COMPRESSION_LEVEL. That is, we must interpret 1236 the bits AALLLL10DB00001 as AALLLL1DB00001. 1237 1238 If someone created a table in MariaDB 10.2.2 or 10.2.3 with 1239 the attribute ATOMIC_WRITES=OFF (value 2) and without 1240 PAGE_COMPRESSED=YES or PAGE_COMPRESSION_LEVEL, that should be 1241 rejected. The value ATOMIC_WRITES=ON (1) would look like 1242 ATOMIC_WRITES=OFF, but it would be ignored starting with 1243 MariaDB 10.2.4. */ 1244 compile_time_assert(DICT_TF_POS_PAGE_COMPRESSION == 7); 1245 compile_time_assert(DICT_TF_POS_UNUSED == 14); 1246 1247 if ((type & 0x19f) != 0x101) { 1248 /* The table cannot have been created with MariaDB 1249 10.2.2 to 10.2.6, because they would write the 1250 low-order bits of SYS_TABLES.TYPE as 0b10xx00001 for 1251 PAGE_COMPRESSED=YES. No adjustment is applicable. */ 1252 } else if (type >= 3 << 13) { 1253 /* 10.2.2 and 10.2.3 write ATOMIC_WRITES less than 3, 1254 and no other flags above that can be set for the 1255 SYS_TABLES.TYPE to be in the 10.2.2..10.2.6 format. 1256 This would in any case be invalid format for 10.2 and 1257 earlier releases. */ 1258 ut_ad(!dict_sys_tables_type_valid(type, true)); 1259 } else { 1260 /* SYS_TABLES.TYPE is of the form AALLLL10DB00001. We 1261 must still validate that the LLLL bits are between 0 1262 and 9 before we can discard the extraneous 0 bit. */ 1263 ut_ad(!DICT_TF_GET_PAGE_COMPRESSION(type)); 1264 1265 if ((((type >> 9) & 0xf) - 1) < 9) { 1266 ut_ad(DICT_TF_GET_PAGE_COMPRESSION_LEVEL(type) & 1); 1267 1268 type = (type & 0x7fU) | (type >> 1 & ~0x7fU); 1269 1270 ut_ad(DICT_TF_GET_PAGE_COMPRESSION(type)); 1271 ut_ad(DICT_TF_GET_PAGE_COMPRESSION_LEVEL(type) >= 1); 1272 ut_ad(DICT_TF_GET_PAGE_COMPRESSION_LEVEL(type) <= 9); 1273 } else { 1274 ut_ad(!dict_sys_tables_type_valid(type, true)); 1275 } 1276 } 1277 1278 /* The low order bit of SYS_TABLES.TYPE is always set to 1. But in 1279 dict_table_t::flags the low order bit is used to determine if the 1280 ROW_FORMAT=REDUNDANT (0) or anything else (1). 1281 Read the 4 byte N_COLS field and look at the high order bit. It 1282 should be set for COMPACT and later. It should not be set for 1283 REDUNDANT. */ 1284 field = rec_get_nth_field_old( 1285 rec, DICT_FLD__SYS_TABLES__N_COLS, &len); 1286 ut_a(len == 4); 1287 *n_cols = mach_read_from_4(field); 1288 1289 const bool not_redundant = 0 != (*n_cols & DICT_N_COLS_COMPACT); 1290 1291 if (!dict_sys_tables_type_valid(type, not_redundant)) { 1292 ib::error() << "Table " << table_name << " in InnoDB" 1293 " data dictionary contains invalid flags." 1294 " SYS_TABLES.TYPE=" << type << 1295 " SYS_TABLES.N_COLS=" << *n_cols; 1296 return(false); 1297 } 1298 1299 *flags = dict_sys_tables_type_to_tf(type, not_redundant); 1300 1301 /* For tables created before MySQL 4.1, there may be 1302 garbage in SYS_TABLES.MIX_LEN where flags2 are found. Such tables 1303 would always be in ROW_FORMAT=REDUNDANT which do not have the 1304 high bit set in n_cols, and flags would be zero. 1305 MySQL 4.1 was the first version to support innodb_file_per_table, 1306 that is, *space_id != 0. */ 1307 if (not_redundant || *space_id != 0 || *n_cols & DICT_N_COLS_COMPACT) { 1308 1309 /* Get flags2 from SYS_TABLES.MIX_LEN */ 1310 field = rec_get_nth_field_old( 1311 rec, DICT_FLD__SYS_TABLES__MIX_LEN, &len); 1312 *flags2 = mach_read_from_4(field); 1313 1314 if (!dict_tf2_is_valid(*flags, *flags2)) { 1315 ib::error() << "Table " << table_name << " in InnoDB" 1316 " data dictionary contains invalid flags." 1317 " SYS_TABLES.TYPE=" << type 1318 << " SYS_TABLES.MIX_LEN=" << *flags2; 1319 return(false); 1320 } 1321 1322 /* DICT_TF2_FTS will be set when indexes are being loaded */ 1323 *flags2 &= ~DICT_TF2_FTS; 1324 1325 /* Now that we have used this bit, unset it. */ 1326 *n_cols &= ~DICT_N_COLS_COMPACT; 1327 } else { 1328 *flags2 = 0; 1329 } 1330 1331 return(true); 1332 } 1333 1334 /** Load and check each non-predefined tablespace mentioned in SYS_TABLES. 1335 Search SYS_TABLES and check each tablespace mentioned that has not 1336 already been added to the fil_system. If it is valid, add it to the 1337 file_system list. 1338 @return the highest space ID found. */ 1339 static ulint dict_check_sys_tables() 1340 { 1341 ulint max_space_id = 0; 1342 btr_pcur_t pcur; 1343 const rec_t* rec; 1344 mtr_t mtr; 1345 1346 DBUG_ENTER("dict_check_sys_tables"); 1347 1348 ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_X)); 1349 ut_ad(mutex_own(&dict_sys->mutex)); 1350 1351 mtr_start(&mtr); 1352 1353 /* Before traversing SYS_TABLES, let's make sure we have 1354 SYS_TABLESPACES and SYS_DATAFILES loaded. */ 1355 dict_table_t* sys_tablespaces; 1356 dict_table_t* sys_datafiles; 1357 sys_tablespaces = dict_table_get_low("SYS_TABLESPACES"); 1358 ut_a(sys_tablespaces != NULL); 1359 sys_datafiles = dict_table_get_low("SYS_DATAFILES"); 1360 ut_a(sys_datafiles != NULL); 1361 1362 for (rec = dict_startscan_system(&pcur, &mtr, SYS_TABLES); 1363 rec != NULL; 1364 mtr.commit(), mtr.start(), 1365 rec = dict_getnext_system(&pcur, &mtr)) { 1366 const byte* field; 1367 ulint len; 1368 table_id_t table_id; 1369 ulint space_id; 1370 ulint n_cols; 1371 ulint flags; 1372 ulint flags2; 1373 1374 /* If a table record is not useable, ignore it and continue 1375 on to the next record. Error messages were logged. */ 1376 if (dict_sys_tables_rec_check(rec) != NULL) { 1377 continue; 1378 } 1379 1380 /* Copy the table name from rec */ 1381 field = rec_get_nth_field_old( 1382 rec, DICT_FLD__SYS_TABLES__NAME, &len); 1383 1384 table_name_t table_name(mem_strdupl((char*) field, len)); 1385 DBUG_PRINT("dict_check_sys_tables", 1386 ("name: %p, '%s'", table_name.m_name, 1387 table_name.m_name)); 1388 1389 if (!dict_sys_tables_rec_read(rec, table_name, 1390 &table_id, &space_id, 1391 &n_cols, &flags, &flags2) 1392 || space_id == TRX_SYS_SPACE) { 1393 next: 1394 ut_free(table_name.m_name); 1395 continue; 1396 } 1397 1398 if (strstr(table_name.m_name, "/" TEMP_FILE_PREFIX "-")) { 1399 /* This table will be dropped by 1400 row_mysql_drop_garbage_tables(). 1401 We do not care if the file exists. */ 1402 goto next; 1403 } 1404 1405 if (flags2 & DICT_TF2_DISCARDED) { 1406 ib::info() << "Ignoring tablespace for " << table_name 1407 << " because the DISCARD flag is set ."; 1408 goto next; 1409 } 1410 1411 /* For tables or partitions using .ibd files, the flag 1412 DICT_TF2_USE_FILE_PER_TABLE was not set in MIX_LEN 1413 before MySQL 5.6.5. The flag should not have been 1414 introduced in persistent storage. MariaDB will keep 1415 setting the flag when writing SYS_TABLES entries for 1416 newly created or rebuilt tables or partitions, but 1417 will otherwise ignore the flag. */ 1418 1419 /* Now that we have the proper name for this tablespace, 1420 look to see if it is already in the tablespace cache. */ 1421 if (const fil_space_t* space 1422 = fil_space_for_table_exists_in_mem( 1423 space_id, table_name.m_name, flags)) { 1424 /* Recovery can open a datafile that does not 1425 match SYS_DATAFILES. If they don't match, update 1426 SYS_DATAFILES. */ 1427 char *dict_path = dict_get_first_path(space_id); 1428 const char *fil_path = space->chain.start->name; 1429 if (dict_path 1430 && strcmp(dict_path, fil_path)) { 1431 dict_update_filepath(space_id, fil_path); 1432 } 1433 ut_free(dict_path); 1434 ut_free(table_name.m_name); 1435 continue; 1436 } 1437 1438 /* Set the expected filepath from the data dictionary. 1439 If the file is found elsewhere (from an ISL or the default 1440 location) or this path is the same file but looks different, 1441 fil_ibd_open() will update the dictionary with what is 1442 opened. */ 1443 char* filepath = dict_get_first_path(space_id); 1444 1445 /* Check that the .ibd file exists. */ 1446 if (!fil_ibd_open( 1447 false, 1448 !srv_read_only_mode && srv_log_file_size != 0, 1449 FIL_TYPE_TABLESPACE, 1450 space_id, dict_tf_to_fsp_flags(flags), 1451 table_name, filepath)) { 1452 ib::warn() << "Ignoring tablespace for " 1453 << table_name 1454 << " because it could not be opened."; 1455 } 1456 1457 max_space_id = ut_max(max_space_id, space_id); 1458 1459 ut_free(table_name.m_name); 1460 ut_free(filepath); 1461 } 1462 1463 mtr_commit(&mtr); 1464 1465 DBUG_RETURN(max_space_id); 1466 } 1467 1468 /** Check each tablespace found in the data dictionary. 1469 Then look at each table defined in SYS_TABLES that has a space_id > 0 1470 to find all the file-per-table tablespaces. 1471 1472 In a crash recovery we already have some tablespace objects created from 1473 processing the REDO log. Any other tablespace in SYS_TABLESPACES not 1474 previously used in recovery will be opened here. We will compare the 1475 space_id information in the data dictionary to what we find in the 1476 tablespace file. In addition, more validation will be done if recovery 1477 was needed and force_recovery is not set. 1478 1479 We also scan the biggest space id, and store it to fil_system. */ 1480 void dict_check_tablespaces_and_store_max_id() 1481 { 1482 mtr_t mtr; 1483 1484 DBUG_ENTER("dict_check_tablespaces_and_store_max_id"); 1485 1486 rw_lock_x_lock(&dict_operation_lock); 1487 mutex_enter(&dict_sys->mutex); 1488 1489 /* Initialize the max space_id from sys header */ 1490 mtr_start(&mtr); 1491 ulint max_space_id = mtr_read_ulint( 1492 dict_hdr_get(&mtr) + DICT_HDR_MAX_SPACE_ID, 1493 MLOG_4BYTES, &mtr); 1494 mtr_commit(&mtr); 1495 1496 fil_set_max_space_id_if_bigger(max_space_id); 1497 1498 /* Open all tablespaces referenced in SYS_TABLES. 1499 This will update SYS_TABLESPACES and SYS_DATAFILES if it 1500 finds any file-per-table tablespaces not already there. */ 1501 max_space_id = dict_check_sys_tables(); 1502 fil_set_max_space_id_if_bigger(max_space_id); 1503 1504 mutex_exit(&dict_sys->mutex); 1505 rw_lock_x_unlock(&dict_operation_lock); 1506 1507 DBUG_VOID_RETURN; 1508 } 1509 1510 /** Error message for a delete-marked record in dict_load_column_low() */ 1511 static const char* dict_load_column_del = "delete-marked record in SYS_COLUMN"; 1512 1513 /** Load a table column definition from a SYS_COLUMNS record to dict_table_t. 1514 @return error message 1515 @retval NULL on success */ 1516 static 1517 const char* 1518 dict_load_column_low( 1519 dict_table_t* table, /*!< in/out: table, could be NULL 1520 if we just populate a dict_column_t 1521 struct with information from 1522 a SYS_COLUMNS record */ 1523 mem_heap_t* heap, /*!< in/out: memory heap 1524 for temporary storage */ 1525 dict_col_t* column, /*!< out: dict_column_t to fill, 1526 or NULL if table != NULL */ 1527 table_id_t* table_id, /*!< out: table id */ 1528 const char** col_name, /*!< out: column name */ 1529 const rec_t* rec, /*!< in: SYS_COLUMNS record */ 1530 ulint* nth_v_col) /*!< out: if not NULL, this 1531 records the "n" of "nth" virtual 1532 column */ 1533 { 1534 char* name; 1535 const byte* field; 1536 ulint len; 1537 ulint mtype; 1538 ulint prtype; 1539 ulint col_len; 1540 ulint pos; 1541 ulint num_base; 1542 1543 ut_ad(!table == !!column); 1544 1545 if (rec_get_deleted_flag(rec, 0)) { 1546 return(dict_load_column_del); 1547 } 1548 1549 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_COLUMNS) { 1550 return("wrong number of columns in SYS_COLUMNS record"); 1551 } 1552 1553 field = rec_get_nth_field_old( 1554 rec, DICT_FLD__SYS_COLUMNS__TABLE_ID, &len); 1555 if (len != 8) { 1556 err_len: 1557 return("incorrect column length in SYS_COLUMNS"); 1558 } 1559 1560 if (table_id) { 1561 *table_id = mach_read_from_8(field); 1562 } else if (table->id != mach_read_from_8(field)) { 1563 return("SYS_COLUMNS.TABLE_ID mismatch"); 1564 } 1565 1566 field = rec_get_nth_field_old( 1567 rec, DICT_FLD__SYS_COLUMNS__POS, &len); 1568 if (len != 4) { 1569 goto err_len; 1570 } 1571 1572 pos = mach_read_from_4(field); 1573 1574 rec_get_nth_field_offs_old( 1575 rec, DICT_FLD__SYS_COLUMNS__DB_TRX_ID, &len); 1576 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) { 1577 goto err_len; 1578 } 1579 rec_get_nth_field_offs_old( 1580 rec, DICT_FLD__SYS_COLUMNS__DB_ROLL_PTR, &len); 1581 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) { 1582 goto err_len; 1583 } 1584 1585 field = rec_get_nth_field_old( 1586 rec, DICT_FLD__SYS_COLUMNS__NAME, &len); 1587 if (len == 0 || len == UNIV_SQL_NULL) { 1588 goto err_len; 1589 } 1590 1591 name = mem_heap_strdupl(heap, (const char*) field, len); 1592 1593 if (col_name) { 1594 *col_name = name; 1595 } 1596 1597 field = rec_get_nth_field_old( 1598 rec, DICT_FLD__SYS_COLUMNS__MTYPE, &len); 1599 if (len != 4) { 1600 goto err_len; 1601 } 1602 1603 mtype = mach_read_from_4(field); 1604 1605 field = rec_get_nth_field_old( 1606 rec, DICT_FLD__SYS_COLUMNS__PRTYPE, &len); 1607 if (len != 4) { 1608 goto err_len; 1609 } 1610 prtype = mach_read_from_4(field); 1611 1612 if (dtype_get_charset_coll(prtype) == 0 1613 && dtype_is_string_type(mtype)) { 1614 /* The table was created with < 4.1.2. */ 1615 1616 if (dtype_is_binary_string_type(mtype, prtype)) { 1617 /* Use the binary collation for 1618 string columns of binary type. */ 1619 1620 prtype = dtype_form_prtype( 1621 prtype, 1622 DATA_MYSQL_BINARY_CHARSET_COLL); 1623 } else { 1624 /* Use the default charset for 1625 other than binary columns. */ 1626 1627 prtype = dtype_form_prtype( 1628 prtype, 1629 data_mysql_default_charset_coll); 1630 } 1631 } 1632 1633 if (table && table->n_def != pos && !(prtype & DATA_VIRTUAL)) { 1634 return("SYS_COLUMNS.POS mismatch"); 1635 } 1636 1637 field = rec_get_nth_field_old( 1638 rec, DICT_FLD__SYS_COLUMNS__LEN, &len); 1639 if (len != 4) { 1640 goto err_len; 1641 } 1642 col_len = mach_read_from_4(field); 1643 field = rec_get_nth_field_old( 1644 rec, DICT_FLD__SYS_COLUMNS__PREC, &len); 1645 if (len != 4) { 1646 goto err_len; 1647 } 1648 num_base = mach_read_from_4(field); 1649 1650 if (table) { 1651 if (prtype & DATA_VIRTUAL) { 1652 #ifdef UNIV_DEBUG 1653 dict_v_col_t* vcol = 1654 #endif 1655 dict_mem_table_add_v_col( 1656 table, heap, name, mtype, 1657 prtype, col_len, 1658 dict_get_v_col_mysql_pos(pos), num_base); 1659 ut_ad(vcol->v_pos == dict_get_v_col_pos(pos)); 1660 } else { 1661 ut_ad(num_base == 0); 1662 dict_mem_table_add_col(table, heap, name, mtype, 1663 prtype, col_len); 1664 } 1665 } else { 1666 dict_mem_fill_column_struct(column, pos, mtype, 1667 prtype, col_len); 1668 } 1669 1670 /* Report the virtual column number */ 1671 if ((prtype & DATA_VIRTUAL) && nth_v_col != NULL) { 1672 *nth_v_col = dict_get_v_col_pos(pos); 1673 } 1674 1675 return(NULL); 1676 } 1677 1678 /** Error message for a delete-marked record in dict_load_virtual_low() */ 1679 static const char* dict_load_virtual_del = "delete-marked record in SYS_VIRTUAL"; 1680 1681 /** Load a virtual column "mapping" (to base columns) information 1682 from a SYS_VIRTUAL record 1683 @param[in,out] table table 1684 @param[in,out] column mapped base column's dict_column_t 1685 @param[in,out] table_id table id 1686 @param[in,out] pos virtual column position 1687 @param[in,out] base_pos base column position 1688 @param[in] rec SYS_VIRTUAL record 1689 @return error message 1690 @retval NULL on success */ 1691 static 1692 const char* 1693 dict_load_virtual_low( 1694 dict_table_t* table, 1695 dict_col_t** column, 1696 table_id_t* table_id, 1697 ulint* pos, 1698 ulint* base_pos, 1699 const rec_t* rec) 1700 { 1701 const byte* field; 1702 ulint len; 1703 ulint base; 1704 1705 if (rec_get_deleted_flag(rec, 0)) { 1706 return(dict_load_virtual_del); 1707 } 1708 1709 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_VIRTUAL) { 1710 return("wrong number of columns in SYS_VIRTUAL record"); 1711 } 1712 1713 field = rec_get_nth_field_old( 1714 rec, DICT_FLD__SYS_VIRTUAL__TABLE_ID, &len); 1715 if (len != 8) { 1716 err_len: 1717 return("incorrect column length in SYS_VIRTUAL"); 1718 } 1719 1720 if (table_id != NULL) { 1721 *table_id = mach_read_from_8(field); 1722 } else if (table->id != mach_read_from_8(field)) { 1723 return("SYS_VIRTUAL.TABLE_ID mismatch"); 1724 } 1725 1726 field = rec_get_nth_field_old( 1727 rec, DICT_FLD__SYS_VIRTUAL__POS, &len); 1728 if (len != 4) { 1729 goto err_len; 1730 } 1731 1732 if (pos != NULL) { 1733 *pos = mach_read_from_4(field); 1734 } 1735 1736 field = rec_get_nth_field_old( 1737 rec, DICT_FLD__SYS_VIRTUAL__BASE_POS, &len); 1738 if (len != 4) { 1739 goto err_len; 1740 } 1741 1742 base = mach_read_from_4(field); 1743 1744 if (base_pos != NULL) { 1745 *base_pos = base; 1746 } 1747 1748 rec_get_nth_field_offs_old( 1749 rec, DICT_FLD__SYS_VIRTUAL__DB_TRX_ID, &len); 1750 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) { 1751 goto err_len; 1752 } 1753 1754 rec_get_nth_field_offs_old( 1755 rec, DICT_FLD__SYS_VIRTUAL__DB_ROLL_PTR, &len); 1756 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) { 1757 goto err_len; 1758 } 1759 1760 if (column != NULL) { 1761 *column = dict_table_get_nth_col(table, base); 1762 } 1763 1764 return(NULL); 1765 } 1766 1767 /********************************************************************//** 1768 Loads definitions for table columns. */ 1769 static 1770 void 1771 dict_load_columns( 1772 /*==============*/ 1773 dict_table_t* table, /*!< in/out: table */ 1774 mem_heap_t* heap) /*!< in/out: memory heap 1775 for temporary storage */ 1776 { 1777 dict_table_t* sys_columns; 1778 dict_index_t* sys_index; 1779 btr_pcur_t pcur; 1780 dtuple_t* tuple; 1781 dfield_t* dfield; 1782 const rec_t* rec; 1783 byte* buf; 1784 ulint i; 1785 mtr_t mtr; 1786 ulint n_skipped = 0; 1787 1788 ut_ad(mutex_own(&dict_sys->mutex)); 1789 1790 mtr_start(&mtr); 1791 1792 sys_columns = dict_table_get_low("SYS_COLUMNS"); 1793 sys_index = UT_LIST_GET_FIRST(sys_columns->indexes); 1794 ut_ad(!dict_table_is_comp(sys_columns)); 1795 1796 ut_ad(name_of_col_is(sys_columns, sys_index, 1797 DICT_FLD__SYS_COLUMNS__NAME, "NAME")); 1798 ut_ad(name_of_col_is(sys_columns, sys_index, 1799 DICT_FLD__SYS_COLUMNS__PREC, "PREC")); 1800 1801 tuple = dtuple_create(heap, 1); 1802 dfield = dtuple_get_nth_field(tuple, 0); 1803 1804 buf = static_cast<byte*>(mem_heap_alloc(heap, 8)); 1805 mach_write_to_8(buf, table->id); 1806 1807 dfield_set_data(dfield, buf, 8); 1808 dict_index_copy_types(tuple, sys_index, 1); 1809 1810 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE, 1811 BTR_SEARCH_LEAF, &pcur, &mtr); 1812 1813 ut_ad(table->n_t_cols == static_cast<ulint>( 1814 table->n_cols) + static_cast<ulint>(table->n_v_cols)); 1815 1816 for (i = 0; 1817 i + DATA_N_SYS_COLS < table->n_t_cols + n_skipped; 1818 i++) { 1819 const char* err_msg; 1820 const char* name = NULL; 1821 ulint nth_v_col = ULINT_UNDEFINED; 1822 1823 rec = btr_pcur_get_rec(&pcur); 1824 1825 ut_a(btr_pcur_is_on_user_rec(&pcur)); 1826 1827 err_msg = dict_load_column_low(table, heap, NULL, NULL, 1828 &name, rec, &nth_v_col); 1829 1830 if (err_msg == dict_load_column_del) { 1831 n_skipped++; 1832 goto next_rec; 1833 } else if (err_msg) { 1834 ib::fatal() << err_msg; 1835 } 1836 1837 /* Note: Currently we have one DOC_ID column that is 1838 shared by all FTS indexes on a table. And only non-virtual 1839 column can be used for FULLTEXT index */ 1840 if (innobase_strcasecmp(name, 1841 FTS_DOC_ID_COL_NAME) == 0 1842 && nth_v_col == ULINT_UNDEFINED) { 1843 dict_col_t* col; 1844 /* As part of normal loading of tables the 1845 table->flag is not set for tables with FTS 1846 till after the FTS indexes are loaded. So we 1847 create the fts_t instance here if there isn't 1848 one already created. 1849 1850 This case does not arise for table create as 1851 the flag is set before the table is created. */ 1852 if (table->fts == NULL) { 1853 table->fts = fts_create(table); 1854 fts_optimize_add_table(table); 1855 } 1856 1857 ut_a(table->fts->doc_col == ULINT_UNDEFINED); 1858 1859 col = dict_table_get_nth_col(table, i - n_skipped); 1860 1861 ut_ad(col->len == sizeof(doc_id_t)); 1862 1863 if (col->prtype & DATA_FTS_DOC_ID) { 1864 DICT_TF2_FLAG_SET( 1865 table, DICT_TF2_FTS_HAS_DOC_ID); 1866 DICT_TF2_FLAG_UNSET( 1867 table, DICT_TF2_FTS_ADD_DOC_ID); 1868 } 1869 1870 table->fts->doc_col = i - n_skipped; 1871 } 1872 next_rec: 1873 btr_pcur_move_to_next_user_rec(&pcur, &mtr); 1874 } 1875 1876 btr_pcur_close(&pcur); 1877 mtr_commit(&mtr); 1878 } 1879 1880 /** Loads SYS_VIRTUAL info for one virtual column 1881 @param[in,out] table table 1882 @param[in] nth_v_col virtual column sequence num 1883 @param[in,out] v_col virtual column 1884 @param[in,out] heap memory heap 1885 */ 1886 static 1887 void 1888 dict_load_virtual_one_col( 1889 dict_table_t* table, 1890 ulint nth_v_col, 1891 dict_v_col_t* v_col, 1892 mem_heap_t* heap) 1893 { 1894 dict_table_t* sys_virtual; 1895 dict_index_t* sys_virtual_index; 1896 btr_pcur_t pcur; 1897 dtuple_t* tuple; 1898 dfield_t* dfield; 1899 const rec_t* rec; 1900 byte* buf; 1901 ulint i = 0; 1902 mtr_t mtr; 1903 ulint skipped = 0; 1904 1905 ut_ad(mutex_own(&dict_sys->mutex)); 1906 1907 if (v_col->num_base == 0) { 1908 return; 1909 } 1910 1911 mtr_start(&mtr); 1912 1913 sys_virtual = dict_table_get_low("SYS_VIRTUAL"); 1914 sys_virtual_index = UT_LIST_GET_FIRST(sys_virtual->indexes); 1915 ut_ad(!dict_table_is_comp(sys_virtual)); 1916 1917 ut_ad(name_of_col_is(sys_virtual, sys_virtual_index, 1918 DICT_FLD__SYS_VIRTUAL__POS, "POS")); 1919 1920 tuple = dtuple_create(heap, 2); 1921 1922 /* table ID field */ 1923 dfield = dtuple_get_nth_field(tuple, 0); 1924 1925 buf = static_cast<byte*>(mem_heap_alloc(heap, 8)); 1926 mach_write_to_8(buf, table->id); 1927 1928 dfield_set_data(dfield, buf, 8); 1929 1930 /* virtual column pos field */ 1931 dfield = dtuple_get_nth_field(tuple, 1); 1932 1933 buf = static_cast<byte*>(mem_heap_alloc(heap, 4)); 1934 ulint vcol_pos = dict_create_v_col_pos(nth_v_col, v_col->m_col.ind); 1935 mach_write_to_4(buf, vcol_pos); 1936 1937 dfield_set_data(dfield, buf, 4); 1938 1939 dict_index_copy_types(tuple, sys_virtual_index, 2); 1940 1941 btr_pcur_open_on_user_rec(sys_virtual_index, tuple, PAGE_CUR_GE, 1942 BTR_SEARCH_LEAF, &pcur, &mtr); 1943 1944 for (i = 0; i < v_col->num_base + skipped; i++) { 1945 const char* err_msg; 1946 ulint pos; 1947 1948 ut_ad(btr_pcur_is_on_user_rec(&pcur)); 1949 1950 rec = btr_pcur_get_rec(&pcur); 1951 1952 ut_a(btr_pcur_is_on_user_rec(&pcur)); 1953 1954 err_msg = dict_load_virtual_low(table, 1955 &v_col->base_col[i - skipped], 1956 NULL, 1957 &pos, NULL, rec); 1958 1959 if (err_msg) { 1960 if (err_msg != dict_load_virtual_del) { 1961 ib::fatal() << err_msg; 1962 } else { 1963 skipped++; 1964 } 1965 } else { 1966 ut_ad(pos == vcol_pos); 1967 } 1968 1969 btr_pcur_move_to_next_user_rec(&pcur, &mtr); 1970 } 1971 1972 btr_pcur_close(&pcur); 1973 mtr_commit(&mtr); 1974 } 1975 1976 /** Loads info from SYS_VIRTUAL for virtual columns. 1977 @param[in,out] table table 1978 @param[in] heap memory heap 1979 */ 1980 static 1981 void 1982 dict_load_virtual( 1983 dict_table_t* table, 1984 mem_heap_t* heap) 1985 { 1986 for (ulint i = 0; i < table->n_v_cols; i++) { 1987 dict_v_col_t* v_col = dict_table_get_nth_v_col(table, i); 1988 1989 dict_load_virtual_one_col(table, i, v_col, heap); 1990 } 1991 } 1992 1993 /** Error message for a delete-marked record in dict_load_field_low() */ 1994 static const char* dict_load_field_del = "delete-marked record in SYS_FIELDS"; 1995 1996 /** Load an index field definition from a SYS_FIELDS record to dict_index_t. 1997 @return error message 1998 @retval NULL on success */ 1999 static 2000 const char* 2001 dict_load_field_low( 2002 byte* index_id, /*!< in/out: index id (8 bytes) 2003 an "in" value if index != NULL 2004 and "out" if index == NULL */ 2005 dict_index_t* index, /*!< in/out: index, could be NULL 2006 if we just populate a dict_field_t 2007 struct with information from 2008 a SYS_FIELDS record */ 2009 dict_field_t* sys_field, /*!< out: dict_field_t to be 2010 filled */ 2011 ulint* pos, /*!< out: Field position */ 2012 byte* last_index_id, /*!< in: last index id */ 2013 mem_heap_t* heap, /*!< in/out: memory heap 2014 for temporary storage */ 2015 const rec_t* rec) /*!< in: SYS_FIELDS record */ 2016 { 2017 const byte* field; 2018 ulint len; 2019 unsigned pos_and_prefix_len; 2020 unsigned prefix_len; 2021 bool first_field; 2022 ulint position; 2023 2024 /* Either index or sys_field is supplied, not both */ 2025 ut_a((!index) || (!sys_field)); 2026 2027 if (rec_get_deleted_flag(rec, 0)) { 2028 return(dict_load_field_del); 2029 } 2030 2031 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_FIELDS) { 2032 return("wrong number of columns in SYS_FIELDS record"); 2033 } 2034 2035 field = rec_get_nth_field_old( 2036 rec, DICT_FLD__SYS_FIELDS__INDEX_ID, &len); 2037 if (len != 8) { 2038 err_len: 2039 return("incorrect column length in SYS_FIELDS"); 2040 } 2041 2042 if (!index) { 2043 ut_a(last_index_id); 2044 memcpy(index_id, (const char*) field, 8); 2045 first_field = memcmp(index_id, last_index_id, 8); 2046 } else { 2047 first_field = (index->n_def == 0); 2048 if (memcmp(field, index_id, 8)) { 2049 return("SYS_FIELDS.INDEX_ID mismatch"); 2050 } 2051 } 2052 2053 /* The next field stores the field position in the index and a 2054 possible column prefix length if the index field does not 2055 contain the whole column. The storage format is like this: if 2056 there is at least one prefix field in the index, then the HIGH 2057 2 bytes contain the field number (index->n_def) and the low 2 2058 bytes the prefix length for the field. Otherwise the field 2059 number (index->n_def) is contained in the 2 LOW bytes. */ 2060 2061 field = rec_get_nth_field_old( 2062 rec, DICT_FLD__SYS_FIELDS__POS, &len); 2063 if (len != 4) { 2064 goto err_len; 2065 } 2066 2067 pos_and_prefix_len = mach_read_from_4(field); 2068 2069 if (index && UNIV_UNLIKELY 2070 ((pos_and_prefix_len & 0xFFFFUL) != index->n_def 2071 && (pos_and_prefix_len >> 16 & 0xFFFF) != index->n_def)) { 2072 return("SYS_FIELDS.POS mismatch"); 2073 } 2074 2075 if (first_field || pos_and_prefix_len > 0xFFFFUL) { 2076 prefix_len = pos_and_prefix_len & 0xFFFFUL; 2077 position = (pos_and_prefix_len & 0xFFFF0000UL) >> 16; 2078 } else { 2079 prefix_len = 0; 2080 position = pos_and_prefix_len & 0xFFFFUL; 2081 } 2082 2083 rec_get_nth_field_offs_old( 2084 rec, DICT_FLD__SYS_FIELDS__DB_TRX_ID, &len); 2085 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) { 2086 goto err_len; 2087 } 2088 rec_get_nth_field_offs_old( 2089 rec, DICT_FLD__SYS_FIELDS__DB_ROLL_PTR, &len); 2090 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) { 2091 goto err_len; 2092 } 2093 2094 field = rec_get_nth_field_old( 2095 rec, DICT_FLD__SYS_FIELDS__COL_NAME, &len); 2096 if (len == 0 || len == UNIV_SQL_NULL) { 2097 goto err_len; 2098 } 2099 2100 if (index) { 2101 dict_mem_index_add_field( 2102 index, mem_heap_strdupl(heap, (const char*) field, len), 2103 prefix_len); 2104 } else { 2105 ut_a(sys_field); 2106 ut_a(pos); 2107 2108 sys_field->name = mem_heap_strdupl( 2109 heap, (const char*) field, len); 2110 sys_field->prefix_len = prefix_len; 2111 *pos = position; 2112 } 2113 2114 return(NULL); 2115 } 2116 2117 /********************************************************************//** 2118 Loads definitions for index fields. 2119 @return DB_SUCCESS if ok, DB_CORRUPTION if corruption */ 2120 static 2121 ulint 2122 dict_load_fields( 2123 /*=============*/ 2124 dict_index_t* index, /*!< in/out: index whose fields to load */ 2125 mem_heap_t* heap) /*!< in: memory heap for temporary storage */ 2126 { 2127 dict_table_t* sys_fields; 2128 dict_index_t* sys_index; 2129 btr_pcur_t pcur; 2130 dtuple_t* tuple; 2131 dfield_t* dfield; 2132 const rec_t* rec; 2133 byte* buf; 2134 ulint i; 2135 mtr_t mtr; 2136 dberr_t error; 2137 2138 ut_ad(mutex_own(&dict_sys->mutex)); 2139 2140 mtr_start(&mtr); 2141 2142 sys_fields = dict_table_get_low("SYS_FIELDS"); 2143 sys_index = UT_LIST_GET_FIRST(sys_fields->indexes); 2144 ut_ad(!dict_table_is_comp(sys_fields)); 2145 ut_ad(name_of_col_is(sys_fields, sys_index, 2146 DICT_FLD__SYS_FIELDS__COL_NAME, "COL_NAME")); 2147 2148 tuple = dtuple_create(heap, 1); 2149 dfield = dtuple_get_nth_field(tuple, 0); 2150 2151 buf = static_cast<byte*>(mem_heap_alloc(heap, 8)); 2152 mach_write_to_8(buf, index->id); 2153 2154 dfield_set_data(dfield, buf, 8); 2155 dict_index_copy_types(tuple, sys_index, 1); 2156 2157 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE, 2158 BTR_SEARCH_LEAF, &pcur, &mtr); 2159 for (i = 0; i < index->n_fields; i++) { 2160 const char* err_msg; 2161 2162 rec = btr_pcur_get_rec(&pcur); 2163 2164 ut_a(btr_pcur_is_on_user_rec(&pcur)); 2165 2166 err_msg = dict_load_field_low(buf, index, NULL, NULL, NULL, 2167 heap, rec); 2168 2169 if (err_msg == dict_load_field_del) { 2170 /* There could be delete marked records in 2171 SYS_FIELDS because SYS_FIELDS.INDEX_ID can be 2172 updated by ALTER TABLE ADD INDEX. */ 2173 2174 goto next_rec; 2175 } else if (err_msg) { 2176 ib::error() << err_msg; 2177 error = DB_CORRUPTION; 2178 goto func_exit; 2179 } 2180 next_rec: 2181 btr_pcur_move_to_next_user_rec(&pcur, &mtr); 2182 } 2183 2184 error = DB_SUCCESS; 2185 func_exit: 2186 btr_pcur_close(&pcur); 2187 mtr_commit(&mtr); 2188 return(error); 2189 } 2190 2191 /** Error message for a delete-marked record in dict_load_index_low() */ 2192 static const char* dict_load_index_del = "delete-marked record in SYS_INDEXES"; 2193 /** Error message for table->id mismatch in dict_load_index_low() */ 2194 static const char* dict_load_index_id_err = "SYS_INDEXES.TABLE_ID mismatch"; 2195 /** Error message for SYS_TABLES flags mismatch in dict_load_table_low() */ 2196 static const char* dict_load_table_flags = "incorrect flags in SYS_TABLES"; 2197 2198 /** Load an index definition from a SYS_INDEXES record to dict_index_t. 2199 If allocate=TRUE, we will create a dict_index_t structure and fill it 2200 accordingly. If allocated=FALSE, the dict_index_t will be supplied by 2201 the caller and filled with information read from the record. 2202 @return error message 2203 @retval NULL on success */ 2204 static 2205 const char* 2206 dict_load_index_low( 2207 byte* table_id, /*!< in/out: table id (8 bytes), 2208 an "in" value if allocate=TRUE 2209 and "out" when allocate=FALSE */ 2210 mem_heap_t* heap, /*!< in/out: temporary memory heap */ 2211 const rec_t* rec, /*!< in: SYS_INDEXES record */ 2212 ibool allocate, /*!< in: TRUE=allocate *index, 2213 FALSE=fill in a pre-allocated 2214 *index */ 2215 dict_index_t** index) /*!< out,own: index, or NULL */ 2216 { 2217 const byte* field; 2218 ulint len; 2219 ulint name_len; 2220 char* name_buf; 2221 index_id_t id; 2222 ulint n_fields; 2223 ulint type; 2224 unsigned merge_threshold; 2225 2226 if (allocate) { 2227 /* If allocate=TRUE, no dict_index_t will 2228 be supplied. Initialize "*index" to NULL */ 2229 *index = NULL; 2230 } 2231 2232 if (rec_get_deleted_flag(rec, 0)) { 2233 return(dict_load_index_del); 2234 } 2235 2236 if (rec_get_n_fields_old(rec) == DICT_NUM_FIELDS__SYS_INDEXES) { 2237 /* MERGE_THRESHOLD exists */ 2238 field = rec_get_nth_field_old( 2239 rec, DICT_FLD__SYS_INDEXES__MERGE_THRESHOLD, &len); 2240 switch (len) { 2241 case 4: 2242 merge_threshold = mach_read_from_4(field); 2243 break; 2244 case UNIV_SQL_NULL: 2245 merge_threshold = DICT_INDEX_MERGE_THRESHOLD_DEFAULT; 2246 break; 2247 default: 2248 return("incorrect MERGE_THRESHOLD length" 2249 " in SYS_INDEXES"); 2250 } 2251 } else if (rec_get_n_fields_old(rec) 2252 == DICT_NUM_FIELDS__SYS_INDEXES - 1) { 2253 /* MERGE_THRESHOLD doesn't exist */ 2254 2255 merge_threshold = DICT_INDEX_MERGE_THRESHOLD_DEFAULT; 2256 } else { 2257 return("wrong number of columns in SYS_INDEXES record"); 2258 } 2259 2260 field = rec_get_nth_field_old( 2261 rec, DICT_FLD__SYS_INDEXES__TABLE_ID, &len); 2262 if (len != 8) { 2263 err_len: 2264 return("incorrect column length in SYS_INDEXES"); 2265 } 2266 2267 if (!allocate) { 2268 /* We are reading a SYS_INDEXES record. Copy the table_id */ 2269 memcpy(table_id, (const char*) field, 8); 2270 } else if (memcmp(field, table_id, 8)) { 2271 /* Caller supplied table_id, verify it is the same 2272 id as on the index record */ 2273 return(dict_load_index_id_err); 2274 } 2275 2276 field = rec_get_nth_field_old( 2277 rec, DICT_FLD__SYS_INDEXES__ID, &len); 2278 if (len != 8) { 2279 goto err_len; 2280 } 2281 2282 id = mach_read_from_8(field); 2283 2284 rec_get_nth_field_offs_old( 2285 rec, DICT_FLD__SYS_INDEXES__DB_TRX_ID, &len); 2286 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) { 2287 goto err_len; 2288 } 2289 rec_get_nth_field_offs_old( 2290 rec, DICT_FLD__SYS_INDEXES__DB_ROLL_PTR, &len); 2291 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) { 2292 goto err_len; 2293 } 2294 2295 field = rec_get_nth_field_old( 2296 rec, DICT_FLD__SYS_INDEXES__NAME, &name_len); 2297 if (name_len == UNIV_SQL_NULL) { 2298 goto err_len; 2299 } 2300 2301 name_buf = mem_heap_strdupl(heap, (const char*) field, 2302 name_len); 2303 2304 field = rec_get_nth_field_old( 2305 rec, DICT_FLD__SYS_INDEXES__N_FIELDS, &len); 2306 if (len != 4) { 2307 goto err_len; 2308 } 2309 n_fields = mach_read_from_4(field); 2310 2311 field = rec_get_nth_field_old( 2312 rec, DICT_FLD__SYS_INDEXES__TYPE, &len); 2313 if (len != 4) { 2314 goto err_len; 2315 } 2316 type = mach_read_from_4(field); 2317 if (type & (~0U << DICT_IT_BITS)) { 2318 return("unknown SYS_INDEXES.TYPE bits"); 2319 } 2320 2321 field = rec_get_nth_field_old( 2322 rec, DICT_FLD__SYS_INDEXES__PAGE_NO, &len); 2323 if (len != 4) { 2324 goto err_len; 2325 } 2326 2327 if (allocate) { 2328 *index = dict_mem_index_create(NULL, name_buf, type, n_fields); 2329 } else { 2330 ut_a(*index); 2331 2332 dict_mem_fill_index_struct(*index, NULL, name_buf, 2333 type, n_fields); 2334 } 2335 2336 (*index)->id = id; 2337 (*index)->page = mach_read_from_4(field); 2338 ut_ad((*index)->page); 2339 (*index)->merge_threshold = merge_threshold; 2340 2341 return(NULL); 2342 } 2343 2344 /********************************************************************//** 2345 Loads definitions for table indexes. Adds them to the data dictionary 2346 cache. 2347 @return DB_SUCCESS if ok, DB_CORRUPTION if corruption of dictionary 2348 table or DB_UNSUPPORTED if table has unknown index type */ 2349 static MY_ATTRIBUTE((nonnull)) 2350 dberr_t 2351 dict_load_indexes( 2352 /*==============*/ 2353 dict_table_t* table, /*!< in/out: table */ 2354 mem_heap_t* heap, /*!< in: memory heap for temporary storage */ 2355 dict_err_ignore_t ignore_err) 2356 /*!< in: error to be ignored when 2357 loading the index definition */ 2358 { 2359 dict_table_t* sys_indexes; 2360 dict_index_t* sys_index; 2361 btr_pcur_t pcur; 2362 dtuple_t* tuple; 2363 dfield_t* dfield; 2364 const rec_t* rec; 2365 byte* buf; 2366 mtr_t mtr; 2367 dberr_t error = DB_SUCCESS; 2368 2369 ut_ad(mutex_own(&dict_sys->mutex)); 2370 2371 mtr_start(&mtr); 2372 2373 sys_indexes = dict_table_get_low("SYS_INDEXES"); 2374 sys_index = UT_LIST_GET_FIRST(sys_indexes->indexes); 2375 ut_ad(!dict_table_is_comp(sys_indexes)); 2376 ut_ad(name_of_col_is(sys_indexes, sys_index, 2377 DICT_FLD__SYS_INDEXES__NAME, "NAME")); 2378 ut_ad(name_of_col_is(sys_indexes, sys_index, 2379 DICT_FLD__SYS_INDEXES__PAGE_NO, "PAGE_NO")); 2380 2381 tuple = dtuple_create(heap, 1); 2382 dfield = dtuple_get_nth_field(tuple, 0); 2383 2384 buf = static_cast<byte*>(mem_heap_alloc(heap, 8)); 2385 mach_write_to_8(buf, table->id); 2386 2387 dfield_set_data(dfield, buf, 8); 2388 dict_index_copy_types(tuple, sys_index, 1); 2389 2390 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE, 2391 BTR_SEARCH_LEAF, &pcur, &mtr); 2392 for (;;) { 2393 dict_index_t* index = NULL; 2394 const char* err_msg; 2395 2396 if (!btr_pcur_is_on_user_rec(&pcur)) { 2397 2398 /* We should allow the table to open even 2399 without index when DICT_ERR_IGNORE_CORRUPT is set. 2400 DICT_ERR_IGNORE_CORRUPT is currently only set 2401 for drop table */ 2402 if (dict_table_get_first_index(table) == NULL 2403 && !(ignore_err & DICT_ERR_IGNORE_CORRUPT)) { 2404 ib::warn() << "Cannot load table " 2405 << table->name 2406 << " because it has no indexes in" 2407 " InnoDB internal data dictionary."; 2408 error = DB_CORRUPTION; 2409 goto func_exit; 2410 } 2411 2412 break; 2413 } 2414 2415 rec = btr_pcur_get_rec(&pcur); 2416 2417 if ((ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK) 2418 && (rec_get_n_fields_old(rec) 2419 == DICT_NUM_FIELDS__SYS_INDEXES 2420 /* a record for older SYS_INDEXES table 2421 (missing merge_threshold column) is acceptable. */ 2422 || rec_get_n_fields_old(rec) 2423 == DICT_NUM_FIELDS__SYS_INDEXES - 1)) { 2424 const byte* field; 2425 ulint len; 2426 field = rec_get_nth_field_old( 2427 rec, DICT_FLD__SYS_INDEXES__NAME, &len); 2428 2429 if (len != UNIV_SQL_NULL 2430 && static_cast<char>(*field) 2431 == static_cast<char>(*TEMP_INDEX_PREFIX_STR)) { 2432 /* Skip indexes whose name starts with 2433 TEMP_INDEX_PREFIX_STR, because they will 2434 be dropped by row_merge_drop_temp_indexes() 2435 during crash recovery. */ 2436 goto next_rec; 2437 } 2438 } 2439 2440 err_msg = dict_load_index_low(buf, heap, rec, TRUE, &index); 2441 ut_ad((index == NULL && err_msg != NULL) 2442 || (index != NULL && err_msg == NULL)); 2443 2444 if (err_msg == dict_load_index_id_err) { 2445 /* TABLE_ID mismatch means that we have 2446 run out of index definitions for the table. */ 2447 2448 if (dict_table_get_first_index(table) == NULL 2449 && !(ignore_err & DICT_ERR_IGNORE_CORRUPT)) { 2450 2451 ib::warn() << "Failed to load the" 2452 " clustered index for table " 2453 << table->name 2454 << " because of the following error: " 2455 << err_msg << "." 2456 " Refusing to load the rest of the" 2457 " indexes (if any) and the whole table" 2458 " altogether."; 2459 error = DB_CORRUPTION; 2460 goto func_exit; 2461 } 2462 2463 break; 2464 } else if (err_msg == dict_load_index_del) { 2465 /* Skip delete-marked records. */ 2466 goto next_rec; 2467 } else if (err_msg) { 2468 ib::error() << err_msg; 2469 if (ignore_err & DICT_ERR_IGNORE_CORRUPT) { 2470 goto next_rec; 2471 } 2472 error = DB_CORRUPTION; 2473 goto func_exit; 2474 } 2475 2476 ut_ad(index); 2477 ut_ad(!dict_index_is_online_ddl(index)); 2478 2479 /* Check whether the index is corrupted */ 2480 if (index->is_corrupted()) { 2481 ib::error() << "Index " << index->name 2482 << " of table " << table->name 2483 << " is corrupted"; 2484 2485 if (!srv_load_corrupted 2486 && !(ignore_err & DICT_ERR_IGNORE_CORRUPT) 2487 && dict_index_is_clust(index)) { 2488 dict_mem_index_free(index); 2489 2490 error = DB_INDEX_CORRUPT; 2491 goto func_exit; 2492 } else { 2493 /* We will load the index if 2494 1) srv_load_corrupted is TRUE 2495 2) ignore_err is set with 2496 DICT_ERR_IGNORE_CORRUPT 2497 3) if the index corrupted is a secondary 2498 index */ 2499 ib::info() << "Load corrupted index " 2500 << index->name 2501 << " of table " << table->name; 2502 } 2503 } 2504 2505 if (index->type & DICT_FTS 2506 && !dict_table_has_fts_index(table)) { 2507 /* This should have been created by now. */ 2508 ut_a(table->fts != NULL); 2509 DICT_TF2_FLAG_SET(table, DICT_TF2_FTS); 2510 } 2511 2512 /* We check for unsupported types first, so that the 2513 subsequent checks are relevant for the supported types. */ 2514 if (index->type & ~(DICT_CLUSTERED | DICT_UNIQUE 2515 | DICT_CORRUPT | DICT_FTS 2516 | DICT_SPATIAL | DICT_VIRTUAL)) { 2517 2518 ib::error() << "Unknown type " << index->type 2519 << " of index " << index->name 2520 << " of table " << table->name; 2521 2522 error = DB_UNSUPPORTED; 2523 dict_mem_index_free(index); 2524 goto func_exit; 2525 } else if (index->page == FIL_NULL 2526 && table->is_readable() 2527 && (!(index->type & DICT_FTS))) { 2528 2529 ib::error() << "Trying to load index " << index->name 2530 << " for table " << table->name 2531 << ", but the index tree has been freed!"; 2532 2533 if (ignore_err & DICT_ERR_IGNORE_INDEX_ROOT) { 2534 /* If caller can tolerate this error, 2535 we will continue to load the index and 2536 let caller deal with this error. However 2537 mark the index and table corrupted. We 2538 only need to mark such in the index 2539 dictionary cache for such metadata corruption, 2540 since we would always be able to set it 2541 when loading the dictionary cache */ 2542 index->table = table; 2543 dict_set_corrupted_index_cache_only(index); 2544 2545 ib::info() << "Index is corrupt but forcing" 2546 " load into data dictionary"; 2547 } else { 2548 corrupted: 2549 dict_mem_index_free(index); 2550 error = DB_CORRUPTION; 2551 goto func_exit; 2552 } 2553 } else if (!dict_index_is_clust(index) 2554 && NULL == dict_table_get_first_index(table)) { 2555 2556 ib::error() << "Trying to load index " << index->name 2557 << " for table " << table->name 2558 << ", but the first index is not clustered!"; 2559 2560 goto corrupted; 2561 } else if (dict_is_sys_table(table->id) 2562 && (dict_index_is_clust(index) 2563 || ((table == dict_sys->sys_tables) 2564 && !strcmp("ID_IND", index->name)))) { 2565 2566 /* The index was created in memory already at booting 2567 of the database server */ 2568 dict_mem_index_free(index); 2569 } else { 2570 dict_load_fields(index, heap); 2571 index->table = table; 2572 2573 /* The data dictionary tables should never contain 2574 invalid index definitions. If we ignored this error 2575 and simply did not load this index definition, the 2576 .frm file would disagree with the index definitions 2577 inside InnoDB. */ 2578 if ((error = dict_index_add_to_cache(index, 2579 index->page)) 2580 != DB_SUCCESS) { 2581 goto func_exit; 2582 } 2583 } 2584 next_rec: 2585 btr_pcur_move_to_next_user_rec(&pcur, &mtr); 2586 } 2587 2588 ut_ad(table->fts_doc_id_index == NULL); 2589 2590 if (table->fts != NULL) { 2591 table->fts_doc_id_index = dict_table_get_index_on_name( 2592 table, FTS_DOC_ID_INDEX_NAME); 2593 } 2594 2595 /* If the table contains FTS indexes, populate table->fts->indexes */ 2596 if (dict_table_has_fts_index(table)) { 2597 ut_ad(table->fts_doc_id_index != NULL); 2598 /* table->fts->indexes should have been created. */ 2599 ut_a(table->fts->indexes != NULL); 2600 dict_table_get_all_fts_indexes(table, table->fts->indexes); 2601 } 2602 2603 func_exit: 2604 btr_pcur_close(&pcur); 2605 mtr_commit(&mtr); 2606 2607 return(error); 2608 } 2609 2610 /** Load a table definition from a SYS_TABLES record to dict_table_t. 2611 Do not load any columns or indexes. 2612 @param[in] name Table name 2613 @param[in] rec SYS_TABLES record 2614 @param[out,own] table table, or NULL 2615 @return error message 2616 @retval NULL on success */ 2617 static const char* dict_load_table_low(const table_name_t& name, 2618 const rec_t* rec, dict_table_t** table) 2619 { 2620 table_id_t table_id; 2621 ulint space_id; 2622 ulint n_cols; 2623 ulint t_num; 2624 ulint flags; 2625 ulint flags2; 2626 ulint n_v_col; 2627 2628 if (const char* error_text = dict_sys_tables_rec_check(rec)) { 2629 *table = NULL; 2630 return(error_text); 2631 } 2632 2633 if (!dict_sys_tables_rec_read(rec, name, &table_id, &space_id, 2634 &t_num, &flags, &flags2)) { 2635 *table = NULL; 2636 return(dict_load_table_flags); 2637 } 2638 2639 dict_table_decode_n_col(t_num, &n_cols, &n_v_col); 2640 2641 *table = dict_mem_table_create( 2642 name.m_name, NULL, n_cols + n_v_col, n_v_col, flags, flags2); 2643 (*table)->space_id = space_id; 2644 (*table)->id = table_id; 2645 (*table)->file_unreadable = !!(flags2 & DICT_TF2_DISCARDED); 2646 2647 return(NULL); 2648 } 2649 2650 /********************************************************************//** 2651 Using the table->heap, copy the null-terminated filepath into 2652 table->data_dir_path and replace the 'databasename/tablename.ibd' 2653 portion with 'tablename'. 2654 This allows SHOW CREATE TABLE to return the correct DATA DIRECTORY path. 2655 Make this data directory path only if it has not yet been saved. */ 2656 static 2657 void 2658 dict_save_data_dir_path( 2659 /*====================*/ 2660 dict_table_t* table, /*!< in/out: table */ 2661 const char* filepath) /*!< in: filepath of tablespace */ 2662 { 2663 ut_ad(mutex_own(&dict_sys->mutex)); 2664 ut_a(DICT_TF_HAS_DATA_DIR(table->flags)); 2665 2666 ut_a(!table->data_dir_path); 2667 ut_a(filepath); 2668 2669 /* Be sure this filepath is not the default filepath. */ 2670 char* default_filepath = fil_make_filepath( 2671 NULL, table->name.m_name, IBD, false); 2672 if (default_filepath) { 2673 if (0 != strcmp(filepath, default_filepath)) { 2674 ulint pathlen = strlen(filepath); 2675 ut_a(pathlen < OS_FILE_MAX_PATH); 2676 ut_a(0 == strcmp(filepath + pathlen - 4, DOT_IBD)); 2677 2678 table->data_dir_path = mem_heap_strdup( 2679 table->heap, filepath); 2680 os_file_make_data_dir_path(table->data_dir_path); 2681 } 2682 2683 ut_free(default_filepath); 2684 } 2685 } 2686 2687 /** Make sure the data_dir_path is saved in dict_table_t if DATA DIRECTORY 2688 was used. Try to read it from the fil_system first, then from SYS_DATAFILES. 2689 @param[in] table Table object 2690 @param[in] dict_mutex_own true if dict_sys->mutex is owned already */ 2691 void 2692 dict_get_and_save_data_dir_path( 2693 dict_table_t* table, 2694 bool dict_mutex_own) 2695 { 2696 ut_ad(!table->is_temporary()); 2697 ut_ad(!table->space || table->space->id == table->space_id); 2698 2699 if (!table->data_dir_path && table->space_id && table->space) { 2700 if (!dict_mutex_own) { 2701 dict_mutex_enter_for_mysql(); 2702 } 2703 2704 table->flags |= (1 << DICT_TF_POS_DATA_DIR); 2705 dict_save_data_dir_path(table, 2706 table->space->chain.start->name); 2707 2708 if (table->data_dir_path == NULL) { 2709 /* Since we did not set the table data_dir_path, 2710 unset the flag. This does not change SYS_DATAFILES 2711 or SYS_TABLES or FSP_FLAGS on the header page of the 2712 tablespace, but it makes dict_table_t consistent. */ 2713 table->flags &= ~DICT_TF_MASK_DATA_DIR; 2714 } 2715 2716 if (!dict_mutex_own) { 2717 dict_mutex_exit_for_mysql(); 2718 } 2719 } 2720 } 2721 2722 /** Loads a table definition and also all its index definitions, and also 2723 the cluster definition if the table is a member in a cluster. Also loads 2724 all foreign key constraints where the foreign key is in the table or where 2725 a foreign key references columns in this table. 2726 @param[in] name Table name in the dbname/tablename format 2727 @param[in] ignore_err Error to be ignored when loading 2728 table and its index definition 2729 @return table, NULL if does not exist; if the table is stored in an 2730 .ibd file, but the file does not exist, then we set the file_unreadable 2731 flag in the table object we return. */ 2732 dict_table_t* dict_load_table(const char* name, dict_err_ignore_t ignore_err) 2733 { 2734 dict_names_t fk_list; 2735 dict_table_t* result; 2736 dict_names_t::iterator i; 2737 2738 DBUG_ENTER("dict_load_table"); 2739 DBUG_PRINT("dict_load_table", ("loading table: '%s'", name)); 2740 2741 ut_ad(mutex_own(&dict_sys->mutex)); 2742 2743 result = dict_table_check_if_in_cache_low(name); 2744 2745 if (!result) { 2746 result = dict_load_table_one(const_cast<char*>(name), 2747 ignore_err, fk_list); 2748 while (!fk_list.empty()) { 2749 if (!dict_table_check_if_in_cache_low(fk_list.front())) 2750 dict_load_table_one( 2751 const_cast<char*>(fk_list.front()), 2752 ignore_err, fk_list); 2753 fk_list.pop_front(); 2754 } 2755 } 2756 2757 DBUG_RETURN(result); 2758 } 2759 2760 /** Opens a tablespace for dict_load_table_one() 2761 @param[in,out] table A table that refers to the tablespace to open 2762 @param[in] ignore_err Whether to ignore an error. */ 2763 UNIV_INLINE 2764 void 2765 dict_load_tablespace( 2766 dict_table_t* table, 2767 dict_err_ignore_t ignore_err) 2768 { 2769 ut_ad(!table->is_temporary()); 2770 ut_ad(!table->space); 2771 ut_ad(table->space_id < SRV_LOG_SPACE_FIRST_ID); 2772 ut_ad(fil_system.sys_space); 2773 2774 if (table->space_id == TRX_SYS_SPACE) { 2775 table->space = fil_system.sys_space; 2776 return; 2777 } 2778 2779 if (table->flags2 & DICT_TF2_DISCARDED) { 2780 ib::warn() << "Tablespace for table " << table->name 2781 << " is set as discarded."; 2782 table->file_unreadable = true; 2783 return; 2784 } 2785 2786 /* The tablespace may already be open. */ 2787 table->space = fil_space_for_table_exists_in_mem( 2788 table->space_id, table->name.m_name, table->flags); 2789 if (table->space) { 2790 return; 2791 } 2792 2793 if (ignore_err == DICT_ERR_IGNORE_DROP) { 2794 table->file_unreadable = true; 2795 return; 2796 } 2797 2798 if (!(ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK)) { 2799 ib::error() << "Failed to find tablespace for table " 2800 << table->name << " in the cache. Attempting" 2801 " to load the tablespace with space id " 2802 << table->space_id; 2803 } 2804 2805 /* Use the remote filepath if needed. This parameter is optional 2806 in the call to fil_ibd_open(). If not supplied, it will be built 2807 from the table->name. */ 2808 char* filepath = NULL; 2809 if (DICT_TF_HAS_DATA_DIR(table->flags)) { 2810 /* This will set table->data_dir_path from either 2811 fil_system or SYS_DATAFILES */ 2812 dict_get_and_save_data_dir_path(table, true); 2813 2814 if (table->data_dir_path) { 2815 filepath = fil_make_filepath( 2816 table->data_dir_path, 2817 table->name.m_name, IBD, true); 2818 } 2819 } 2820 2821 /* Try to open the tablespace. We set the 2nd param (fix_dict) to 2822 false because we do not have an x-lock on dict_operation_lock */ 2823 table->space = fil_ibd_open( 2824 true, false, FIL_TYPE_TABLESPACE, table->space_id, 2825 dict_tf_to_fsp_flags(table->flags), 2826 table->name, filepath); 2827 2828 if (!table->space) { 2829 /* We failed to find a sensible tablespace file */ 2830 table->file_unreadable = true; 2831 } 2832 2833 ut_free(filepath); 2834 } 2835 2836 /** Loads a table definition and also all its index definitions. 2837 2838 Loads those foreign key constraints whose referenced table is already in 2839 dictionary cache. If a foreign key constraint is not loaded, then the 2840 referenced table is pushed into the output stack (fk_tables), if it is not 2841 NULL. These tables must be subsequently loaded so that all the foreign 2842 key constraints are loaded into memory. 2843 2844 @param[in] name Table name in the db/tablename format 2845 @param[in] ignore_err Error to be ignored when loading table 2846 and its index definition 2847 @param[out] fk_tables Related table names that must also be 2848 loaded to ensure that all foreign key 2849 constraints are loaded. 2850 @return table, NULL if does not exist; if the table is stored in an 2851 .ibd file, but the file does not exist, then we set the 2852 file_unreadable flag in the table object we return */ 2853 static 2854 dict_table_t* 2855 dict_load_table_one( 2856 const table_name_t& name, 2857 dict_err_ignore_t ignore_err, 2858 dict_names_t& fk_tables) 2859 { 2860 dberr_t err; 2861 dict_table_t* sys_tables; 2862 btr_pcur_t pcur; 2863 dict_index_t* sys_index; 2864 dtuple_t* tuple; 2865 mem_heap_t* heap; 2866 dfield_t* dfield; 2867 const rec_t* rec; 2868 const byte* field; 2869 ulint len; 2870 mtr_t mtr; 2871 2872 DBUG_ENTER("dict_load_table_one"); 2873 DBUG_PRINT("dict_load_table_one", ("table: %s", name.m_name)); 2874 2875 ut_ad(mutex_own(&dict_sys->mutex)); 2876 2877 heap = mem_heap_create(32000); 2878 2879 mtr_start(&mtr); 2880 2881 sys_tables = dict_table_get_low("SYS_TABLES"); 2882 sys_index = UT_LIST_GET_FIRST(sys_tables->indexes); 2883 ut_ad(!dict_table_is_comp(sys_tables)); 2884 ut_ad(name_of_col_is(sys_tables, sys_index, 2885 DICT_FLD__SYS_TABLES__ID, "ID")); 2886 ut_ad(name_of_col_is(sys_tables, sys_index, 2887 DICT_FLD__SYS_TABLES__N_COLS, "N_COLS")); 2888 ut_ad(name_of_col_is(sys_tables, sys_index, 2889 DICT_FLD__SYS_TABLES__TYPE, "TYPE")); 2890 ut_ad(name_of_col_is(sys_tables, sys_index, 2891 DICT_FLD__SYS_TABLES__MIX_LEN, "MIX_LEN")); 2892 ut_ad(name_of_col_is(sys_tables, sys_index, 2893 DICT_FLD__SYS_TABLES__SPACE, "SPACE")); 2894 2895 tuple = dtuple_create(heap, 1); 2896 dfield = dtuple_get_nth_field(tuple, 0); 2897 2898 dfield_set_data(dfield, name.m_name, ut_strlen(name.m_name)); 2899 dict_index_copy_types(tuple, sys_index, 1); 2900 2901 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE, 2902 BTR_SEARCH_LEAF, &pcur, &mtr); 2903 rec = btr_pcur_get_rec(&pcur); 2904 2905 if (!btr_pcur_is_on_user_rec(&pcur) 2906 || rec_get_deleted_flag(rec, 0)) { 2907 /* Not found */ 2908 err_exit: 2909 btr_pcur_close(&pcur); 2910 mtr_commit(&mtr); 2911 mem_heap_free(heap); 2912 2913 DBUG_RETURN(NULL); 2914 } 2915 2916 field = rec_get_nth_field_old( 2917 rec, DICT_FLD__SYS_TABLES__NAME, &len); 2918 2919 /* Check if the table name in record is the searched one */ 2920 if (len != ut_strlen(name.m_name) 2921 || 0 != ut_memcmp(name.m_name, field, len)) { 2922 2923 goto err_exit; 2924 } 2925 2926 dict_table_t* table; 2927 if (const char* err_msg = dict_load_table_low(name, rec, &table)) { 2928 if (err_msg != dict_load_table_flags) { 2929 ib::error() << err_msg; 2930 } 2931 goto err_exit; 2932 } 2933 2934 btr_pcur_close(&pcur); 2935 mtr_commit(&mtr); 2936 2937 dict_load_tablespace(table, ignore_err); 2938 2939 dict_load_columns(table, heap); 2940 2941 dict_load_virtual(table, heap); 2942 2943 dict_table_add_system_columns(table, heap); 2944 2945 table->can_be_evicted = true; 2946 table->add_to_cache(); 2947 2948 mem_heap_empty(heap); 2949 2950 ut_ad(dict_tf2_is_valid(table->flags, table->flags2)); 2951 2952 /* If there is no tablespace for the table then we only need to 2953 load the index definitions. So that we can IMPORT the tablespace 2954 later. When recovering table locks for resurrected incomplete 2955 transactions, the tablespace should exist, because DDL operations 2956 were not allowed while the table is being locked by a transaction. */ 2957 dict_err_ignore_t index_load_err = 2958 !(ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK) 2959 && !table->is_readable() 2960 ? DICT_ERR_IGNORE_ALL 2961 : ignore_err; 2962 2963 err = dict_load_indexes(table, heap, index_load_err); 2964 2965 if (err == DB_INDEX_CORRUPT) { 2966 /* Refuse to load the table if the table has a corrupted 2967 cluster index */ 2968 if (!srv_load_corrupted) { 2969 2970 ib::error() << "Load table " << table->name 2971 << " failed, the table has" 2972 " corrupted clustered indexes. Turn on" 2973 " 'innodb_force_load_corrupted' to drop it"; 2974 dict_table_remove_from_cache(table); 2975 table = NULL; 2976 goto func_exit; 2977 } else { 2978 if (table->indexes.start->is_corrupted()) { 2979 table->corrupted = true; 2980 } 2981 } 2982 } 2983 2984 if (err == DB_SUCCESS && table->is_readable()) { 2985 if (table->space && !fil_space_get_size(table->space_id)) { 2986 corrupted: 2987 table->corrupted = true; 2988 table->file_unreadable = true; 2989 err = DB_CORRUPTION; 2990 } else { 2991 const page_id_t page_id( 2992 table->space->id, 2993 dict_table_get_first_index(table)->page); 2994 mtr.start(); 2995 buf_block_t* block = buf_page_get( 2996 page_id, 2997 dict_table_page_size(table), 2998 RW_S_LATCH, &mtr); 2999 const bool corrupted = !block 3000 || page_get_space_id(block->frame) 3001 != page_id.space() 3002 || page_get_page_no(block->frame) 3003 != page_id.page_no() 3004 || (mach_read_from_2(FIL_PAGE_TYPE 3005 + block->frame) 3006 != FIL_PAGE_INDEX 3007 && mach_read_from_2(FIL_PAGE_TYPE 3008 + block->frame) 3009 != FIL_PAGE_TYPE_INSTANT); 3010 mtr.commit(); 3011 if (corrupted) { 3012 goto corrupted; 3013 } 3014 3015 if (table->supports_instant()) { 3016 err = btr_cur_instant_init(table); 3017 } 3018 } 3019 } 3020 3021 /* Initialize table foreign_child value. Its value could be 3022 changed when dict_load_foreigns() is called below */ 3023 table->fk_max_recusive_level = 0; 3024 3025 /* If the force recovery flag is set, we open the table irrespective 3026 of the error condition, since the user may want to dump data from the 3027 clustered index. However we load the foreign key information only if 3028 all indexes were loaded. */ 3029 if (!table->is_readable()) { 3030 /* Don't attempt to load the indexes from disk. */ 3031 } else if (err == DB_SUCCESS) { 3032 err = dict_load_foreigns(table->name.m_name, NULL, 3033 true, true, 3034 ignore_err, fk_tables); 3035 3036 if (err != DB_SUCCESS) { 3037 ib::warn() << "Load table " << table->name 3038 << " failed, the table has missing" 3039 " foreign key indexes. Turn off" 3040 " 'foreign_key_checks' and try again."; 3041 3042 dict_table_remove_from_cache(table); 3043 table = NULL; 3044 } else { 3045 dict_mem_table_fill_foreign_vcol_set(table); 3046 table->fk_max_recusive_level = 0; 3047 } 3048 } else { 3049 dict_index_t* index; 3050 3051 /* Make sure that at least the clustered index was loaded. 3052 Otherwise refuse to load the table */ 3053 index = dict_table_get_first_index(table); 3054 3055 if (!srv_force_recovery 3056 || !index 3057 || !index->is_primary()) { 3058 dict_table_remove_from_cache(table); 3059 table = NULL; 3060 } else if (index->is_corrupted() 3061 && table->is_readable()) { 3062 /* It is possible we force to load a corrupted 3063 clustered index if srv_load_corrupted is set. 3064 Mark the table as corrupted in this case */ 3065 table->corrupted = true; 3066 } 3067 } 3068 3069 func_exit: 3070 mem_heap_free(heap); 3071 3072 ut_ad(!table 3073 || (ignore_err & ~DICT_ERR_IGNORE_FK_NOKEY) 3074 || !table->is_readable() 3075 || !table->corrupted); 3076 3077 if (table && table->fts) { 3078 if (!(dict_table_has_fts_index(table) 3079 || DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID) 3080 || DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_ADD_DOC_ID))) { 3081 /* the table->fts could be created in dict_load_column 3082 when a user defined FTS_DOC_ID is present, but no 3083 FTS */ 3084 fts_optimize_remove_table(table); 3085 fts_free(table); 3086 } else if (fts_optimize_wq) { 3087 fts_optimize_add_table(table); 3088 } else if (table->can_be_evicted) { 3089 /* fts_optimize_thread is not started yet. 3090 So make the table as non-evictable from cache. */ 3091 dict_table_move_from_lru_to_non_lru(table); 3092 } 3093 } 3094 3095 ut_ad(err != DB_SUCCESS || dict_foreign_set_validate(*table)); 3096 3097 DBUG_RETURN(table); 3098 } 3099 3100 /***********************************************************************//** 3101 Loads a table object based on the table id. 3102 @return table; NULL if table does not exist */ 3103 dict_table_t* 3104 dict_load_table_on_id( 3105 /*==================*/ 3106 table_id_t table_id, /*!< in: table id */ 3107 dict_err_ignore_t ignore_err) /*!< in: errors to ignore 3108 when loading the table */ 3109 { 3110 byte id_buf[8]; 3111 btr_pcur_t pcur; 3112 mem_heap_t* heap; 3113 dtuple_t* tuple; 3114 dfield_t* dfield; 3115 dict_index_t* sys_table_ids; 3116 dict_table_t* sys_tables; 3117 const rec_t* rec; 3118 const byte* field; 3119 ulint len; 3120 dict_table_t* table; 3121 mtr_t mtr; 3122 3123 ut_ad(mutex_own(&dict_sys->mutex)); 3124 3125 table = NULL; 3126 3127 /* NOTE that the operation of this function is protected by 3128 the dictionary mutex, and therefore no deadlocks can occur 3129 with other dictionary operations. */ 3130 3131 mtr_start(&mtr); 3132 /*---------------------------------------------------*/ 3133 /* Get the secondary index based on ID for table SYS_TABLES */ 3134 sys_tables = dict_sys->sys_tables; 3135 sys_table_ids = dict_table_get_next_index( 3136 dict_table_get_first_index(sys_tables)); 3137 ut_ad(!dict_table_is_comp(sys_tables)); 3138 ut_ad(!dict_index_is_clust(sys_table_ids)); 3139 heap = mem_heap_create(256); 3140 3141 tuple = dtuple_create(heap, 1); 3142 dfield = dtuple_get_nth_field(tuple, 0); 3143 3144 /* Write the table id in byte format to id_buf */ 3145 mach_write_to_8(id_buf, table_id); 3146 3147 dfield_set_data(dfield, id_buf, 8); 3148 dict_index_copy_types(tuple, sys_table_ids, 1); 3149 3150 btr_pcur_open_on_user_rec(sys_table_ids, tuple, PAGE_CUR_GE, 3151 BTR_SEARCH_LEAF, &pcur, &mtr); 3152 3153 rec = btr_pcur_get_rec(&pcur); 3154 3155 if (page_rec_is_user_rec(rec)) { 3156 /*---------------------------------------------------*/ 3157 /* Now we have the record in the secondary index 3158 containing the table ID and NAME */ 3159 check_rec: 3160 field = rec_get_nth_field_old( 3161 rec, DICT_FLD__SYS_TABLE_IDS__ID, &len); 3162 ut_ad(len == 8); 3163 3164 /* Check if the table id in record is the one searched for */ 3165 if (table_id == mach_read_from_8(field)) { 3166 if (rec_get_deleted_flag(rec, 0)) { 3167 /* Until purge has completed, there 3168 may be delete-marked duplicate records 3169 for the same SYS_TABLES.ID, but different 3170 SYS_TABLES.NAME. */ 3171 while (btr_pcur_move_to_next(&pcur, &mtr)) { 3172 rec = btr_pcur_get_rec(&pcur); 3173 3174 if (page_rec_is_user_rec(rec)) { 3175 goto check_rec; 3176 } 3177 } 3178 } else { 3179 /* Now we get the table name from the record */ 3180 field = rec_get_nth_field_old(rec, 3181 DICT_FLD__SYS_TABLE_IDS__NAME, &len); 3182 /* Load the table definition to memory */ 3183 char* table_name = mem_heap_strdupl( 3184 heap, (char*) field, len); 3185 table = dict_load_table(table_name, ignore_err); 3186 } 3187 } 3188 } 3189 3190 btr_pcur_close(&pcur); 3191 mtr_commit(&mtr); 3192 mem_heap_free(heap); 3193 3194 return(table); 3195 } 3196 3197 /********************************************************************//** 3198 This function is called when the database is booted. Loads system table 3199 index definitions except for the clustered index which is added to the 3200 dictionary cache at booting before calling this function. */ 3201 void 3202 dict_load_sys_table( 3203 /*================*/ 3204 dict_table_t* table) /*!< in: system table */ 3205 { 3206 mem_heap_t* heap; 3207 3208 ut_ad(mutex_own(&dict_sys->mutex)); 3209 3210 heap = mem_heap_create(1000); 3211 3212 dict_load_indexes(table, heap, DICT_ERR_IGNORE_NONE); 3213 3214 mem_heap_free(heap); 3215 } 3216 3217 /********************************************************************//** 3218 Loads foreign key constraint col names (also for the referenced table). 3219 Members that must be set (and valid) in foreign: 3220 foreign->heap 3221 foreign->n_fields 3222 foreign->id ('\0'-terminated) 3223 Members that will be created and set by this function: 3224 foreign->foreign_col_names[i] 3225 foreign->referenced_col_names[i] 3226 (for i=0..foreign->n_fields-1) */ 3227 static 3228 void 3229 dict_load_foreign_cols( 3230 /*===================*/ 3231 dict_foreign_t* foreign)/*!< in/out: foreign constraint object */ 3232 { 3233 dict_table_t* sys_foreign_cols; 3234 dict_index_t* sys_index; 3235 btr_pcur_t pcur; 3236 dtuple_t* tuple; 3237 dfield_t* dfield; 3238 const rec_t* rec; 3239 const byte* field; 3240 ulint len; 3241 ulint i; 3242 mtr_t mtr; 3243 size_t id_len; 3244 3245 ut_ad(mutex_own(&dict_sys->mutex)); 3246 3247 id_len = strlen(foreign->id); 3248 3249 foreign->foreign_col_names = static_cast<const char**>( 3250 mem_heap_alloc(foreign->heap, 3251 foreign->n_fields * sizeof(void*))); 3252 3253 foreign->referenced_col_names = static_cast<const char**>( 3254 mem_heap_alloc(foreign->heap, 3255 foreign->n_fields * sizeof(void*))); 3256 3257 mtr_start(&mtr); 3258 3259 sys_foreign_cols = dict_table_get_low("SYS_FOREIGN_COLS"); 3260 3261 sys_index = UT_LIST_GET_FIRST(sys_foreign_cols->indexes); 3262 ut_ad(!dict_table_is_comp(sys_foreign_cols)); 3263 3264 tuple = dtuple_create(foreign->heap, 1); 3265 dfield = dtuple_get_nth_field(tuple, 0); 3266 3267 dfield_set_data(dfield, foreign->id, id_len); 3268 dict_index_copy_types(tuple, sys_index, 1); 3269 3270 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE, 3271 BTR_SEARCH_LEAF, &pcur, &mtr); 3272 for (i = 0; i < foreign->n_fields; i++) { 3273 3274 rec = btr_pcur_get_rec(&pcur); 3275 3276 ut_a(btr_pcur_is_on_user_rec(&pcur)); 3277 ut_a(!rec_get_deleted_flag(rec, 0)); 3278 3279 field = rec_get_nth_field_old( 3280 rec, DICT_FLD__SYS_FOREIGN_COLS__ID, &len); 3281 3282 if (len != id_len || ut_memcmp(foreign->id, field, len) != 0) { 3283 const rec_t* pos; 3284 ulint pos_len; 3285 const rec_t* for_col_name; 3286 ulint for_col_name_len; 3287 const rec_t* ref_col_name; 3288 ulint ref_col_name_len; 3289 3290 pos = rec_get_nth_field_old( 3291 rec, DICT_FLD__SYS_FOREIGN_COLS__POS, 3292 &pos_len); 3293 3294 for_col_name = rec_get_nth_field_old( 3295 rec, DICT_FLD__SYS_FOREIGN_COLS__FOR_COL_NAME, 3296 &for_col_name_len); 3297 3298 ref_col_name = rec_get_nth_field_old( 3299 rec, DICT_FLD__SYS_FOREIGN_COLS__REF_COL_NAME, 3300 &ref_col_name_len); 3301 3302 ib::fatal sout; 3303 3304 sout << "Unable to load column names for foreign" 3305 " key '" << foreign->id 3306 << "' because it was not found in" 3307 " InnoDB internal table SYS_FOREIGN_COLS. The" 3308 " closest entry we found is:" 3309 " (ID='"; 3310 sout.write(field, len); 3311 sout << "', POS=" << mach_read_from_4(pos) 3312 << ", FOR_COL_NAME='"; 3313 sout.write(for_col_name, for_col_name_len); 3314 sout << "', REF_COL_NAME='"; 3315 sout.write(ref_col_name, ref_col_name_len); 3316 sout << "')"; 3317 } 3318 3319 field = rec_get_nth_field_old( 3320 rec, DICT_FLD__SYS_FOREIGN_COLS__POS, &len); 3321 ut_a(len == 4); 3322 ut_a(i == mach_read_from_4(field)); 3323 3324 field = rec_get_nth_field_old( 3325 rec, DICT_FLD__SYS_FOREIGN_COLS__FOR_COL_NAME, &len); 3326 foreign->foreign_col_names[i] = mem_heap_strdupl( 3327 foreign->heap, (char*) field, len); 3328 3329 field = rec_get_nth_field_old( 3330 rec, DICT_FLD__SYS_FOREIGN_COLS__REF_COL_NAME, &len); 3331 foreign->referenced_col_names[i] = mem_heap_strdupl( 3332 foreign->heap, (char*) field, len); 3333 3334 btr_pcur_move_to_next_user_rec(&pcur, &mtr); 3335 } 3336 3337 btr_pcur_close(&pcur); 3338 mtr_commit(&mtr); 3339 } 3340 3341 /***********************************************************************//** 3342 Loads a foreign key constraint to the dictionary cache. If the referenced 3343 table is not yet loaded, it is added in the output parameter (fk_tables). 3344 @return DB_SUCCESS or error code */ 3345 static MY_ATTRIBUTE((nonnull(1), warn_unused_result)) 3346 dberr_t 3347 dict_load_foreign( 3348 /*==============*/ 3349 const char* id, 3350 /*!< in: foreign constraint id, must be 3351 '\0'-terminated */ 3352 const char** col_names, 3353 /*!< in: column names, or NULL 3354 to use foreign->foreign_table->col_names */ 3355 bool check_recursive, 3356 /*!< in: whether to record the foreign table 3357 parent count to avoid unlimited recursive 3358 load of chained foreign tables */ 3359 bool check_charsets, 3360 /*!< in: whether to check charset 3361 compatibility */ 3362 dict_err_ignore_t ignore_err, 3363 /*!< in: error to be ignored */ 3364 dict_names_t& fk_tables) 3365 /*!< out: the foreign key constraint is added 3366 to the dictionary cache only if the referenced 3367 table is already in cache. Otherwise, the 3368 foreign key constraint is not added to cache, 3369 and the referenced table is added to this 3370 stack. */ 3371 { 3372 dict_foreign_t* foreign; 3373 dict_table_t* sys_foreign; 3374 btr_pcur_t pcur; 3375 dict_index_t* sys_index; 3376 dtuple_t* tuple; 3377 mem_heap_t* heap2; 3378 dfield_t* dfield; 3379 const rec_t* rec; 3380 const byte* field; 3381 ulint len; 3382 ulint n_fields_and_type; 3383 mtr_t mtr; 3384 dict_table_t* for_table; 3385 dict_table_t* ref_table; 3386 size_t id_len; 3387 3388 DBUG_ENTER("dict_load_foreign"); 3389 DBUG_PRINT("dict_load_foreign", 3390 ("id: '%s', check_recursive: %d", id, check_recursive)); 3391 3392 ut_ad(mutex_own(&dict_sys->mutex)); 3393 3394 id_len = strlen(id); 3395 3396 heap2 = mem_heap_create(1000); 3397 3398 mtr_start(&mtr); 3399 3400 sys_foreign = dict_table_get_low("SYS_FOREIGN"); 3401 3402 sys_index = UT_LIST_GET_FIRST(sys_foreign->indexes); 3403 ut_ad(!dict_table_is_comp(sys_foreign)); 3404 3405 tuple = dtuple_create(heap2, 1); 3406 dfield = dtuple_get_nth_field(tuple, 0); 3407 3408 dfield_set_data(dfield, id, id_len); 3409 dict_index_copy_types(tuple, sys_index, 1); 3410 3411 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE, 3412 BTR_SEARCH_LEAF, &pcur, &mtr); 3413 rec = btr_pcur_get_rec(&pcur); 3414 3415 if (!btr_pcur_is_on_user_rec(&pcur) 3416 || rec_get_deleted_flag(rec, 0)) { 3417 /* Not found */ 3418 3419 ib::error() << "Cannot load foreign constraint " << id 3420 << ": could not find the relevant record in " 3421 << "SYS_FOREIGN"; 3422 3423 btr_pcur_close(&pcur); 3424 mtr_commit(&mtr); 3425 mem_heap_free(heap2); 3426 3427 DBUG_RETURN(DB_ERROR); 3428 } 3429 3430 field = rec_get_nth_field_old(rec, DICT_FLD__SYS_FOREIGN__ID, &len); 3431 3432 /* Check if the id in record is the searched one */ 3433 if (len != id_len || ut_memcmp(id, field, len) != 0) { 3434 3435 { 3436 ib::error err; 3437 err << "Cannot load foreign constraint " << id 3438 << ": found "; 3439 err.write(field, len); 3440 err << " instead in SYS_FOREIGN"; 3441 } 3442 3443 btr_pcur_close(&pcur); 3444 mtr_commit(&mtr); 3445 mem_heap_free(heap2); 3446 3447 DBUG_RETURN(DB_ERROR); 3448 } 3449 3450 /* Read the table names and the number of columns associated 3451 with the constraint */ 3452 3453 mem_heap_free(heap2); 3454 3455 foreign = dict_mem_foreign_create(); 3456 3457 n_fields_and_type = mach_read_from_4( 3458 rec_get_nth_field_old( 3459 rec, DICT_FLD__SYS_FOREIGN__N_COLS, &len)); 3460 3461 ut_a(len == 4); 3462 3463 /* We store the type in the bits 24..29 of n_fields_and_type. */ 3464 3465 foreign->type = (unsigned int) (n_fields_and_type >> 24); 3466 foreign->n_fields = (unsigned int) (n_fields_and_type & 0x3FFUL); 3467 3468 foreign->id = mem_heap_strdupl(foreign->heap, id, id_len); 3469 3470 field = rec_get_nth_field_old( 3471 rec, DICT_FLD__SYS_FOREIGN__FOR_NAME, &len); 3472 3473 foreign->foreign_table_name = mem_heap_strdupl( 3474 foreign->heap, (char*) field, len); 3475 dict_mem_foreign_table_name_lookup_set(foreign, TRUE); 3476 3477 const ulint foreign_table_name_len = len; 3478 3479 field = rec_get_nth_field_old( 3480 rec, DICT_FLD__SYS_FOREIGN__REF_NAME, &len); 3481 foreign->referenced_table_name = mem_heap_strdupl( 3482 foreign->heap, (char*) field, len); 3483 dict_mem_referenced_table_name_lookup_set(foreign, TRUE); 3484 3485 btr_pcur_close(&pcur); 3486 mtr_commit(&mtr); 3487 3488 dict_load_foreign_cols(foreign); 3489 3490 ref_table = dict_table_check_if_in_cache_low( 3491 foreign->referenced_table_name_lookup); 3492 for_table = dict_table_check_if_in_cache_low( 3493 foreign->foreign_table_name_lookup); 3494 3495 if (!for_table) { 3496 /* To avoid recursively loading the tables related through 3497 the foreign key constraints, the child table name is saved 3498 here. The child table will be loaded later, along with its 3499 foreign key constraint. */ 3500 3501 ut_a(ref_table != NULL); 3502 fk_tables.push_back( 3503 mem_heap_strdupl(ref_table->heap, 3504 foreign->foreign_table_name_lookup, 3505 foreign_table_name_len)); 3506 3507 dict_foreign_remove_from_cache(foreign); 3508 DBUG_RETURN(DB_SUCCESS); 3509 } 3510 3511 ut_a(for_table || ref_table); 3512 3513 /* Note that there may already be a foreign constraint object in 3514 the dictionary cache for this constraint: then the following 3515 call only sets the pointers in it to point to the appropriate table 3516 and index objects and frees the newly created object foreign. 3517 Adding to the cache should always succeed since we are not creating 3518 a new foreign key constraint but loading one from the data 3519 dictionary. */ 3520 3521 DBUG_RETURN(dict_foreign_add_to_cache(foreign, col_names, 3522 check_charsets, 3523 ignore_err)); 3524 } 3525 3526 /***********************************************************************//** 3527 Loads foreign key constraints where the table is either the foreign key 3528 holder or where the table is referenced by a foreign key. Adds these 3529 constraints to the data dictionary. 3530 3531 The foreign key constraint is loaded only if the referenced table is also 3532 in the dictionary cache. If the referenced table is not in dictionary 3533 cache, then it is added to the output parameter (fk_tables). 3534 3535 @return DB_SUCCESS or error code */ 3536 dberr_t 3537 dict_load_foreigns( 3538 const char* table_name, /*!< in: table name */ 3539 const char** col_names, /*!< in: column names, or NULL 3540 to use table->col_names */ 3541 bool check_recursive,/*!< in: Whether to check 3542 recursive load of tables 3543 chained by FK */ 3544 bool check_charsets, /*!< in: whether to check 3545 charset compatibility */ 3546 dict_err_ignore_t ignore_err, /*!< in: error to be ignored */ 3547 dict_names_t& fk_tables) 3548 /*!< out: stack of table 3549 names which must be loaded 3550 subsequently to load all the 3551 foreign key constraints. */ 3552 { 3553 ulint tuple_buf[(DTUPLE_EST_ALLOC(1) + sizeof(ulint) - 1) 3554 / sizeof(ulint)]; 3555 btr_pcur_t pcur; 3556 dtuple_t* tuple; 3557 dfield_t* dfield; 3558 dict_index_t* sec_index; 3559 dict_table_t* sys_foreign; 3560 const rec_t* rec; 3561 const byte* field; 3562 ulint len; 3563 dberr_t err; 3564 mtr_t mtr; 3565 3566 DBUG_ENTER("dict_load_foreigns"); 3567 3568 ut_ad(mutex_own(&dict_sys->mutex)); 3569 3570 sys_foreign = dict_table_get_low("SYS_FOREIGN"); 3571 3572 if (sys_foreign == NULL) { 3573 /* No foreign keys defined yet in this database */ 3574 3575 ib::info() << "No foreign key system tables in the database"; 3576 DBUG_RETURN(DB_ERROR); 3577 } 3578 3579 ut_ad(!dict_table_is_comp(sys_foreign)); 3580 mtr_start(&mtr); 3581 3582 /* Get the secondary index based on FOR_NAME from table 3583 SYS_FOREIGN */ 3584 3585 sec_index = dict_table_get_next_index( 3586 dict_table_get_first_index(sys_foreign)); 3587 ut_ad(!dict_index_is_clust(sec_index)); 3588 start_load: 3589 3590 tuple = dtuple_create_from_mem(tuple_buf, sizeof(tuple_buf), 1, 0); 3591 dfield = dtuple_get_nth_field(tuple, 0); 3592 3593 dfield_set_data(dfield, table_name, ut_strlen(table_name)); 3594 dict_index_copy_types(tuple, sec_index, 1); 3595 3596 btr_pcur_open_on_user_rec(sec_index, tuple, PAGE_CUR_GE, 3597 BTR_SEARCH_LEAF, &pcur, &mtr); 3598 loop: 3599 rec = btr_pcur_get_rec(&pcur); 3600 3601 if (!btr_pcur_is_on_user_rec(&pcur)) { 3602 /* End of index */ 3603 3604 goto load_next_index; 3605 } 3606 3607 /* Now we have the record in the secondary index containing a table 3608 name and a foreign constraint ID */ 3609 3610 field = rec_get_nth_field_old( 3611 rec, DICT_FLD__SYS_FOREIGN_FOR_NAME__NAME, &len); 3612 3613 /* Check if the table name in the record is the one searched for; the 3614 following call does the comparison in the latin1_swedish_ci 3615 charset-collation, in a case-insensitive way. */ 3616 3617 if (0 != cmp_data_data(dfield_get_type(dfield)->mtype, 3618 dfield_get_type(dfield)->prtype, 3619 static_cast<const byte*>( 3620 dfield_get_data(dfield)), 3621 dfield_get_len(dfield), 3622 field, len)) { 3623 3624 goto load_next_index; 3625 } 3626 3627 /* Since table names in SYS_FOREIGN are stored in a case-insensitive 3628 order, we have to check that the table name matches also in a binary 3629 string comparison. On Unix, MySQL allows table names that only differ 3630 in character case. If lower_case_table_names=2 then what is stored 3631 may not be the same case, but the previous comparison showed that they 3632 match with no-case. */ 3633 3634 if (rec_get_deleted_flag(rec, 0)) { 3635 goto next_rec; 3636 } 3637 3638 if ((innobase_get_lower_case_table_names() != 2) 3639 && (0 != ut_memcmp(field, table_name, len))) { 3640 goto next_rec; 3641 } 3642 3643 /* Now we get a foreign key constraint id */ 3644 field = rec_get_nth_field_old( 3645 rec, DICT_FLD__SYS_FOREIGN_FOR_NAME__ID, &len); 3646 3647 /* Copy the string because the page may be modified or evicted 3648 after mtr_commit() below. */ 3649 char fk_id[MAX_TABLE_NAME_LEN + 1]; 3650 3651 ut_a(len <= MAX_TABLE_NAME_LEN); 3652 memcpy(fk_id, field, len); 3653 fk_id[len] = '\0'; 3654 3655 btr_pcur_store_position(&pcur, &mtr); 3656 3657 mtr_commit(&mtr); 3658 3659 /* Load the foreign constraint definition to the dictionary cache */ 3660 3661 err = dict_load_foreign(fk_id, col_names, 3662 check_recursive, check_charsets, ignore_err, 3663 fk_tables); 3664 3665 if (err != DB_SUCCESS) { 3666 btr_pcur_close(&pcur); 3667 3668 DBUG_RETURN(err); 3669 } 3670 3671 mtr_start(&mtr); 3672 3673 btr_pcur_restore_position(BTR_SEARCH_LEAF, &pcur, &mtr); 3674 next_rec: 3675 btr_pcur_move_to_next_user_rec(&pcur, &mtr); 3676 3677 goto loop; 3678 3679 load_next_index: 3680 btr_pcur_close(&pcur); 3681 mtr_commit(&mtr); 3682 3683 sec_index = dict_table_get_next_index(sec_index); 3684 3685 if (sec_index != NULL) { 3686 3687 mtr_start(&mtr); 3688 3689 /* Switch to scan index on REF_NAME, fk_max_recusive_level 3690 already been updated when scanning FOR_NAME index, no need to 3691 update again */ 3692 check_recursive = FALSE; 3693 3694 goto start_load; 3695 } 3696 3697 DBUG_RETURN(DB_SUCCESS); 3698 } 3699 3700 /***********************************************************************//** 3701 Loads a table id based on the index id. 3702 @return true if found */ 3703 static 3704 bool 3705 dict_load_table_id_on_index_id( 3706 /*===========================*/ 3707 index_id_t index_id, /*!< in: index id */ 3708 table_id_t* table_id) /*!< out: table id */ 3709 { 3710 /* check hard coded indexes */ 3711 switch(index_id) { 3712 case DICT_TABLES_ID: 3713 case DICT_COLUMNS_ID: 3714 case DICT_INDEXES_ID: 3715 case DICT_FIELDS_ID: 3716 *table_id = index_id; 3717 return true; 3718 case DICT_TABLE_IDS_ID: 3719 /* The following is a secondary index on SYS_TABLES */ 3720 *table_id = DICT_TABLES_ID; 3721 return true; 3722 } 3723 3724 bool found = false; 3725 mtr_t mtr; 3726 3727 ut_ad(mutex_own(&(dict_sys->mutex))); 3728 3729 /* NOTE that the operation of this function is protected by 3730 the dictionary mutex, and therefore no deadlocks can occur 3731 with other dictionary operations. */ 3732 3733 mtr_start(&mtr); 3734 3735 btr_pcur_t pcur; 3736 const rec_t* rec = dict_startscan_system(&pcur, &mtr, SYS_INDEXES); 3737 3738 while (rec) { 3739 ulint len; 3740 const byte* field = rec_get_nth_field_old( 3741 rec, DICT_FLD__SYS_INDEXES__ID, &len); 3742 ut_ad(len == 8); 3743 3744 /* Check if the index id is the one searched for */ 3745 if (index_id == mach_read_from_8(field)) { 3746 found = true; 3747 /* Now we get the table id */ 3748 const byte* field = rec_get_nth_field_old( 3749 rec, 3750 DICT_FLD__SYS_INDEXES__TABLE_ID, 3751 &len); 3752 *table_id = mach_read_from_8(field); 3753 break; 3754 } 3755 mtr_commit(&mtr); 3756 mtr_start(&mtr); 3757 rec = dict_getnext_system(&pcur, &mtr); 3758 } 3759 3760 btr_pcur_close(&pcur); 3761 mtr_commit(&mtr); 3762 3763 return(found); 3764 } 3765 3766 dict_table_t* dict_table_open_on_index_id(index_id_t index_id) 3767 { 3768 table_id_t table_id; 3769 dict_table_t * table = NULL; 3770 if (dict_load_table_id_on_index_id(index_id, &table_id)) { 3771 table = dict_table_open_on_id(table_id, true, 3772 DICT_TABLE_OP_LOAD_TABLESPACE); 3773 } 3774 3775 return table; 3776 } 3777