1 /*****************************************************************************
2
3 Copyright (c) 1996, 2021, Oracle and/or its affiliates.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24
25 *****************************************************************************/
26
27 /**************************************************//**
28 @file dict/dict0load.cc
29 Loads to the memory cache database object definitions
30 from dictionary tables
31
32 Created 4/24/1996 Heikki Tuuri
33 *******************************************************/
34
35 #include "ha_prototypes.h"
36
37 #include "dict0load.h"
38 #ifdef UNIV_NONINL
39 #include "dict0load.ic"
40 #endif
41
42 #include "mysql_version.h"
43 #include "btr0pcur.h"
44 #include "btr0btr.h"
45 #include "dict0boot.h"
46 #include "dict0crea.h"
47 #include "dict0dict.h"
48 #include "dict0mem.h"
49 #include "dict0priv.h"
50 #include "dict0stats.h"
51 #include "fsp0file.h"
52 #include "fsp0sysspace.h"
53 #include "fts0priv.h"
54 #include "mach0data.h"
55 #include "page0page.h"
56 #include "rem0cmp.h"
57 #include "srv0start.h"
58 #include "srv0srv.h"
59 #include <stack>
60 #include <set>
61
/** Names of the InnoDB system tables. The array is indexed by the system
table id (enum dict_system_table_id), so the order of the entries must
stay in sync with that enum. */
static const char* SYSTEM_TABLE_NAME[] = {
	"SYS_TABLES",
	"SYS_INDEXES",
	"SYS_COLUMNS",
	"SYS_FIELDS",
	"SYS_FOREIGN",
	"SYS_FOREIGN_COLS",
	"SYS_TABLESPACES",
	"SYS_DATAFILES",
	"SYS_VIRTUAL",
	"SYS_ZIP_DICT",
	"SYS_ZIP_DICT_COLS"
};
77
78 /** Loads a table definition and also all its index definitions.
79
80 Loads those foreign key constraints whose referenced table is already in
81 dictionary cache. If a foreign key constraint is not loaded, then the
82 referenced table is pushed into the output stack (fk_tables), if it is not
83 NULL. These tables must be subsequently loaded so that all the foreign
84 key constraints are loaded into memory.
85
86 @param[in] name Table name in the db/tablename format
87 @param[in] cached true=add to cache, false=do not
88 @param[in] ignore_err Error to be ignored when loading table
89 and its index definition
90 @param[out] fk_tables Related table names that must also be
91 loaded to ensure that all foreign key
92 constraints are loaded.
93 @return table, NULL if does not exist; if the table is stored in an
94 .ibd file, but the file does not exist, then we set the
95 file_unreadable flag TRUE in the table object we return */
96 static
97 dict_table_t*
98 dict_load_table_one(
99 table_name_t& name,
100 bool cached,
101 dict_err_ignore_t ignore_err,
102 dict_names_t& fk_tables);
103
104 /** Loads a table definition from a SYS_TABLES record to dict_table_t.
105 Does not load any columns or indexes.
106 @param[in] name Table name
107 @param[in] rec SYS_TABLES record
108 @param[out,own] table Table, or NULL
109 @return error message, or NULL on success */
110 static
111 const char*
112 dict_load_table_low(
113 table_name_t& name,
114 const rec_t* rec,
115 dict_table_t** table);
116
/* If this flag is TRUE, then we will load the clustered index's (and the
table's) metadata even if it is marked as "corrupted". */
119 my_bool srv_load_corrupted = FALSE;
120
121 #ifdef UNIV_DEBUG
122 /****************************************************************//**
123 Compare the name of an index column.
124 @return TRUE if the i'th column of index is 'name'. */
125 static
126 ibool
name_of_col_is(const dict_table_t * table,const dict_index_t * index,ulint i,const char * name)127 name_of_col_is(
128 /*===========*/
129 const dict_table_t* table, /*!< in: table */
130 const dict_index_t* index, /*!< in: index */
131 ulint i, /*!< in: index field offset */
132 const char* name) /*!< in: name to compare to */
133 {
134 ulint tmp = dict_col_get_no(dict_field_get_col(
135 dict_index_get_nth_field(
136 index, i)));
137
138 return(strcmp(name, dict_table_get_col_name(table, tmp)) == 0);
139 }
140 #endif /* UNIV_DEBUG */
141
142 /********************************************************************//**
143 Finds the first table name in the given database.
144 @return own: table name, NULL if does not exist; the caller must free
145 the memory in the string! */
146 char*
dict_get_first_table_name_in_db(const char * name)147 dict_get_first_table_name_in_db(
148 /*============================*/
149 const char* name) /*!< in: database name which ends in '/' */
150 {
151 dict_table_t* sys_tables;
152 btr_pcur_t pcur;
153 dict_index_t* sys_index;
154 dtuple_t* tuple;
155 mem_heap_t* heap;
156 dfield_t* dfield;
157 const rec_t* rec;
158 const byte* field;
159 ulint len;
160 mtr_t mtr;
161
162 ut_ad(mutex_own(&dict_sys->mutex));
163
164 heap = mem_heap_create(1000);
165
166 mtr_start(&mtr);
167
168 sys_tables = dict_table_get_low("SYS_TABLES");
169 sys_index = UT_LIST_GET_FIRST(sys_tables->indexes);
170 ut_ad(!dict_table_is_comp(sys_tables));
171
172 tuple = dtuple_create(heap, 1);
173 dfield = dtuple_get_nth_field(tuple, 0);
174
175 dfield_set_data(dfield, name, ut_strlen(name));
176 dict_index_copy_types(tuple, sys_index, 1);
177
178 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
179 BTR_SEARCH_LEAF, &pcur, &mtr);
180 loop:
181 rec = btr_pcur_get_rec(&pcur);
182
183 if (!btr_pcur_is_on_user_rec(&pcur)) {
184 /* Not found */
185
186 btr_pcur_close(&pcur);
187 mtr_commit(&mtr);
188 mem_heap_free(heap);
189
190 return(NULL);
191 }
192
193 field = rec_get_nth_field_old(
194 rec, DICT_FLD__SYS_TABLES__NAME, &len);
195
196 if (len < strlen(name)
197 || ut_memcmp(name, field, strlen(name)) != 0) {
198 /* Not found */
199
200 btr_pcur_close(&pcur);
201 mtr_commit(&mtr);
202 mem_heap_free(heap);
203
204 return(NULL);
205 }
206
207 if (!rec_get_deleted_flag(rec, 0)) {
208
209 /* We found one */
210
211 char* table_name = mem_strdupl((char*) field, len);
212
213 btr_pcur_close(&pcur);
214 mtr_commit(&mtr);
215 mem_heap_free(heap);
216
217 return(table_name);
218 }
219
220 btr_pcur_move_to_next_user_rec(&pcur, &mtr);
221
222 goto loop;
223 }
224
225 /********************************************************************//**
226 This function gets the next system table record as it scans the table.
227 @return the next record if found, NULL if end of scan */
228 static
229 const rec_t*
dict_getnext_system_low(btr_pcur_t * pcur,mtr_t * mtr)230 dict_getnext_system_low(
231 /*====================*/
232 btr_pcur_t* pcur, /*!< in/out: persistent cursor to the
233 record*/
234 mtr_t* mtr) /*!< in: the mini-transaction */
235 {
236 rec_t* rec = NULL;
237
238 while (!rec || rec_get_deleted_flag(rec, 0)) {
239 btr_pcur_move_to_next_user_rec(pcur, mtr);
240
241 rec = btr_pcur_get_rec(pcur);
242
243 if (!btr_pcur_is_on_user_rec(pcur)) {
244 /* end of index */
245 btr_pcur_close(pcur);
246
247 return(NULL);
248 }
249 }
250
251 /* Get a record, let's save the position */
252 btr_pcur_store_position(pcur, mtr);
253
254 return(rec);
255 }
256
257 /********************************************************************//**
258 This function opens a system table, and returns the first record.
259 @return first record of the system table */
260 const rec_t*
dict_startscan_system(btr_pcur_t * pcur,mtr_t * mtr,dict_system_id_t system_id)261 dict_startscan_system(
262 /*==================*/
263 btr_pcur_t* pcur, /*!< out: persistent cursor to
264 the record */
265 mtr_t* mtr, /*!< in: the mini-transaction */
266 dict_system_id_t system_id) /*!< in: which system table to open */
267 {
268 dict_table_t* system_table;
269 dict_index_t* clust_index;
270 const rec_t* rec;
271
272 ut_a(system_id < SYS_NUM_SYSTEM_TABLES);
273
274 system_table = dict_table_get_low(SYSTEM_TABLE_NAME[system_id]);
275
276 clust_index = UT_LIST_GET_FIRST(system_table->indexes);
277
278 btr_pcur_open_at_index_side(true, clust_index, BTR_SEARCH_LEAF, pcur,
279 true, 0, mtr);
280
281 rec = dict_getnext_system_low(pcur, mtr);
282
283 return(rec);
284 }
285
286 /********************************************************************//**
287 This function gets the next system table record as it scans the table.
288 @return the next record if found, NULL if end of scan */
289 const rec_t*
dict_getnext_system(btr_pcur_t * pcur,mtr_t * mtr)290 dict_getnext_system(
291 /*================*/
292 btr_pcur_t* pcur, /*!< in/out: persistent cursor
293 to the record */
294 mtr_t* mtr) /*!< in: the mini-transaction */
295 {
296 const rec_t* rec;
297
298 /* Restore the position */
299 btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
300
301 /* Get the next record */
302 rec = dict_getnext_system_low(pcur, mtr);
303
304 return(rec);
305 }
306
307 /********************************************************************//**
308 This function processes one SYS_TABLES record and populate the dict_table_t
309 struct for the table. Extracted out of dict_print() to be used by
310 both monitor table output and information schema innodb_sys_tables output.
311 @return error message, or NULL on success */
312 const char*
dict_process_sys_tables_rec_and_mtr_commit(mem_heap_t * heap,const rec_t * rec,dict_table_t ** table,dict_table_info_t status,mtr_t * mtr)313 dict_process_sys_tables_rec_and_mtr_commit(
314 /*=======================================*/
315 mem_heap_t* heap, /*!< in/out: temporary memory heap */
316 const rec_t* rec, /*!< in: SYS_TABLES record */
317 dict_table_t** table, /*!< out: dict_table_t to fill */
318 dict_table_info_t status, /*!< in: status bit controls
319 options such as whether we shall
320 look for dict_table_t from cache
321 first */
322 mtr_t* mtr) /*!< in/out: mini-transaction,
323 will be committed */
324 {
325 ulint len;
326 const char* field;
327 const char* err_msg = NULL;
328 table_name_t table_name;
329
330 field = (const char*) rec_get_nth_field_old(
331 rec, DICT_FLD__SYS_TABLES__NAME, &len);
332
333 ut_a(!rec_get_deleted_flag(rec, 0));
334
335 ut_ad(mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_S_FIX));
336
337 /* Get the table name */
338 table_name.m_name = mem_heap_strdupl(heap, field, len);
339
340 /* If DICT_TABLE_LOAD_FROM_CACHE is set, first check
341 whether there is cached dict_table_t struct */
342 if (status & DICT_TABLE_LOAD_FROM_CACHE) {
343
344 /* Commit before load the table again */
345 mtr_commit(mtr);
346
347 *table = dict_table_get_low(table_name.m_name);
348
349 if (!(*table)) {
350 err_msg = "Table not found in cache";
351 }
352 } else {
353 err_msg = dict_load_table_low(table_name, rec, table);
354 mtr_commit(mtr);
355 }
356
357 if (err_msg) {
358 return(err_msg);
359 }
360
361 return(NULL);
362 }
363
364 /********************************************************************//**
365 This function parses a SYS_INDEXES record and populate a dict_index_t
366 structure with the information from the record. For detail information
367 about SYS_INDEXES fields, please refer to dict_boot() function.
368 @return error message, or NULL on success */
369 const char*
dict_process_sys_indexes_rec(mem_heap_t * heap,const rec_t * rec,dict_index_t * index,table_id_t * table_id)370 dict_process_sys_indexes_rec(
371 /*=========================*/
372 mem_heap_t* heap, /*!< in/out: heap memory */
373 const rec_t* rec, /*!< in: current SYS_INDEXES rec */
374 dict_index_t* index, /*!< out: index to be filled */
375 table_id_t* table_id) /*!< out: index table id */
376 {
377 const char* err_msg;
378 byte* buf;
379
380 buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
381
382 /* Parse the record, and get "dict_index_t" struct filled */
383 err_msg = dict_load_index_low(buf, NULL,
384 heap, rec, FALSE, &index);
385
386 *table_id = mach_read_from_8(buf);
387
388 return(err_msg);
389 }
390
391 /********************************************************************//**
392 This function parses a SYS_COLUMNS record and populate a dict_column_t
393 structure with the information from the record.
394 @return error message, or NULL on success */
395 const char*
dict_process_sys_columns_rec(mem_heap_t * heap,const rec_t * rec,dict_col_t * column,table_id_t * table_id,const char ** col_name,ulint * nth_v_col)396 dict_process_sys_columns_rec(
397 /*=========================*/
398 mem_heap_t* heap, /*!< in/out: heap memory */
399 const rec_t* rec, /*!< in: current SYS_COLUMNS rec */
400 dict_col_t* column, /*!< out: dict_col_t to be filled */
401 table_id_t* table_id, /*!< out: table id */
402 const char** col_name, /*!< out: column name */
403 ulint* nth_v_col) /*!< out: if virtual col, this is
404 record's sequence number */
405 {
406 const char* err_msg;
407
408 /* Parse the record, and get "dict_col_t" struct filled */
409 err_msg = dict_load_column_low(NULL, heap, column,
410 table_id, col_name, rec, nth_v_col);
411
412 return(err_msg);
413 }
414
415 /** This function parses a SYS_VIRTUAL record and extracts virtual column
416 information
417 @param[in,out] heap heap memory
418 @param[in] rec current SYS_COLUMNS rec
419 @param[in,out] table_id table id
420 @param[in,out] pos virtual column position
421 @param[in,out] base_pos base column position
422 @return error message, or NULL on success */
423 const char*
dict_process_sys_virtual_rec(mem_heap_t * heap,const rec_t * rec,table_id_t * table_id,ulint * pos,ulint * base_pos)424 dict_process_sys_virtual_rec(
425 mem_heap_t* heap,
426 const rec_t* rec,
427 table_id_t* table_id,
428 ulint* pos,
429 ulint* base_pos)
430 {
431 const char* err_msg;
432
433 /* Parse the record, and get "dict_col_t" struct filled */
434 err_msg = dict_load_virtual_low(NULL, heap, NULL, table_id,
435 pos, base_pos, rec);
436
437 return(err_msg);
438 }
439 /********************************************************************//**
440 This function parses a SYS_FIELDS record and populates a dict_field_t
441 structure with the information from the record.
442 @return error message, or NULL on success */
443 const char*
dict_process_sys_fields_rec(mem_heap_t * heap,const rec_t * rec,dict_field_t * sys_field,ulint * pos,index_id_t * index_id,index_id_t last_id)444 dict_process_sys_fields_rec(
445 /*========================*/
446 mem_heap_t* heap, /*!< in/out: heap memory */
447 const rec_t* rec, /*!< in: current SYS_FIELDS rec */
448 dict_field_t* sys_field, /*!< out: dict_field_t to be
449 filled */
450 ulint* pos, /*!< out: Field position */
451 index_id_t* index_id, /*!< out: current index id */
452 index_id_t last_id) /*!< in: previous index id */
453 {
454 byte* buf;
455 byte* last_index_id;
456 const char* err_msg;
457
458 buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
459
460 last_index_id = static_cast<byte*>(mem_heap_alloc(heap, 8));
461 mach_write_to_8(last_index_id, last_id);
462
463 err_msg = dict_load_field_low(buf, NULL, sys_field,
464 pos, last_index_id, heap, rec);
465
466 *index_id = mach_read_from_8(buf);
467
468 return(err_msg);
469
470 }
471
472 /********************************************************************//**
473 This function parses a SYS_FOREIGN record and populate a dict_foreign_t
474 structure with the information from the record. For detail information
475 about SYS_FOREIGN fields, please refer to dict_load_foreign() function.
476 @return error message, or NULL on success */
477 const char*
dict_process_sys_foreign_rec(mem_heap_t * heap,const rec_t * rec,dict_foreign_t * foreign)478 dict_process_sys_foreign_rec(
479 /*=========================*/
480 mem_heap_t* heap, /*!< in/out: heap memory */
481 const rec_t* rec, /*!< in: current SYS_FOREIGN rec */
482 dict_foreign_t* foreign) /*!< out: dict_foreign_t struct
483 to be filled */
484 {
485 ulint len;
486 const byte* field;
487 ulint n_fields_and_type;
488
489 if (rec_get_deleted_flag(rec, 0)) {
490 return("delete-marked record in SYS_FOREIGN");
491 }
492
493 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_FOREIGN) {
494 return("wrong number of columns in SYS_FOREIGN record");
495 }
496
497 field = rec_get_nth_field_old(
498 rec, DICT_FLD__SYS_FOREIGN__ID, &len);
499 if (len == 0 || len == UNIV_SQL_NULL) {
500 err_len:
501 return("incorrect column length in SYS_FOREIGN");
502 }
503
504 /* This recieves a dict_foreign_t* that points to a stack variable.
505 So mem_heap_free(foreign->heap) is not used as elsewhere.
506 Since the heap used here is freed elsewhere, foreign->heap
507 is not assigned. */
508 foreign->id = mem_heap_strdupl(heap, (const char*) field, len);
509
510 rec_get_nth_field_offs_old(
511 rec, DICT_FLD__SYS_FOREIGN__DB_TRX_ID, &len);
512 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
513 goto err_len;
514 }
515 rec_get_nth_field_offs_old(
516 rec, DICT_FLD__SYS_FOREIGN__DB_ROLL_PTR, &len);
517 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
518 goto err_len;
519 }
520
521 /* The _lookup versions of the referenced and foreign table names
522 are not assigned since they are not used in this dict_foreign_t */
523
524 field = rec_get_nth_field_old(
525 rec, DICT_FLD__SYS_FOREIGN__FOR_NAME, &len);
526 if (len == 0 || len == UNIV_SQL_NULL) {
527 goto err_len;
528 }
529 foreign->foreign_table_name = mem_heap_strdupl(
530 heap, (const char*) field, len);
531
532 field = rec_get_nth_field_old(
533 rec, DICT_FLD__SYS_FOREIGN__REF_NAME, &len);
534 if (len == 0 || len == UNIV_SQL_NULL) {
535 goto err_len;
536 }
537 foreign->referenced_table_name = mem_heap_strdupl(
538 heap, (const char*) field, len);
539
540 field = rec_get_nth_field_old(
541 rec, DICT_FLD__SYS_FOREIGN__N_COLS, &len);
542 if (len != 4) {
543 goto err_len;
544 }
545 n_fields_and_type = mach_read_from_4(field);
546
547 foreign->type = (unsigned int) (n_fields_and_type >> 24);
548 foreign->n_fields = (unsigned int) (n_fields_and_type & 0x3FFUL);
549
550 return(NULL);
551 }
552
553 /********************************************************************//**
554 This function parses a SYS_FOREIGN_COLS record and extract necessary
555 information from the record and return to caller.
556 @return error message, or NULL on success */
557 const char*
dict_process_sys_foreign_col_rec(mem_heap_t * heap,const rec_t * rec,const char ** name,const char ** for_col_name,const char ** ref_col_name,ulint * pos)558 dict_process_sys_foreign_col_rec(
559 /*=============================*/
560 mem_heap_t* heap, /*!< in/out: heap memory */
561 const rec_t* rec, /*!< in: current SYS_FOREIGN_COLS rec */
562 const char** name, /*!< out: foreign key constraint name */
563 const char** for_col_name, /*!< out: referencing column name */
564 const char** ref_col_name, /*!< out: referenced column name
565 in referenced table */
566 ulint* pos) /*!< out: column position */
567 {
568 ulint len;
569 const byte* field;
570
571 if (rec_get_deleted_flag(rec, 0)) {
572 return("delete-marked record in SYS_FOREIGN_COLS");
573 }
574
575 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_FOREIGN_COLS) {
576 return("wrong number of columns in SYS_FOREIGN_COLS record");
577 }
578
579 field = rec_get_nth_field_old(
580 rec, DICT_FLD__SYS_FOREIGN_COLS__ID, &len);
581 if (len == 0 || len == UNIV_SQL_NULL) {
582 err_len:
583 return("incorrect column length in SYS_FOREIGN_COLS");
584 }
585 *name = mem_heap_strdupl(heap, (char*) field, len);
586
587 field = rec_get_nth_field_old(
588 rec, DICT_FLD__SYS_FOREIGN_COLS__POS, &len);
589 if (len != 4) {
590 goto err_len;
591 }
592 *pos = mach_read_from_4(field);
593
594 rec_get_nth_field_offs_old(
595 rec, DICT_FLD__SYS_FOREIGN_COLS__DB_TRX_ID, &len);
596 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
597 goto err_len;
598 }
599 rec_get_nth_field_offs_old(
600 rec, DICT_FLD__SYS_FOREIGN_COLS__DB_ROLL_PTR, &len);
601 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
602 goto err_len;
603 }
604
605 field = rec_get_nth_field_old(
606 rec, DICT_FLD__SYS_FOREIGN_COLS__FOR_COL_NAME, &len);
607 if (len == 0 || len == UNIV_SQL_NULL) {
608 goto err_len;
609 }
610 *for_col_name = mem_heap_strdupl(heap, (char*) field, len);
611
612 field = rec_get_nth_field_old(
613 rec, DICT_FLD__SYS_FOREIGN_COLS__REF_COL_NAME, &len);
614 if (len == 0 || len == UNIV_SQL_NULL) {
615 goto err_len;
616 }
617 *ref_col_name = mem_heap_strdupl(heap, (char*) field, len);
618
619 return(NULL);
620 }
621
622 /********************************************************************//**
623 This function parses a SYS_TABLESPACES record, extracts necessary
624 information from the record and returns to caller.
625 @return error message, or NULL on success */
626 const char*
dict_process_sys_tablespaces(mem_heap_t * heap,const rec_t * rec,ulint * space,const char ** name,ulint * flags)627 dict_process_sys_tablespaces(
628 /*=========================*/
629 mem_heap_t* heap, /*!< in/out: heap memory */
630 const rec_t* rec, /*!< in: current SYS_TABLESPACES rec */
631 ulint* space, /*!< out: space id */
632 const char** name, /*!< out: tablespace name */
633 ulint* flags) /*!< out: tablespace flags */
634 {
635 ulint len;
636 const byte* field;
637
638 /* Initialize the output values */
639 *space = ULINT_UNDEFINED;
640 *name = NULL;
641 *flags = ULINT_UNDEFINED;
642
643 if (rec_get_deleted_flag(rec, 0)) {
644 return("delete-marked record in SYS_TABLESPACES");
645 }
646
647 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_TABLESPACES) {
648 return("wrong number of columns in SYS_TABLESPACES record");
649 }
650
651 field = rec_get_nth_field_old(
652 rec, DICT_FLD__SYS_TABLESPACES__SPACE, &len);
653 if (len != DICT_FLD_LEN_SPACE) {
654 err_len:
655 return("incorrect column length in SYS_TABLESPACES");
656 }
657 *space = mach_read_from_4(field);
658
659 rec_get_nth_field_offs_old(
660 rec, DICT_FLD__SYS_TABLESPACES__DB_TRX_ID, &len);
661 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
662 goto err_len;
663 }
664
665 rec_get_nth_field_offs_old(
666 rec, DICT_FLD__SYS_TABLESPACES__DB_ROLL_PTR, &len);
667 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
668 goto err_len;
669 }
670
671 field = rec_get_nth_field_old(
672 rec, DICT_FLD__SYS_TABLESPACES__NAME, &len);
673 if (len == 0 || len == UNIV_SQL_NULL) {
674 goto err_len;
675 }
676 *name = mem_heap_strdupl(heap, (char*) field, len);
677
678 field = rec_get_nth_field_old(
679 rec, DICT_FLD__SYS_TABLESPACES__FLAGS, &len);
680 if (len != DICT_FLD_LEN_FLAGS) {
681 goto err_len;
682 }
683 *flags = mach_read_from_4(field);
684
685 return(NULL);
686 }
687
688 /********************************************************************//**
689 This function parses a SYS_DATAFILES record, extracts necessary
690 information from the record and returns it to the caller.
691 @return error message, or NULL on success */
692 const char*
dict_process_sys_datafiles(mem_heap_t * heap,const rec_t * rec,ulint * space,const char ** path)693 dict_process_sys_datafiles(
694 /*=======================*/
695 mem_heap_t* heap, /*!< in/out: heap memory */
696 const rec_t* rec, /*!< in: current SYS_DATAFILES rec */
697 ulint* space, /*!< out: space id */
698 const char** path) /*!< out: datafile paths */
699 {
700 ulint len;
701 const byte* field;
702
703 if (rec_get_deleted_flag(rec, 0)) {
704 return("delete-marked record in SYS_DATAFILES");
705 }
706
707 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_DATAFILES) {
708 return("wrong number of columns in SYS_DATAFILES record");
709 }
710
711 field = rec_get_nth_field_old(
712 rec, DICT_FLD__SYS_DATAFILES__SPACE, &len);
713 if (len != DICT_FLD_LEN_SPACE) {
714 err_len:
715 return("incorrect column length in SYS_DATAFILES");
716 }
717 *space = mach_read_from_4(field);
718
719 rec_get_nth_field_offs_old(
720 rec, DICT_FLD__SYS_DATAFILES__DB_TRX_ID, &len);
721 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
722 goto err_len;
723 }
724
725 rec_get_nth_field_offs_old(
726 rec, DICT_FLD__SYS_DATAFILES__DB_ROLL_PTR, &len);
727 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
728 goto err_len;
729 }
730
731 field = rec_get_nth_field_old(
732 rec, DICT_FLD__SYS_DATAFILES__PATH, &len);
733 if (len == 0 || len == UNIV_SQL_NULL) {
734 goto err_len;
735 }
736 *path = mem_heap_strdupl(heap, (char*) field, len);
737
738 return(NULL);
739 }
740
741 /** This function parses a SYS_ZIP_DICT record, extracts necessary
742 information from the record and returns to caller.
743 @return error message, or NULL on success */
744 const char*
dict_process_sys_zip_dict(mem_heap_t * heap,const page_size_t & page_size,const rec_t * rec,ulint * id,const char ** name,const char ** data,ulint * data_len)745 dict_process_sys_zip_dict(
746 mem_heap_t* heap, /*!< in/out: heap memory */
747 const page_size_t&
748 page_size, /*!< in: BLOB page size */
749 const rec_t* rec, /*!< in: current SYS_ZIP_DICT rec */
750 ulint* id, /*!< out: dict id */
751 const char** name, /*!< out: dict name */
752 const char** data, /*!< out: dict data */
753 ulint* data_len) /*!< out: dict data length */
754 {
755 ulint len;
756 const byte* field;
757
758 /* Initialize the output values */
759 *id = ULINT_UNDEFINED;
760 *name = NULL;
761 *data = NULL;
762 *data_len = 0;
763
764 if (UNIV_UNLIKELY(rec_get_deleted_flag(rec, 0))) {
765 return("delete-marked record in SYS_ZIP_DICT");
766 }
767
768 if (UNIV_UNLIKELY(
769 rec_get_n_fields_old(rec)!= DICT_NUM_FIELDS__SYS_ZIP_DICT)) {
770 return("wrong number of columns in SYS_ZIP_DICT record");
771 }
772
773 field = rec_get_nth_field_old(
774 rec, DICT_FLD__SYS_ZIP_DICT__ID, &len);
775 if (UNIV_UNLIKELY(len != DICT_FLD_LEN_SPACE)) {
776 goto err_len;
777 }
778 *id = mach_read_from_4(field);
779
780 rec_get_nth_field_offs_old(
781 rec, DICT_FLD__SYS_ZIP_DICT__DB_TRX_ID, &len);
782 if (UNIV_UNLIKELY(len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL)) {
783 goto err_len;
784 }
785
786 rec_get_nth_field_offs_old(
787 rec, DICT_FLD__SYS_ZIP_DICT__DB_ROLL_PTR, &len);
788 if (UNIV_UNLIKELY(len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL)) {
789 goto err_len;
790 }
791
792 field = rec_get_nth_field_old(
793 rec, DICT_FLD__SYS_ZIP_DICT__NAME, &len);
794 if (UNIV_UNLIKELY(len == 0 || len == UNIV_SQL_NULL)) {
795 goto err_len;
796 }
797 *name = mem_heap_strdupl(heap, (char*) field, len);
798
799 field = rec_get_nth_field_old(
800 rec, DICT_FLD__SYS_ZIP_DICT__DATA, &len);
801 if (UNIV_UNLIKELY(len == UNIV_SQL_NULL)) {
802 goto err_len;
803 }
804
805 if (rec_get_1byte_offs_flag(rec) == 0 &&
806 rec_2_is_field_extern(rec, DICT_FLD__SYS_ZIP_DICT__DATA)) {
807 ut_a(len >= BTR_EXTERN_FIELD_REF_SIZE);
808
809 if (UNIV_UNLIKELY
810 (!memcmp(field + len - BTR_EXTERN_FIELD_REF_SIZE,
811 field_ref_zero,
812 BTR_EXTERN_FIELD_REF_SIZE))) {
813 goto err_len;
814 }
815 *data = reinterpret_cast<char*>(
816 btr_copy_externally_stored_field(data_len, field,
817 page_size, len, heap));
818 }
819 else {
820 *data_len = len;
821 *data = static_cast<char*>(mem_heap_dup(heap, field, len));
822 }
823
824 return(NULL);
825
826 err_len:
827 return("incorrect column length in SYS_ZIP_DICT");
828 }
829
830 /** This function parses a SYS_ZIP_DICT_COLS record, extracts necessary
831 information from the record and returns to caller.
832 @return error message, or NULL on success */
833 const char*
dict_process_sys_zip_dict_cols(mem_heap_t * heap,const rec_t * rec,table_id_t * table_id,ulint * column_pos,ulint * dict_id)834 dict_process_sys_zip_dict_cols(
835 mem_heap_t* heap, /*!< in/out: heap memory */
836 const rec_t* rec, /*!< in: current SYS_ZIP_DICT rec */
837 table_id_t* table_id, /*!< out: table id */
838 ulint* column_pos, /*!< out: column position */
839 ulint* dict_id) /*!< out: dict id */
840 {
841 ulint len;
842 const byte* field;
843
844 /* Initialize the output values */
845 *table_id = ULINT_UNDEFINED;
846 *column_pos = ULINT_UNDEFINED;
847 *dict_id = ULINT_UNDEFINED;
848
849 if (UNIV_UNLIKELY(rec_get_deleted_flag(rec, 0))) {
850 return("delete-marked record in SYS_ZIP_DICT_COLS");
851 }
852
853 if (UNIV_UNLIKELY(rec_get_n_fields_old(rec) !=
854 DICT_NUM_FIELDS__SYS_ZIP_DICT_COLS)) {
855 return("wrong number of columns in SYS_ZIP_DICT_COLS"
856 " record");
857 }
858
859 field = rec_get_nth_field_old(
860 rec, DICT_FLD__SYS_ZIP_DICT_COLS__TABLE_ID, &len);
861 if (UNIV_UNLIKELY(len != DICT_FLD_LEN_SPACE)) {
862 err_len:
863 return("incorrect column length in SYS_ZIP_DICT_COLS");
864 }
865 *table_id = mach_read_from_4(field);
866
867 field = rec_get_nth_field_old(
868 rec, DICT_FLD__SYS_ZIP_DICT_COLS__COLUMN_POS, &len);
869 if (UNIV_UNLIKELY(len != DICT_FLD_LEN_SPACE)) {
870 goto err_len;
871 }
872 *column_pos = mach_read_from_4(field);
873
874 rec_get_nth_field_offs_old(
875 rec, DICT_FLD__SYS_ZIP_DICT_COLS__DB_TRX_ID, &len);
876 if (UNIV_UNLIKELY(len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL)) {
877 goto err_len;
878 }
879
880 rec_get_nth_field_offs_old(
881 rec, DICT_FLD__SYS_ZIP_DICT_COLS__DB_ROLL_PTR, &len);
882 if (UNIV_UNLIKELY(len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL)) {
883 goto err_len;
884 }
885
886 field = rec_get_nth_field_old(
887 rec, DICT_FLD__SYS_ZIP_DICT_COLS__DICT_ID, &len);
888 if (UNIV_UNLIKELY(len != DICT_FLD_LEN_SPACE)) {
889 goto err_len;
890 }
891 *dict_id = mach_read_from_4(field);
892
893 return(NULL);
894 }
895
896 /** Get the first filepath from SYS_DATAFILES for a given space_id.
897 @param[in] space_id Tablespace ID
898 @return First filepath (caller must invoke ut_free() on it)
899 @retval NULL if no SYS_DATAFILES entry was found. */
900 char*
dict_get_first_path(ulint space_id)901 dict_get_first_path(
902 ulint space_id)
903 {
904 mtr_t mtr;
905 dict_table_t* sys_datafiles;
906 dict_index_t* sys_index;
907 dtuple_t* tuple;
908 dfield_t* dfield;
909 byte* buf;
910 btr_pcur_t pcur;
911 const rec_t* rec;
912 const byte* field;
913 ulint len;
914 char* filepath = NULL;
915 mem_heap_t* heap = mem_heap_create(1024);
916
917 ut_ad(mutex_own(&dict_sys->mutex));
918
919 mtr_start(&mtr);
920
921 sys_datafiles = dict_table_get_low("SYS_DATAFILES");
922 sys_index = UT_LIST_GET_FIRST(sys_datafiles->indexes);
923
924 ut_ad(!dict_table_is_comp(sys_datafiles));
925 ut_ad(name_of_col_is(sys_datafiles, sys_index,
926 DICT_FLD__SYS_DATAFILES__SPACE, "SPACE"));
927 ut_ad(name_of_col_is(sys_datafiles, sys_index,
928 DICT_FLD__SYS_DATAFILES__PATH, "PATH"));
929
930 tuple = dtuple_create(heap, 1);
931 dfield = dtuple_get_nth_field(tuple, DICT_FLD__SYS_DATAFILES__SPACE);
932
933 buf = static_cast<byte*>(mem_heap_alloc(heap, 4));
934 mach_write_to_4(buf, space_id);
935
936 dfield_set_data(dfield, buf, 4);
937 dict_index_copy_types(tuple, sys_index, 1);
938
939 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
940 BTR_SEARCH_LEAF, &pcur, &mtr);
941
942 rec = btr_pcur_get_rec(&pcur);
943
944 /* Get the filepath from this SYS_DATAFILES record. */
945 if (btr_pcur_is_on_user_rec(&pcur)) {
946 field = rec_get_nth_field_old(
947 rec, DICT_FLD__SYS_DATAFILES__SPACE, &len);
948 ut_a(len == 4);
949
950 if (space_id == mach_read_from_4(field)) {
951 /* A record for this space ID was found. */
952 field = rec_get_nth_field_old(
953 rec, DICT_FLD__SYS_DATAFILES__PATH, &len);
954
955 ut_ad(len > 0);
956 ut_ad(len < OS_FILE_MAX_PATH);
957
958 if (len > 0 && len != UNIV_SQL_NULL) {
959 filepath = mem_strdupl(
960 reinterpret_cast<const char*>(field),
961 len);
962 ut_ad(filepath != NULL);
963
964 /* The dictionary may have been written on
965 another OS. */
966 os_normalize_path(filepath);
967 }
968 }
969 }
970
971 btr_pcur_close(&pcur);
972 mtr_commit(&mtr);
973 mem_heap_free(heap);
974
975 return(filepath);
976 }
977
978 /** Gets the space name from SYS_TABLESPACES for a given space ID.
979 @param[in] space_id Tablespace ID
980 @param[in] callers_heap A heap to allocate from, may be NULL
981 @return Tablespace name (caller is responsible to free it)
982 @retval NULL if no dictionary entry was found. */
983 static
984 char*
dict_space_get_name(ulint space_id,mem_heap_t * callers_heap)985 dict_space_get_name(
986 ulint space_id,
987 mem_heap_t* callers_heap)
988 {
989 mtr_t mtr;
990 dict_table_t* sys_tablespaces;
991 dict_index_t* sys_index;
992 dtuple_t* tuple;
993 dfield_t* dfield;
994 byte* buf;
995 btr_pcur_t pcur;
996 const rec_t* rec;
997 const byte* field;
998 ulint len;
999 char* space_name = NULL;
1000 mem_heap_t* heap = mem_heap_create(1024);
1001
1002 ut_ad(mutex_own(&dict_sys->mutex));
1003
1004 sys_tablespaces = dict_table_get_low("SYS_TABLESPACES");
1005 if (sys_tablespaces == NULL) {
1006 ut_a(!srv_sys_tablespaces_open);
1007 return(NULL);
1008 }
1009
1010 sys_index = UT_LIST_GET_FIRST(sys_tablespaces->indexes);
1011
1012 ut_ad(!dict_table_is_comp(sys_tablespaces));
1013 ut_ad(name_of_col_is(sys_tablespaces, sys_index,
1014 DICT_FLD__SYS_TABLESPACES__SPACE, "SPACE"));
1015 ut_ad(name_of_col_is(sys_tablespaces, sys_index,
1016 DICT_FLD__SYS_TABLESPACES__NAME, "NAME"));
1017
1018 tuple = dtuple_create(heap, 1);
1019 dfield = dtuple_get_nth_field(tuple, DICT_FLD__SYS_TABLESPACES__SPACE);
1020
1021 buf = static_cast<byte*>(mem_heap_alloc(heap, 4));
1022 mach_write_to_4(buf, space_id);
1023
1024 dfield_set_data(dfield, buf, 4);
1025 dict_index_copy_types(tuple, sys_index, 1);
1026
1027 mtr_start(&mtr);
1028
1029 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
1030 BTR_SEARCH_LEAF, &pcur, &mtr);
1031
1032 rec = btr_pcur_get_rec(&pcur);
1033
1034 /* Get the tablespace name from this SYS_TABLESPACES record. */
1035 if (btr_pcur_is_on_user_rec(&pcur)) {
1036 field = rec_get_nth_field_old(
1037 rec, DICT_FLD__SYS_TABLESPACES__SPACE, &len);
1038 ut_a(len == 4);
1039
1040 if (space_id == mach_read_from_4(field)) {
1041 /* A record for this space ID was found. */
1042 field = rec_get_nth_field_old(
1043 rec, DICT_FLD__SYS_TABLESPACES__NAME, &len);
1044
1045 ut_ad(len > 0);
1046 ut_ad(len < OS_FILE_MAX_PATH);
1047
1048 if (len > 0 && len != UNIV_SQL_NULL) {
1049 /* Found a tablespace name. */
1050 if (callers_heap == NULL) {
1051 space_name = mem_strdupl(
1052 reinterpret_cast<
1053 const char*>(field),
1054 len);
1055 } else {
1056 space_name = mem_heap_strdupl(
1057 callers_heap,
1058 reinterpret_cast<
1059 const char*>(field),
1060 len);
1061 }
1062 ut_ad(space_name);
1063 }
1064 }
1065 }
1066
1067 btr_pcur_close(&pcur);
1068 mtr_commit(&mtr);
1069 mem_heap_free(heap);
1070
1071 return(space_name);
1072 }
1073
1074 /** Update the record for space_id in SYS_TABLESPACES to this filepath.
1075 @param[in] space_id Tablespace ID
1076 @param[in] filepath Tablespace filepath
1077 @return DB_SUCCESS if OK, dberr_t if the insert failed */
1078 dberr_t
dict_update_filepath(ulint space_id,const char * filepath)1079 dict_update_filepath(
1080 ulint space_id,
1081 const char* filepath)
1082 {
1083 if (!srv_sys_tablespaces_open) {
1084 /* Startup procedure is not yet ready for updates. */
1085 return(DB_SUCCESS);
1086 }
1087
1088 dberr_t err = DB_SUCCESS;
1089 trx_t* trx;
1090
1091 ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
1092 ut_ad(mutex_own(&dict_sys->mutex));
1093
1094 trx = trx_allocate_for_background();
1095 trx->op_info = "update filepath";
1096 trx->dict_operation_lock_mode = RW_X_LATCH;
1097 trx_start_for_ddl(trx, TRX_DICT_OP_INDEX);
1098
1099 pars_info_t* info = pars_info_create();
1100
1101 pars_info_add_int4_literal(info, "space", space_id);
1102 pars_info_add_str_literal(info, "path", filepath);
1103
1104 err = que_eval_sql(info,
1105 "PROCEDURE UPDATE_FILEPATH () IS\n"
1106 "BEGIN\n"
1107 "UPDATE SYS_DATAFILES"
1108 " SET PATH = :path\n"
1109 " WHERE SPACE = :space;\n"
1110 "END;\n", FALSE, trx);
1111
1112 trx_commit_for_mysql(trx);
1113 trx->dict_operation_lock_mode = 0;
1114 trx_free_for_background(trx);
1115
1116 if (err == DB_SUCCESS) {
1117 /* We just updated SYS_DATAFILES due to the contents in
1118 a link file. Make a note that we did this. */
1119 ib::info() << "The InnoDB data dictionary table SYS_DATAFILES"
1120 " for tablespace ID " << space_id
1121 << " was updated to use file " << filepath << ".";
1122 } else {
1123 ib::warn() << "Error occurred while updating InnoDB data"
1124 " dictionary table SYS_DATAFILES for tablespace ID "
1125 << space_id << " to file " << filepath << ": "
1126 << ut_strerr(err) << ".";
1127 }
1128
1129 return(err);
1130 }
1131
1132 /** Replace records in SYS_TABLESPACES and SYS_DATAFILES associated with
1133 the given space_id using an independent transaction.
1134 @param[in] space_id Tablespace ID
1135 @param[in] name Tablespace name
1136 @param[in] filepath First filepath
1137 @param[in] fsp_flags Tablespace flags
1138 @return DB_SUCCESS if OK, dberr_t if the insert failed */
1139 dberr_t
dict_replace_tablespace_and_filepath(ulint space_id,const char * name,const char * filepath,ulint fsp_flags)1140 dict_replace_tablespace_and_filepath(
1141 ulint space_id,
1142 const char* name,
1143 const char* filepath,
1144 ulint fsp_flags)
1145 {
1146 if (!srv_sys_tablespaces_open) {
1147 /* Startup procedure is not yet ready for updates.
1148 Return success since this will likely get updated
1149 later. */
1150 return(DB_SUCCESS);
1151 }
1152
1153 dberr_t err = DB_SUCCESS;
1154 trx_t* trx;
1155
1156 DBUG_EXECUTE_IF("innodb_fail_to_update_tablespace_dict",
1157 return(DB_INTERRUPTED););
1158
1159 ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
1160 ut_ad(mutex_own(&dict_sys->mutex));
1161 ut_ad(filepath);
1162
1163 trx = trx_allocate_for_background();
1164 trx->op_info = "insert tablespace and filepath";
1165 trx->dict_operation_lock_mode = RW_X_LATCH;
1166 trx_start_for_ddl(trx, TRX_DICT_OP_INDEX);
1167
1168 /* A record for this space ID was not found in
1169 SYS_DATAFILES. Assume the record is also missing in
1170 SYS_TABLESPACES. Insert records into them both. */
1171 err = dict_replace_tablespace_in_dictionary(
1172 space_id, name, fsp_flags, filepath, trx, false);
1173
1174 trx_commit_for_mysql(trx);
1175 trx->dict_operation_lock_mode = 0;
1176 trx_free_for_background(trx);
1177
1178 return(err);
1179 }
1180
1181 /** Check the validity of a SYS_TABLES record
1182 Make sure the fields are the right length and that they
1183 do not contain invalid contents.
1184 @param[in] rec SYS_TABLES record
1185 @return error message, or NULL on success */
1186 static
1187 const char*
dict_sys_tables_rec_check(const rec_t * rec)1188 dict_sys_tables_rec_check(
1189 const rec_t* rec)
1190 {
1191 const byte* field;
1192 ulint len;
1193
1194 ut_ad(mutex_own(&dict_sys->mutex));
1195
1196 if (rec_get_deleted_flag(rec, 0)) {
1197 return("delete-marked record in SYS_TABLES");
1198 }
1199
1200 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_TABLES) {
1201 return("wrong number of columns in SYS_TABLES record");
1202 }
1203
1204 rec_get_nth_field_offs_old(
1205 rec, DICT_FLD__SYS_TABLES__NAME, &len);
1206 if (len == 0 || len == UNIV_SQL_NULL) {
1207 err_len:
1208 return("incorrect column length in SYS_TABLES");
1209 }
1210 rec_get_nth_field_offs_old(
1211 rec, DICT_FLD__SYS_TABLES__DB_TRX_ID, &len);
1212 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
1213 goto err_len;
1214 }
1215 rec_get_nth_field_offs_old(
1216 rec, DICT_FLD__SYS_TABLES__DB_ROLL_PTR, &len);
1217 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
1218 goto err_len;
1219 }
1220
1221 rec_get_nth_field_offs_old(rec, DICT_FLD__SYS_TABLES__ID, &len);
1222 if (len != 8) {
1223 goto err_len;
1224 }
1225
1226 field = rec_get_nth_field_old(
1227 rec, DICT_FLD__SYS_TABLES__N_COLS, &len);
1228 if (field == NULL || len != 4) {
1229 goto err_len;
1230 }
1231
1232 rec_get_nth_field_offs_old(rec, DICT_FLD__SYS_TABLES__TYPE, &len);
1233 if (len != 4) {
1234 goto err_len;
1235 }
1236
1237 rec_get_nth_field_offs_old(
1238 rec, DICT_FLD__SYS_TABLES__MIX_ID, &len);
1239 if (len != 8) {
1240 goto err_len;
1241 }
1242
1243 field = rec_get_nth_field_old(
1244 rec, DICT_FLD__SYS_TABLES__MIX_LEN, &len);
1245 if (field == NULL || len != 4) {
1246 goto err_len;
1247 }
1248
1249 rec_get_nth_field_offs_old(
1250 rec, DICT_FLD__SYS_TABLES__CLUSTER_ID, &len);
1251 if (len != UNIV_SQL_NULL) {
1252 goto err_len;
1253 }
1254
1255 field = rec_get_nth_field_old(
1256 rec, DICT_FLD__SYS_TABLES__SPACE, &len);
1257 if (field == NULL || len != 4) {
1258 goto err_len;
1259 }
1260
1261 return(NULL);
1262 }
1263
1264 /** Read and return the contents of a SYS_TABLESPACES record.
1265 @param[in] rec A record of SYS_TABLESPACES
1266 @param[out] id Pointer to the space_id for this table
1267 @param[in,out] name Buffer for Tablespace Name of length NAME_LEN
1268 @param[out] flags Pointer to tablespace flags
1269 @return true if the record was read correctly, false if not. */
1270 bool
dict_sys_tablespaces_rec_read(const rec_t * rec,ulint * id,char * name,ulint * flags)1271 dict_sys_tablespaces_rec_read(
1272 const rec_t* rec,
1273 ulint* id,
1274 char* name,
1275 ulint* flags)
1276 {
1277 const byte* field;
1278 ulint len;
1279
1280 field = rec_get_nth_field_old(
1281 rec, DICT_FLD__SYS_TABLESPACES__SPACE, &len);
1282 if (len != DICT_FLD_LEN_SPACE) {
1283 ib::error() << "Wrong field length in SYS_TABLESPACES.SPACE: "
1284 << len;
1285 return(false);
1286 }
1287 *id = mach_read_from_4(field);
1288
1289 field = rec_get_nth_field_old(
1290 rec, DICT_FLD__SYS_TABLESPACES__NAME, &len);
1291 if (len == 0 || len == UNIV_SQL_NULL) {
1292 ib::error() << "Wrong field length in SYS_TABLESPACES.NAME: "
1293 << len;
1294 return(false);
1295 }
1296 strncpy(name, reinterpret_cast<const char*>(field), NAME_LEN);
1297
1298 /* read the 4 byte flags from the TYPE field */
1299 field = rec_get_nth_field_old(
1300 rec, DICT_FLD__SYS_TABLESPACES__FLAGS, &len);
1301 if (len != 4) {
1302 ib::error() << "Wrong field length in SYS_TABLESPACES.FLAGS: "
1303 << len;
1304 return(false);
1305 }
1306 *flags = mach_read_from_4(field);
1307
1308 return(true);
1309 }
1310
1311 /** Load and check each general tablespace mentioned in the SYS_TABLESPACES.
1312 Ignore system and file-per-table tablespaces.
1313 If it is valid, add it to the file_system list.
1314 @param[in] validate true when the previous shutdown was not clean
1315 @return the highest space ID found. */
1316 UNIV_INLINE
1317 ulint
dict_check_sys_tablespaces(bool validate)1318 dict_check_sys_tablespaces(
1319 bool validate)
1320 {
1321 ulint max_space_id = 0;
1322 btr_pcur_t pcur;
1323 const rec_t* rec;
1324 mtr_t mtr;
1325
1326 DBUG_ENTER("dict_check_sys_tablespaces");
1327
1328 ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
1329 ut_ad(mutex_own(&dict_sys->mutex));
1330
1331 /* Before traversing it, let's make sure we have
1332 SYS_TABLESPACES and SYS_DATAFILES loaded. */
1333 dict_table_get_low("SYS_TABLESPACES");
1334 dict_table_get_low("SYS_DATAFILES");
1335
1336 mtr_start(&mtr);
1337
1338 for (rec = dict_startscan_system(&pcur, &mtr, SYS_TABLESPACES);
1339 rec != NULL;
1340 rec = dict_getnext_system(&pcur, &mtr))
1341 {
1342 char space_name[NAME_LEN];
1343 ulint space_id = 0;
1344 ulint fsp_flags;
1345
1346 if (!dict_sys_tablespaces_rec_read(rec, &space_id,
1347 space_name, &fsp_flags)) {
1348 continue;
1349 }
1350
1351 /* Ignore system and file-per-table tablespaces. */
1352 if (is_system_tablespace(space_id)
1353 || !fsp_is_shared_tablespace(fsp_flags)) {
1354 continue;
1355 }
1356
1357 /* Ignore tablespaces that already are in the tablespace
1358 cache. */
1359 if (fil_space_for_table_exists_in_mem(
1360 space_id, space_name, false, true, NULL, 0)) {
1361 /* Recovery can open a datafile that does not
1362 match SYS_DATAFILES. If they don't match, update
1363 SYS_DATAFILES. */
1364 char *dict_path = dict_get_first_path(space_id);
1365 char *fil_path = fil_space_get_first_path(space_id);
1366 if (dict_path && fil_path
1367 && strcmp(dict_path, fil_path)) {
1368 dict_update_filepath(space_id, fil_path);
1369 }
1370 ut_free(dict_path);
1371 ut_free(fil_path);
1372 continue;
1373 }
1374
1375 /* Set the expected filepath from the data dictionary.
1376 If the file is found elsewhere (from an ISL or the default
1377 location) or this path is the same file but looks different,
1378 fil_ibd_open() will update the dictionary with what is
1379 opened. */
1380 char* filepath = dict_get_first_path(space_id);
1381
1382 Keyring_encryption_info keyring_encryption_info;
1383
1384 /* Check that the .ibd file exists. */
1385 dberr_t err = fil_ibd_open(
1386 validate,
1387 !srv_read_only_mode && srv_log_file_size != 0,
1388 FIL_TYPE_TABLESPACE,
1389 space_id,
1390 fsp_flags,
1391 space_name,
1392 filepath,
1393 keyring_encryption_info);
1394
1395 if (err != DB_SUCCESS) {
1396 ib::warn() << "Ignoring tablespace "
1397 << id_name_t(space_name)
1398 << " because it could not be opened.";
1399 }
1400
1401 max_space_id = ut_max(max_space_id, space_id);
1402
1403 ut_free(filepath);
1404 }
1405
1406 mtr_commit(&mtr);
1407
1408 DBUG_RETURN(max_space_id);
1409 }
1410
1411 /** Read and return 5 integer fields from a SYS_TABLES record.
1412 @param[in] rec A record of SYS_TABLES
1413 @param[in] name Table Name, the same as SYS_TABLES.NAME
1414 @param[out] table_id Pointer to the table_id for this table
1415 @param[out] space_id Pointer to the space_id for this table
1416 @param[out] n_cols Pointer to number of columns for this table.
1417 @param[out] flags Pointer to table flags
1418 @param[out] flags2 Pointer to table flags2
1419 @return true if the record was read correctly, false if not. */
1420 static
1421 bool
dict_sys_tables_rec_read(const rec_t * rec,const table_name_t & table_name,table_id_t * table_id,ulint * space_id,ulint * n_cols,ulint * flags,ulint * flags2)1422 dict_sys_tables_rec_read(
1423 const rec_t* rec,
1424 const table_name_t& table_name,
1425 table_id_t* table_id,
1426 ulint* space_id,
1427 ulint* n_cols,
1428 ulint* flags,
1429 ulint* flags2)
1430 {
1431 const byte* field;
1432 ulint len;
1433 ulint type;
1434
1435 *flags2 = 0;
1436
1437 field = rec_get_nth_field_old(
1438 rec, DICT_FLD__SYS_TABLES__ID, &len);
1439 ut_ad(len == 8);
1440 *table_id = static_cast<table_id_t>(mach_read_from_8(field));
1441
1442 field = rec_get_nth_field_old(
1443 rec, DICT_FLD__SYS_TABLES__SPACE, &len);
1444 ut_ad(len == 4);
1445 *space_id = mach_read_from_4(field);
1446
1447 /* Read the 4 byte flags from the TYPE field */
1448 field = rec_get_nth_field_old(
1449 rec, DICT_FLD__SYS_TABLES__TYPE, &len);
1450 ut_a(len == 4);
1451 type = mach_read_from_4(field);
1452
1453 /* The low order bit of SYS_TABLES.TYPE is always set to 1. But in
1454 dict_table_t::flags the low order bit is used to determine if the
1455 row format is Redundant (0) or Compact (1) when the format is Antelope.
1456 Read the 4 byte N_COLS field and look at the high order bit. It
1457 should be set for COMPACT and later. It should not be set for
1458 REDUNDANT. */
1459 field = rec_get_nth_field_old(
1460 rec, DICT_FLD__SYS_TABLES__N_COLS, &len);
1461 ut_a(len == 4);
1462 *n_cols = mach_read_from_4(field);
1463
1464 /* This validation function also combines the DICT_N_COLS_COMPACT
1465 flag in n_cols into the type field to effectively make it a
1466 dict_table_t::flags. */
1467
1468 if (ULINT_UNDEFINED == dict_sys_tables_type_validate(type, *n_cols)) {
1469 ib::error() << "Table " << table_name << " in InnoDB"
1470 " data dictionary contains invalid flags."
1471 " SYS_TABLES.TYPE=" << type <<
1472 " SYS_TABLES.N_COLS=" << *n_cols;
1473 *flags = ULINT_UNDEFINED;
1474 return(false);
1475 }
1476
1477 *flags = dict_sys_tables_type_to_tf(type, *n_cols);
1478
1479 /* Get flags2 from SYS_TABLES.MIX_LEN */
1480 field = rec_get_nth_field_old(
1481 rec, DICT_FLD__SYS_TABLES__MIX_LEN, &len);
1482 *flags2 = mach_read_from_4(field);
1483
1484 /* DICT_TF2_FTS will be set when indexes are being loaded */
1485 *flags2 &= ~DICT_TF2_FTS;
1486
1487 /* Now that we have used this bit, unset it. */
1488 *n_cols &= ~DICT_N_COLS_COMPACT;
1489
1490 return(true);
1491 }
1492
1493 /** Load and check each non-predefined tablespace mentioned in SYS_TABLES.
1494 Search SYS_TABLES and check each tablespace mentioned that has not
1495 already been added to the fil_system. If it is valid, add it to the
1496 file_system list. Perform extra validation on the table if recovery from
1497 the REDO log occurred.
1498 @param[in] validate Whether to do validation on the table.
1499 @return the highest space ID found. */
1500 UNIV_INLINE
1501 ulint
dict_check_sys_tables(bool validate)1502 dict_check_sys_tables(
1503 bool validate)
1504 {
1505 ulint max_space_id = 0;
1506 btr_pcur_t pcur;
1507 const rec_t* rec;
1508 mtr_t mtr;
1509
1510 DBUG_ENTER("dict_check_sys_tables");
1511
1512 ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
1513 ut_ad(mutex_own(&dict_sys->mutex));
1514
1515 mtr_start(&mtr);
1516
1517 /* Before traversing SYS_TABLES, let's make sure we have
1518 SYS_TABLESPACES and SYS_DATAFILES loaded. */
1519 dict_table_t* sys_tablespaces;
1520 dict_table_t* sys_datafiles;
1521 sys_tablespaces = dict_table_get_low("SYS_TABLESPACES");
1522 ut_a(sys_tablespaces != NULL);
1523 sys_datafiles = dict_table_get_low("SYS_DATAFILES");
1524 ut_a(sys_datafiles != NULL);
1525
1526 for (rec = dict_startscan_system(&pcur, &mtr, SYS_TABLES);
1527 rec != NULL;
1528 rec = dict_getnext_system(&pcur, &mtr)) {
1529 const byte* field;
1530 ulint len;
1531 char* space_name;
1532 table_name_t table_name;
1533 table_id_t table_id;
1534 ulint space_id;
1535 ulint n_cols;
1536 ulint flags;
1537 ulint flags2;
1538
1539 /* If a table record is not useable, ignore it and continue
1540 on to the next record. Error messages were logged. */
1541 if (dict_sys_tables_rec_check(rec) != NULL) {
1542 continue;
1543 }
1544
1545 /* Copy the table name from rec */
1546 field = rec_get_nth_field_old(
1547 rec, DICT_FLD__SYS_TABLES__NAME, &len);
1548 table_name.m_name = mem_strdupl((char*) field, len);
1549 DBUG_PRINT("dict_check_sys_tables",
1550 ("name: %p, '%s'", table_name.m_name,
1551 table_name.m_name));
1552
1553 dict_sys_tables_rec_read(rec, table_name,
1554 &table_id, &space_id,
1555 &n_cols, &flags, &flags2);
1556 if (flags == ULINT_UNDEFINED
1557 || is_system_tablespace(space_id)) {
1558 ut_free(table_name.m_name);
1559 continue;
1560 }
1561
1562 if (flags2 & DICT_TF2_DISCARDED) {
1563 ib::info() << "Ignoring tablespace " << table_name
1564 << " because the DISCARD flag is set .";
1565 ut_free(table_name.m_name);
1566 continue;
1567 }
1568
1569 /* If the table is not a predefined tablespace then it must
1570 be in a file-per-table or shared tablespace.
1571 Note that flags2 is not available for REDUNDANT tables,
1572 so don't check those. */
1573 ut_ad(DICT_TF_HAS_SHARED_SPACE(flags)
1574 || !DICT_TF_GET_COMPACT(flags)
1575 || flags2 & DICT_TF2_USE_FILE_PER_TABLE);
1576
1577 /* Look up the tablespace name in the data dictionary if this
1578 is a shared tablespace. For file-per-table, the table_name
1579 and the tablespace_name are the same.
1580 Some hidden tables like FTS AUX tables may not be found in
1581 the dictionary since they can always be found in the default
1582 location. If so, then dict_space_get_name() will return NULL,
1583 the space name must be the table_name, and the filepath can be
1584 discovered in the default location.*/
1585 char* shared_space_name = dict_space_get_name(space_id, NULL);
1586 space_name = shared_space_name == NULL
1587 ? table_name.m_name
1588 : shared_space_name;
1589
1590 /* Now that we have the proper name for this tablespace,
1591 whether it is a shared tablespace or a single table
1592 tablespace, look to see if it is already in the tablespace
1593 cache. */
1594 if (fil_space_for_table_exists_in_mem(
1595 space_id, space_name, false, true, NULL, 0)) {
1596 /* Recovery can open a datafile that does not
1597 match SYS_DATAFILES. If they don't match, update
1598 SYS_DATAFILES. */
1599 char *dict_path = dict_get_first_path(space_id);
1600 char *fil_path = fil_space_get_first_path(space_id);
1601 if (dict_path && fil_path
1602 && strcmp(dict_path, fil_path)) {
1603 dict_update_filepath(space_id, fil_path);
1604 }
1605 ut_free(dict_path);
1606 ut_free(fil_path);
1607 ut_free(table_name.m_name);
1608 ut_free(shared_space_name);
1609 continue;
1610 }
1611
1612 /* Set the expected filepath from the data dictionary.
1613 If the file is found elsewhere (from an ISL or the default
1614 location) or this path is the same file but looks different,
1615 fil_ibd_open() will update the dictionary with what is
1616 opened. */
1617 char* filepath = dict_get_first_path(space_id);
1618
1619 /* Check that the .ibd file exists. */
1620 bool is_temp = flags2 & DICT_TF2_TEMPORARY;
1621 bool is_encrypted = flags2 & DICT_TF2_ENCRYPTION;
1622 ulint fsp_flags = dict_tf_to_fsp_flags(flags,
1623 is_temp,
1624 is_encrypted);
1625
1626 Keyring_encryption_info keyring_encryption_info;
1627
1628 dberr_t err = fil_ibd_open(
1629 validate,
1630 !srv_read_only_mode && srv_log_file_size != 0,
1631 FIL_TYPE_TABLESPACE,
1632 space_id,
1633 fsp_flags,
1634 space_name,
1635 filepath,
1636 keyring_encryption_info);
1637
1638 if (err != DB_SUCCESS) {
1639 ib::warn() << "Ignoring tablespace "
1640 << id_name_t(space_name)
1641 << " because it could not be opened.";
1642 }
1643
1644 max_space_id = ut_max(max_space_id, space_id);
1645
1646 ut_free(table_name.m_name);
1647 ut_free(shared_space_name);
1648 ut_free(filepath);
1649 }
1650
1651 mtr_commit(&mtr);
1652
1653 DBUG_RETURN(max_space_id);
1654 }
1655
1656 /** Check each tablespace found in the data dictionary.
1657 Look at each general tablespace found in SYS_TABLESPACES.
1658 Then look at each table defined in SYS_TABLES that has a space_id > 0
1659 to find all the file-per-table tablespaces.
1660
1661 In a crash recovery we already have some tablespace objects created from
1662 processing the REDO log. Any other tablespace in SYS_TABLESPACES not
1663 previously used in recovery will be opened here. We will compare the
1664 space_id information in the data dictionary to what we find in the
1665 tablespace file. In addition, more validation will be done if recovery
1666 was needed and force_recovery is not set.
1667
1668 We also scan the biggest space id, and store it to fil_system.
1669 @param[in] validate true if recovery was needed */
1670 void
dict_check_tablespaces_and_store_max_id(bool validate)1671 dict_check_tablespaces_and_store_max_id(
1672 bool validate)
1673 {
1674 mtr_t mtr;
1675
1676 DBUG_ENTER("dict_check_tablespaces_and_store_max_id");
1677
1678 rw_lock_x_lock(dict_operation_lock);
1679 mutex_enter(&dict_sys->mutex);
1680
1681 /* Initialize the max space_id from sys header */
1682 mtr_start(&mtr);
1683 ulint max_space_id = mtr_read_ulint(
1684 dict_hdr_get(&mtr) + DICT_HDR_MAX_SPACE_ID,
1685 MLOG_4BYTES, &mtr);
1686 mtr_commit(&mtr);
1687
1688 fil_set_max_space_id_if_bigger(max_space_id);
1689
1690 /* Open all general tablespaces found in SYS_TABLESPACES. */
1691 ulint max1 = dict_check_sys_tablespaces(validate);
1692
1693 /* Open all tablespaces referenced in SYS_TABLES.
1694 This will update SYS_TABLESPACES and SYS_DATAFILES if it
1695 finds any file-per-table tablespaces not already there. */
1696 ulint max2 = dict_check_sys_tables(validate);
1697
1698 /* Store the max space_id found */
1699 max_space_id = ut_max(max1, max2);
1700 fil_set_max_space_id_if_bigger(max_space_id);
1701
1702 mutex_exit(&dict_sys->mutex);
1703 rw_lock_x_unlock(dict_operation_lock);
1704
1705 DBUG_VOID_RETURN;
1706 }
1707
1708 /** Error message for a delete-marked record in dict_load_column_low() */
1709 static const char* dict_load_column_del = "delete-marked record in SYS_COLUMN";
1710
1711 /********************************************************************//**
1712 Loads a table column definition from a SYS_COLUMNS record to
1713 dict_table_t.
1714 @return error message, or NULL on success */
1715 const char*
dict_load_column_low(dict_table_t * table,mem_heap_t * heap,dict_col_t * column,table_id_t * table_id,const char ** col_name,const rec_t * rec,ulint * nth_v_col)1716 dict_load_column_low(
1717 /*=================*/
1718 dict_table_t* table, /*!< in/out: table, could be NULL
1719 if we just populate a dict_column_t
1720 struct with information from
1721 a SYS_COLUMNS record */
1722 mem_heap_t* heap, /*!< in/out: memory heap
1723 for temporary storage */
1724 dict_col_t* column, /*!< out: dict_column_t to fill,
1725 or NULL if table != NULL */
1726 table_id_t* table_id, /*!< out: table id */
1727 const char** col_name, /*!< out: column name */
1728 const rec_t* rec, /*!< in: SYS_COLUMNS record */
1729 ulint* nth_v_col) /*!< out: if not NULL, this
1730 records the "n" of "nth" virtual
1731 column */
1732 {
1733 char* name;
1734 const byte* field;
1735 ulint len;
1736 ulint mtype;
1737 ulint prtype;
1738 ulint col_len;
1739 ulint pos;
1740 ulint num_base;
1741
1742 ut_ad(table || column);
1743
1744 if (rec_get_deleted_flag(rec, 0)) {
1745 return(dict_load_column_del);
1746 }
1747
1748 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_COLUMNS) {
1749 return("wrong number of columns in SYS_COLUMNS record");
1750 }
1751
1752 field = rec_get_nth_field_old(
1753 rec, DICT_FLD__SYS_COLUMNS__TABLE_ID, &len);
1754 if (len != 8) {
1755 err_len:
1756 return("incorrect column length in SYS_COLUMNS");
1757 }
1758
1759 if (table_id) {
1760 *table_id = mach_read_from_8(field);
1761 } else if (table->id != mach_read_from_8(field)) {
1762 return("SYS_COLUMNS.TABLE_ID mismatch");
1763 }
1764
1765 field = rec_get_nth_field_old(
1766 rec, DICT_FLD__SYS_COLUMNS__POS, &len);
1767 if (len != 4) {
1768 goto err_len;
1769 }
1770
1771 pos = mach_read_from_4(field);
1772
1773 rec_get_nth_field_offs_old(
1774 rec, DICT_FLD__SYS_COLUMNS__DB_TRX_ID, &len);
1775 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
1776 goto err_len;
1777 }
1778 rec_get_nth_field_offs_old(
1779 rec, DICT_FLD__SYS_COLUMNS__DB_ROLL_PTR, &len);
1780 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
1781 goto err_len;
1782 }
1783
1784 field = rec_get_nth_field_old(
1785 rec, DICT_FLD__SYS_COLUMNS__NAME, &len);
1786 if (len == 0 || len == UNIV_SQL_NULL) {
1787 goto err_len;
1788 }
1789
1790 name = mem_heap_strdupl(heap, (const char*) field, len);
1791
1792 if (col_name) {
1793 *col_name = name;
1794 }
1795
1796 field = rec_get_nth_field_old(
1797 rec, DICT_FLD__SYS_COLUMNS__MTYPE, &len);
1798 if (len != 4) {
1799 goto err_len;
1800 }
1801
1802 mtype = mach_read_from_4(field);
1803
1804 field = rec_get_nth_field_old(
1805 rec, DICT_FLD__SYS_COLUMNS__PRTYPE, &len);
1806 if (len != 4) {
1807 goto err_len;
1808 }
1809 prtype = mach_read_from_4(field);
1810
1811 if (dtype_get_charset_coll(prtype) == 0
1812 && dtype_is_string_type(mtype)) {
1813 /* The table was created with < 4.1.2. */
1814
1815 if (dtype_is_binary_string_type(mtype, prtype)) {
1816 /* Use the binary collation for
1817 string columns of binary type. */
1818
1819 prtype = dtype_form_prtype(
1820 prtype,
1821 DATA_MYSQL_BINARY_CHARSET_COLL);
1822 } else {
1823 /* Use the default charset for
1824 other than binary columns. */
1825
1826 prtype = dtype_form_prtype(
1827 prtype,
1828 data_mysql_default_charset_coll);
1829 }
1830 }
1831
1832 if (table && table->n_def != pos && !(prtype & DATA_VIRTUAL)) {
1833 return("SYS_COLUMNS.POS mismatch");
1834 }
1835
1836 field = rec_get_nth_field_old(
1837 rec, DICT_FLD__SYS_COLUMNS__LEN, &len);
1838 if (len != 4) {
1839 goto err_len;
1840 }
1841 col_len = mach_read_from_4(field);
1842 field = rec_get_nth_field_old(
1843 rec, DICT_FLD__SYS_COLUMNS__PREC, &len);
1844 if (len != 4) {
1845 goto err_len;
1846 }
1847 num_base = mach_read_from_4(field);
1848
1849 if (column == NULL) {
1850 if (prtype & DATA_VIRTUAL) {
1851 #ifdef UNIV_DEBUG
1852 dict_v_col_t* vcol =
1853 #endif
1854 dict_mem_table_add_v_col(
1855 table, heap, name, mtype,
1856 prtype, col_len,
1857 dict_get_v_col_mysql_pos(pos), num_base);
1858 ut_ad(vcol->v_pos == dict_get_v_col_pos(pos));
1859 } else {
1860 ut_ad(num_base == 0);
1861 dict_mem_table_add_col(table, heap, name, mtype,
1862 prtype, col_len);
1863 }
1864 } else {
1865 dict_mem_fill_column_struct(column, pos, mtype,
1866 prtype, col_len);
1867 }
1868
1869 /* Report the virtual column number */
1870 if ((prtype & DATA_VIRTUAL) && nth_v_col != NULL) {
1871 *nth_v_col = dict_get_v_col_pos(pos);
1872 }
1873
1874 return(NULL);
1875 }
1876
1877 /** Error message for a delete-marked record in dict_load_virtual_low() */
1878 static const char* dict_load_virtual_del = "delete-marked record in SYS_VIRTUAL";
1879
1880 /** Loads a virtual column "mapping" (to base columns) information
1881 from a SYS_VIRTUAL record
1882 @param[in,out] table table
1883 @param[in,out] heap memory heap
1884 @param[in,out] column mapped base column's dict_column_t
1885 @param[in,out] table_id table id
1886 @param[in,out] pos virtual column position
1887 @param[in,out] base_pos base column position
1888 @param[in] rec SYS_VIRTUAL record
1889 @return error message, or NULL on success */
1890 const char*
dict_load_virtual_low(dict_table_t * table,mem_heap_t * heap,dict_col_t ** column,table_id_t * table_id,ulint * pos,ulint * base_pos,const rec_t * rec)1891 dict_load_virtual_low(
1892 dict_table_t* table,
1893 mem_heap_t* heap,
1894 dict_col_t** column,
1895 table_id_t* table_id,
1896 ulint* pos,
1897 ulint* base_pos,
1898 const rec_t* rec)
1899 {
1900 const byte* field;
1901 ulint len;
1902 ulint base;
1903
1904 if (rec_get_deleted_flag(rec, 0)) {
1905 return(dict_load_virtual_del);
1906 }
1907
1908 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_VIRTUAL) {
1909 return("wrong number of columns in SYS_VIRTUAL record");
1910 }
1911
1912 field = rec_get_nth_field_old(
1913 rec, DICT_FLD__SYS_VIRTUAL__TABLE_ID, &len);
1914 if (len != 8) {
1915 err_len:
1916 return("incorrect column length in SYS_VIRTUAL");
1917 }
1918
1919 if (table_id != NULL) {
1920 *table_id = mach_read_from_8(field);
1921 } else if (table->id != mach_read_from_8(field)) {
1922 return("SYS_VIRTUAL.TABLE_ID mismatch");
1923 }
1924
1925 field = rec_get_nth_field_old(
1926 rec, DICT_FLD__SYS_VIRTUAL__POS, &len);
1927 if (len != 4) {
1928 goto err_len;
1929 }
1930
1931 if (pos != NULL) {
1932 *pos = mach_read_from_4(field);
1933 }
1934
1935 field = rec_get_nth_field_old(
1936 rec, DICT_FLD__SYS_VIRTUAL__BASE_POS, &len);
1937 if (len != 4) {
1938 goto err_len;
1939 }
1940
1941 base = mach_read_from_4(field);
1942
1943 if (base_pos != NULL) {
1944 *base_pos = base;
1945 }
1946
1947 rec_get_nth_field_offs_old(
1948 rec, DICT_FLD__SYS_VIRTUAL__DB_TRX_ID, &len);
1949 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
1950 goto err_len;
1951 }
1952
1953 rec_get_nth_field_offs_old(
1954 rec, DICT_FLD__SYS_VIRTUAL__DB_ROLL_PTR, &len);
1955 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
1956 goto err_len;
1957 }
1958
1959 if (column != NULL) {
1960 *column = dict_table_get_nth_col(table, base);
1961 }
1962
1963 return(NULL);
1964 }
1965 /********************************************************************//**
1966 Loads definitions for table columns. */
1967 static
1968 void
dict_load_columns(dict_table_t * table,mem_heap_t * heap)1969 dict_load_columns(
1970 /*==============*/
1971 dict_table_t* table, /*!< in/out: table */
1972 mem_heap_t* heap) /*!< in/out: memory heap
1973 for temporary storage */
1974 {
1975 dict_table_t* sys_columns;
1976 dict_index_t* sys_index;
1977 btr_pcur_t pcur;
1978 dtuple_t* tuple;
1979 dfield_t* dfield;
1980 const rec_t* rec;
1981 byte* buf;
1982 ulint i;
1983 mtr_t mtr;
1984 ulint n_skipped = 0;
1985
1986 ut_ad(mutex_own(&dict_sys->mutex));
1987
1988 mtr_start(&mtr);
1989
1990 sys_columns = dict_table_get_low("SYS_COLUMNS");
1991 sys_index = UT_LIST_GET_FIRST(sys_columns->indexes);
1992 ut_ad(!dict_table_is_comp(sys_columns));
1993
1994 ut_ad(name_of_col_is(sys_columns, sys_index,
1995 DICT_FLD__SYS_COLUMNS__NAME, "NAME"));
1996 ut_ad(name_of_col_is(sys_columns, sys_index,
1997 DICT_FLD__SYS_COLUMNS__PREC, "PREC"));
1998
1999 tuple = dtuple_create(heap, 1);
2000 dfield = dtuple_get_nth_field(tuple, 0);
2001
2002 buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
2003 mach_write_to_8(buf, table->id);
2004
2005 dfield_set_data(dfield, buf, 8);
2006 dict_index_copy_types(tuple, sys_index, 1);
2007
2008 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
2009 BTR_SEARCH_LEAF, &pcur, &mtr);
2010
2011 ut_ad(table->n_t_cols == static_cast<ulint>(
2012 table->n_cols) + static_cast<ulint>(table->n_v_cols));
2013
2014 for (i = 0;
2015 i + DATA_N_SYS_COLS < table->n_t_cols + n_skipped;
2016 i++) {
2017 const char* err_msg;
2018 const char* name = NULL;
2019 ulint nth_v_col = ULINT_UNDEFINED;
2020
2021 rec = btr_pcur_get_rec(&pcur);
2022
2023 ut_a(btr_pcur_is_on_user_rec(&pcur));
2024
2025 err_msg = dict_load_column_low(table, heap, NULL, NULL,
2026 &name, rec, &nth_v_col);
2027
2028 if (err_msg == dict_load_column_del) {
2029 n_skipped++;
2030 goto next_rec;
2031 } else if (err_msg) {
2032 ib::fatal() << err_msg;
2033 }
2034
2035 /* Note: Currently we have one DOC_ID column that is
2036 shared by all FTS indexes on a table. And only non-virtual
2037 column can be used for FULLTEXT index */
2038 if (innobase_strcasecmp(name,
2039 FTS_DOC_ID_COL_NAME) == 0
2040 && nth_v_col == ULINT_UNDEFINED) {
2041 dict_col_t* col;
2042 /* As part of normal loading of tables the
2043 table->flag is not set for tables with FTS
2044 till after the FTS indexes are loaded. So we
2045 create the fts_t instance here if there isn't
2046 one already created.
2047
2048 This case does not arise for table create as
2049 the flag is set before the table is created. */
2050 if (table->fts == NULL) {
2051 table->fts = fts_create(table);
2052 fts_optimize_add_table(table);
2053 }
2054
2055 ut_a(table->fts->doc_col == ULINT_UNDEFINED);
2056
2057 col = dict_table_get_nth_col(table, i - n_skipped);
2058
2059 ut_ad(col->len == sizeof(doc_id_t));
2060
2061 if (col->prtype & DATA_FTS_DOC_ID) {
2062 DICT_TF2_FLAG_SET(
2063 table, DICT_TF2_FTS_HAS_DOC_ID);
2064 DICT_TF2_FLAG_UNSET(
2065 table, DICT_TF2_FTS_ADD_DOC_ID);
2066 }
2067
2068 table->fts->doc_col = i - n_skipped;
2069 }
2070 next_rec:
2071 btr_pcur_move_to_next_user_rec(&pcur, &mtr);
2072 }
2073
2074 btr_pcur_close(&pcur);
2075 mtr_commit(&mtr);
2076 }
2077
2078 /** Loads SYS_VIRTUAL info for one virtual column
2079 @param[in,out] table table
2080 @param[in] nth_v_col virtual column sequence num
2081 @param[in,out] v_col virtual column
2082 @param[in,out] heap memory heap
2083 */
2084 static
2085 void
dict_load_virtual_one_col(dict_table_t * table,ulint nth_v_col,dict_v_col_t * v_col,mem_heap_t * heap)2086 dict_load_virtual_one_col(
2087 dict_table_t* table,
2088 ulint nth_v_col,
2089 dict_v_col_t* v_col,
2090 mem_heap_t* heap)
2091 {
2092 dict_table_t* sys_virtual;
2093 dict_index_t* sys_virtual_index;
2094 btr_pcur_t pcur;
2095 dtuple_t* tuple;
2096 dfield_t* dfield;
2097 const rec_t* rec;
2098 byte* buf;
2099 ulint i = 0;
2100 mtr_t mtr;
2101 ulint skipped = 0;
2102
2103 ut_ad(mutex_own(&dict_sys->mutex));
2104
2105 if (v_col->num_base == 0) {
2106 return;
2107 }
2108
2109 mtr_start(&mtr);
2110
2111 sys_virtual = dict_table_get_low("SYS_VIRTUAL");
2112 sys_virtual_index = UT_LIST_GET_FIRST(sys_virtual->indexes);
2113 ut_ad(!dict_table_is_comp(sys_virtual));
2114
2115 ut_ad(name_of_col_is(sys_virtual, sys_virtual_index,
2116 DICT_FLD__SYS_VIRTUAL__POS, "POS"));
2117
2118 tuple = dtuple_create(heap, 2);
2119
2120 /* table ID field */
2121 dfield = dtuple_get_nth_field(tuple, 0);
2122
2123 buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
2124 mach_write_to_8(buf, table->id);
2125
2126 dfield_set_data(dfield, buf, 8);
2127
2128 /* virtual column pos field */
2129 dfield = dtuple_get_nth_field(tuple, 1);
2130
2131 buf = static_cast<byte*>(mem_heap_alloc(heap, 4));
2132 ulint vcol_pos = dict_create_v_col_pos(nth_v_col, v_col->m_col.ind);
2133 mach_write_to_4(buf, vcol_pos);
2134
2135 dfield_set_data(dfield, buf, 4);
2136
2137 dict_index_copy_types(tuple, sys_virtual_index, 2);
2138
2139 btr_pcur_open_on_user_rec(sys_virtual_index, tuple, PAGE_CUR_GE,
2140 BTR_SEARCH_LEAF, &pcur, &mtr);
2141
2142 for (i = 0; i < v_col->num_base + skipped; i++) {
2143 const char* err_msg;
2144 ulint pos;
2145
2146 ut_ad(btr_pcur_is_on_user_rec(&pcur));
2147
2148 rec = btr_pcur_get_rec(&pcur);
2149
2150 ut_a(btr_pcur_is_on_user_rec(&pcur));
2151
2152 err_msg = dict_load_virtual_low(table, heap,
2153 &v_col->base_col[i - skipped],
2154 NULL,
2155 &pos, NULL, rec);
2156
2157 if (err_msg) {
2158 if (err_msg != dict_load_virtual_del) {
2159 ib::fatal() << err_msg;
2160 } else {
2161 skipped++;
2162 }
2163 } else {
2164 ut_ad(pos == vcol_pos);
2165 }
2166
2167 btr_pcur_move_to_next_user_rec(&pcur, &mtr);
2168 }
2169
2170 btr_pcur_close(&pcur);
2171 mtr_commit(&mtr);
2172 }
2173
2174 /** Loads info from SYS_VIRTUAL for virtual columns.
2175 @param[in,out] table table
2176 @param[in] heap memory heap
2177 */
2178 static
2179 void
dict_load_virtual(dict_table_t * table,mem_heap_t * heap)2180 dict_load_virtual(
2181 dict_table_t* table,
2182 mem_heap_t* heap)
2183 {
2184 for (ulint i = 0; i < table->n_v_cols; i++) {
2185 dict_v_col_t* v_col = dict_table_get_nth_v_col(table, i);
2186
2187 dict_load_virtual_one_col(table, i, v_col, heap);
2188 }
2189 }
2190
2191 /** Error message for a delete-marked record in dict_load_field_low() */
2192 static const char* dict_load_field_del = "delete-marked record in SYS_FIELDS";
2193
2194 /********************************************************************//**
2195 Loads an index field definition from a SYS_FIELDS record to
2196 dict_index_t.
2197 @return error message, or NULL on success */
2198 const char*
dict_load_field_low(byte * index_id,dict_index_t * index,dict_field_t * sys_field,ulint * pos,byte * last_index_id,mem_heap_t * heap,const rec_t * rec)2199 dict_load_field_low(
2200 /*================*/
2201 byte* index_id, /*!< in/out: index id (8 bytes)
2202 an "in" value if index != NULL
2203 and "out" if index == NULL */
2204 dict_index_t* index, /*!< in/out: index, could be NULL
2205 if we just populate a dict_field_t
2206 struct with information from
2207 a SYS_FIELDS record */
2208 dict_field_t* sys_field, /*!< out: dict_field_t to be
2209 filled */
2210 ulint* pos, /*!< out: Field position */
2211 byte* last_index_id, /*!< in: last index id */
2212 mem_heap_t* heap, /*!< in/out: memory heap
2213 for temporary storage */
2214 const rec_t* rec) /*!< in: SYS_FIELDS record */
2215 {
2216 const byte* field;
2217 ulint len;
2218 ulint pos_and_prefix_len;
2219 ulint prefix_len;
2220 ibool first_field;
2221 ulint position;
2222
2223 /* Either index or sys_field is supplied, not both */
2224 ut_a((!index) || (!sys_field));
2225
2226 if (rec_get_deleted_flag(rec, 0)) {
2227 return(dict_load_field_del);
2228 }
2229
2230 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_FIELDS) {
2231 return("wrong number of columns in SYS_FIELDS record");
2232 }
2233
2234 field = rec_get_nth_field_old(
2235 rec, DICT_FLD__SYS_FIELDS__INDEX_ID, &len);
2236 if (len != 8) {
2237 err_len:
2238 return("incorrect column length in SYS_FIELDS");
2239 }
2240
2241 if (!index) {
2242 ut_a(last_index_id);
2243 memcpy(index_id, (const char*) field, 8);
2244 first_field = memcmp(index_id, last_index_id, 8);
2245 } else {
2246 first_field = (index->n_def == 0);
2247 if (memcmp(field, index_id, 8)) {
2248 return("SYS_FIELDS.INDEX_ID mismatch");
2249 }
2250 }
2251
2252 /* The next field stores the field position in the index and a
2253 possible column prefix length if the index field does not
2254 contain the whole column. The storage format is like this: if
2255 there is at least one prefix field in the index, then the HIGH
2256 2 bytes contain the field number (index->n_def) and the low 2
2257 bytes the prefix length for the field. Otherwise the field
2258 number (index->n_def) is contained in the 2 LOW bytes. */
2259
2260 field = rec_get_nth_field_old(
2261 rec, DICT_FLD__SYS_FIELDS__POS, &len);
2262 if (len != 4) {
2263 goto err_len;
2264 }
2265
2266 pos_and_prefix_len = mach_read_from_4(field);
2267
2268 if (index && UNIV_UNLIKELY
2269 ((pos_and_prefix_len & 0xFFFFUL) != index->n_def
2270 && (pos_and_prefix_len >> 16 & 0xFFFF) != index->n_def)) {
2271 return("SYS_FIELDS.POS mismatch");
2272 }
2273
2274 if (first_field || pos_and_prefix_len > 0xFFFFUL) {
2275 prefix_len = pos_and_prefix_len & 0xFFFFUL;
2276 position = (pos_and_prefix_len & 0xFFFF0000UL) >> 16;
2277 } else {
2278 prefix_len = 0;
2279 position = pos_and_prefix_len & 0xFFFFUL;
2280 }
2281
2282 rec_get_nth_field_offs_old(
2283 rec, DICT_FLD__SYS_FIELDS__DB_TRX_ID, &len);
2284 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
2285 goto err_len;
2286 }
2287 rec_get_nth_field_offs_old(
2288 rec, DICT_FLD__SYS_FIELDS__DB_ROLL_PTR, &len);
2289 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
2290 goto err_len;
2291 }
2292
2293 field = rec_get_nth_field_old(
2294 rec, DICT_FLD__SYS_FIELDS__COL_NAME, &len);
2295 if (len == 0 || len == UNIV_SQL_NULL) {
2296 goto err_len;
2297 }
2298
2299 if (index) {
2300 dict_mem_index_add_field(
2301 index, mem_heap_strdupl(heap, (const char*) field, len),
2302 prefix_len);
2303 } else {
2304 ut_a(sys_field);
2305 ut_a(pos);
2306
2307 sys_field->name = mem_heap_strdupl(
2308 heap, (const char*) field, len);
2309 sys_field->prefix_len = prefix_len;
2310 *pos = position;
2311 }
2312
2313 return(NULL);
2314 }
2315
2316 /********************************************************************//**
2317 Loads definitions for index fields.
2318 @return DB_SUCCESS if ok, DB_CORRUPTION if corruption */
2319 static
2320 ulint
dict_load_fields(dict_index_t * index,mem_heap_t * heap)2321 dict_load_fields(
2322 /*=============*/
2323 dict_index_t* index, /*!< in/out: index whose fields to load */
2324 mem_heap_t* heap) /*!< in: memory heap for temporary storage */
2325 {
2326 dict_table_t* sys_fields;
2327 dict_index_t* sys_index;
2328 btr_pcur_t pcur;
2329 dtuple_t* tuple;
2330 dfield_t* dfield;
2331 const rec_t* rec;
2332 byte* buf;
2333 ulint i;
2334 mtr_t mtr;
2335 dberr_t error;
2336
2337 ut_ad(mutex_own(&dict_sys->mutex));
2338
2339 mtr_start(&mtr);
2340
2341 sys_fields = dict_table_get_low("SYS_FIELDS");
2342 sys_index = UT_LIST_GET_FIRST(sys_fields->indexes);
2343 ut_ad(!dict_table_is_comp(sys_fields));
2344 ut_ad(name_of_col_is(sys_fields, sys_index,
2345 DICT_FLD__SYS_FIELDS__COL_NAME, "COL_NAME"));
2346
2347 tuple = dtuple_create(heap, 1);
2348 dfield = dtuple_get_nth_field(tuple, 0);
2349
2350 buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
2351 mach_write_to_8(buf, index->id);
2352
2353 dfield_set_data(dfield, buf, 8);
2354 dict_index_copy_types(tuple, sys_index, 1);
2355
2356 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
2357 BTR_SEARCH_LEAF, &pcur, &mtr);
2358 for (i = 0; i < index->n_fields; i++) {
2359 const char* err_msg;
2360
2361 rec = btr_pcur_get_rec(&pcur);
2362
2363 ut_a(btr_pcur_is_on_user_rec(&pcur));
2364
2365 err_msg = dict_load_field_low(buf, index, NULL, NULL, NULL,
2366 heap, rec);
2367
2368 if (err_msg == dict_load_field_del) {
2369 /* There could be delete marked records in
2370 SYS_FIELDS because SYS_FIELDS.INDEX_ID can be
2371 updated by ALTER TABLE ADD INDEX. */
2372
2373 goto next_rec;
2374 } else if (err_msg) {
2375 ib::error() << err_msg;
2376 error = DB_CORRUPTION;
2377 goto func_exit;
2378 }
2379 next_rec:
2380 btr_pcur_move_to_next_user_rec(&pcur, &mtr);
2381 }
2382
2383 error = DB_SUCCESS;
2384 func_exit:
2385 btr_pcur_close(&pcur);
2386 mtr_commit(&mtr);
2387 return(error);
2388 }
2389
2390 /** Error message for a delete-marked record in dict_load_index_low() */
2391 static const char* dict_load_index_del = "delete-marked record in SYS_INDEXES";
2392 /** Error message for table->id mismatch in dict_load_index_low() */
2393 static const char* dict_load_index_id_err = "SYS_INDEXES.TABLE_ID mismatch";
2394
2395 /********************************************************************//**
2396 Loads an index definition from a SYS_INDEXES record to dict_index_t.
2397 If allocate=TRUE, we will create a dict_index_t structure and fill it
2398 accordingly. If allocated=FALSE, the dict_index_t will be supplied by
2399 the caller and filled with information read from the record. @return
2400 error message, or NULL on success */
2401 const char*
dict_load_index_low(byte * table_id,const char * table_name,mem_heap_t * heap,const rec_t * rec,ibool allocate,dict_index_t ** index)2402 dict_load_index_low(
2403 /*================*/
2404 byte* table_id, /*!< in/out: table id (8 bytes),
2405 an "in" value if allocate=TRUE
2406 and "out" when allocate=FALSE */
2407 const char* table_name, /*!< in: table name */
2408 mem_heap_t* heap, /*!< in/out: temporary memory heap */
2409 const rec_t* rec, /*!< in: SYS_INDEXES record */
2410 ibool allocate, /*!< in: TRUE=allocate *index,
2411 FALSE=fill in a pre-allocated
2412 *index */
2413 dict_index_t** index) /*!< out,own: index, or NULL */
2414 {
2415 const byte* field;
2416 ulint len;
2417 ulint name_len;
2418 char* name_buf;
2419 index_id_t id;
2420 ulint n_fields;
2421 ulint type;
2422 ulint space;
2423 ulint merge_threshold;
2424
2425 if (allocate) {
2426 /* If allocate=TRUE, no dict_index_t will
2427 be supplied. Initialize "*index" to NULL */
2428 *index = NULL;
2429 }
2430
2431 if (rec_get_deleted_flag(rec, 0)) {
2432 return(dict_load_index_del);
2433 }
2434
2435 if (rec_get_n_fields_old(rec) == DICT_NUM_FIELDS__SYS_INDEXES) {
2436 /* MERGE_THRESHOLD exists */
2437 field = rec_get_nth_field_old(
2438 rec, DICT_FLD__SYS_INDEXES__MERGE_THRESHOLD, &len);
2439 switch (len) {
2440 case 4:
2441 merge_threshold = mach_read_from_4(field);
2442 break;
2443 case UNIV_SQL_NULL:
2444 merge_threshold = DICT_INDEX_MERGE_THRESHOLD_DEFAULT;
2445 break;
2446 default:
2447 return("incorrect MERGE_THRESHOLD length"
2448 " in SYS_INDEXES");
2449 }
2450 } else if (rec_get_n_fields_old(rec)
2451 == DICT_NUM_FIELDS__SYS_INDEXES - 1) {
2452 /* MERGE_THRESHOLD doesn't exist */
2453
2454 merge_threshold = DICT_INDEX_MERGE_THRESHOLD_DEFAULT;
2455 } else {
2456 return("wrong number of columns in SYS_INDEXES record");
2457 }
2458
2459 field = rec_get_nth_field_old(
2460 rec, DICT_FLD__SYS_INDEXES__TABLE_ID, &len);
2461 if (len != 8) {
2462 err_len:
2463 return("incorrect column length in SYS_INDEXES");
2464 }
2465
2466 if (!allocate) {
2467 /* We are reading a SYS_INDEXES record. Copy the table_id */
2468 memcpy(table_id, (const char*) field, 8);
2469 } else if (memcmp(field, table_id, 8)) {
2470 /* Caller supplied table_id, verify it is the same
2471 id as on the index record */
2472 return(dict_load_index_id_err);
2473 }
2474
2475 field = rec_get_nth_field_old(
2476 rec, DICT_FLD__SYS_INDEXES__ID, &len);
2477 if (len != 8) {
2478 goto err_len;
2479 }
2480
2481 id = mach_read_from_8(field);
2482
2483 rec_get_nth_field_offs_old(
2484 rec, DICT_FLD__SYS_INDEXES__DB_TRX_ID, &len);
2485 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
2486 goto err_len;
2487 }
2488 rec_get_nth_field_offs_old(
2489 rec, DICT_FLD__SYS_INDEXES__DB_ROLL_PTR, &len);
2490 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
2491 goto err_len;
2492 }
2493
2494 field = rec_get_nth_field_old(
2495 rec, DICT_FLD__SYS_INDEXES__NAME, &name_len);
2496 if (name_len == UNIV_SQL_NULL) {
2497 goto err_len;
2498 }
2499
2500 name_buf = mem_heap_strdupl(heap, (const char*) field,
2501 name_len);
2502
2503 field = rec_get_nth_field_old(
2504 rec, DICT_FLD__SYS_INDEXES__N_FIELDS, &len);
2505 if (len != 4) {
2506 goto err_len;
2507 }
2508 n_fields = mach_read_from_4(field);
2509
2510 field = rec_get_nth_field_old(
2511 rec, DICT_FLD__SYS_INDEXES__TYPE, &len);
2512 if (len != 4) {
2513 goto err_len;
2514 }
2515 type = mach_read_from_4(field);
2516 if (type & (~0U << DICT_IT_BITS)) {
2517 return("unknown SYS_INDEXES.TYPE bits");
2518 }
2519
2520 field = rec_get_nth_field_old(
2521 rec, DICT_FLD__SYS_INDEXES__SPACE, &len);
2522 if (len != 4) {
2523 goto err_len;
2524 }
2525 space = mach_read_from_4(field);
2526
2527 field = rec_get_nth_field_old(
2528 rec, DICT_FLD__SYS_INDEXES__PAGE_NO, &len);
2529 if (len != 4) {
2530 goto err_len;
2531 }
2532
2533 if (allocate) {
2534 *index = dict_mem_index_create(table_name, name_buf,
2535 space, type, n_fields);
2536 } else {
2537 ut_a(*index);
2538
2539 dict_mem_fill_index_struct(*index, NULL, NULL, name_buf,
2540 space, type, n_fields);
2541 }
2542
2543 (*index)->id = id;
2544 (*index)->page = mach_read_from_4(field);
2545 ut_ad((*index)->page);
2546 (*index)->merge_threshold = merge_threshold;
2547
2548 return(NULL);
2549 }
2550
2551 /********************************************************************//**
2552 Loads definitions for table indexes. Adds them to the data dictionary
2553 cache.
2554 @return DB_SUCCESS if ok, DB_CORRUPTION if corruption of dictionary
2555 table or DB_UNSUPPORTED if table has unknown index type */
2556 static MY_ATTRIBUTE((nonnull))
2557 dberr_t
dict_load_indexes(dict_table_t * table,mem_heap_t * heap,dict_err_ignore_t ignore_err)2558 dict_load_indexes(
2559 /*==============*/
2560 dict_table_t* table, /*!< in/out: table */
2561 mem_heap_t* heap, /*!< in: memory heap for temporary storage */
2562 dict_err_ignore_t ignore_err)
2563 /*!< in: error to be ignored when
2564 loading the index definition */
2565 {
2566 dict_table_t* sys_indexes;
2567 dict_index_t* sys_index;
2568 btr_pcur_t pcur;
2569 dtuple_t* tuple;
2570 dfield_t* dfield;
2571 const rec_t* rec;
2572 byte* buf;
2573 mtr_t mtr;
2574 dberr_t error = DB_SUCCESS;
2575
2576 ut_ad(mutex_own(&dict_sys->mutex));
2577
2578 mtr_start(&mtr);
2579
2580 sys_indexes = dict_table_get_low("SYS_INDEXES");
2581 sys_index = UT_LIST_GET_FIRST(sys_indexes->indexes);
2582 ut_ad(!dict_table_is_comp(sys_indexes));
2583 ut_ad(name_of_col_is(sys_indexes, sys_index,
2584 DICT_FLD__SYS_INDEXES__NAME, "NAME"));
2585 ut_ad(name_of_col_is(sys_indexes, sys_index,
2586 DICT_FLD__SYS_INDEXES__PAGE_NO, "PAGE_NO"));
2587
2588 tuple = dtuple_create(heap, 1);
2589 dfield = dtuple_get_nth_field(tuple, 0);
2590
2591 buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
2592 mach_write_to_8(buf, table->id);
2593
2594 dfield_set_data(dfield, buf, 8);
2595 dict_index_copy_types(tuple, sys_index, 1);
2596
2597 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
2598 BTR_SEARCH_LEAF, &pcur, &mtr);
2599 for (;;) {
2600 dict_index_t* index = NULL;
2601 const char* err_msg;
2602
2603 if (!btr_pcur_is_on_user_rec(&pcur)) {
2604
2605 /* We should allow the table to open even
2606 without index when DICT_ERR_IGNORE_CORRUPT is set.
2607 DICT_ERR_IGNORE_CORRUPT is currently only set
2608 for drop table */
2609 if (dict_table_get_first_index(table) == NULL
2610 && !(ignore_err & DICT_ERR_IGNORE_CORRUPT)) {
2611 ib::warn() << "Cannot load table "
2612 << table->name
2613 << " because it has no indexes in"
2614 " InnoDB internal data dictionary.";
2615 error = DB_CORRUPTION;
2616 goto func_exit;
2617 }
2618
2619 break;
2620 }
2621
2622 rec = btr_pcur_get_rec(&pcur);
2623
2624 if ((ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK)
2625 && (rec_get_n_fields_old(rec)
2626 == DICT_NUM_FIELDS__SYS_INDEXES
2627 /* a record for older SYS_INDEXES table
2628 (missing merge_threshold column) is acceptable. */
2629 || rec_get_n_fields_old(rec)
2630 == DICT_NUM_FIELDS__SYS_INDEXES - 1)) {
2631 const byte* field;
2632 ulint len;
2633 field = rec_get_nth_field_old(
2634 rec, DICT_FLD__SYS_INDEXES__NAME, &len);
2635
2636 if (len != UNIV_SQL_NULL
2637 && static_cast<char>(*field)
2638 == static_cast<char>(*TEMP_INDEX_PREFIX_STR)) {
2639 /* Skip indexes whose name starts with
2640 TEMP_INDEX_PREFIX, because they will
2641 be dropped during crash recovery. */
2642 goto next_rec;
2643 }
2644 }
2645
2646 err_msg = dict_load_index_low(
2647 buf, table->name.m_name, heap, rec, TRUE, &index);
2648 ut_ad((index == NULL && err_msg != NULL)
2649 || (index != NULL && err_msg == NULL));
2650
2651 if (err_msg == dict_load_index_id_err) {
2652 /* TABLE_ID mismatch means that we have
2653 run out of index definitions for the table. */
2654
2655 if (dict_table_get_first_index(table) == NULL
2656 && !(ignore_err & DICT_ERR_IGNORE_CORRUPT)) {
2657
2658 ib::warn() << "Failed to load the"
2659 " clustered index for table "
2660 << table->name
2661 << " because of the following error: "
2662 << err_msg << "."
2663 " Refusing to load the rest of the"
2664 " indexes (if any) and the whole table"
2665 " altogether.";
2666 error = DB_CORRUPTION;
2667 goto func_exit;
2668 }
2669
2670 break;
2671 } else if (err_msg == dict_load_index_del) {
2672 /* Skip delete-marked records. */
2673 goto next_rec;
2674 } else if (err_msg) {
2675 ib::error() << err_msg;
2676 if (ignore_err & DICT_ERR_IGNORE_CORRUPT) {
2677 goto next_rec;
2678 }
2679 error = DB_CORRUPTION;
2680 goto func_exit;
2681 }
2682
2683 ut_ad(index);
2684
2685 /* Check whether the index is corrupted */
2686 if (dict_index_is_corrupted(index)) {
2687
2688 ib::error() << "Index " << index->name
2689 << " of table " << table->name
2690 << " is corrupted";
2691
2692 if (!srv_load_corrupted
2693 && !(ignore_err & DICT_ERR_IGNORE_CORRUPT)
2694 && dict_index_is_clust(index)) {
2695 dict_mem_index_free(index);
2696
2697 error = DB_INDEX_CORRUPT;
2698 goto func_exit;
2699 } else {
2700 /* We will load the index if
2701 1) srv_load_corrupted is TRUE
2702 2) ignore_err is set with
2703 DICT_ERR_IGNORE_CORRUPT
2704 3) if the index corrupted is a secondary
2705 index */
2706 ib::info() << "Load corrupted index "
2707 << index->name
2708 << " of table " << table->name;
2709 }
2710 }
2711
2712 if (index->type & DICT_FTS
2713 && !dict_table_has_fts_index(table)) {
2714 /* This should have been created by now. */
2715 ut_a(table->fts != NULL);
2716 DICT_TF2_FLAG_SET(table, DICT_TF2_FTS);
2717 }
2718
2719 /* We check for unsupported types first, so that the
2720 subsequent checks are relevant for the supported types. */
2721 if (index->type & ~(DICT_CLUSTERED | DICT_UNIQUE
2722 | DICT_CORRUPT | DICT_FTS
2723 | DICT_SPATIAL | DICT_VIRTUAL)) {
2724
2725 ib::error() << "Unknown type " << index->type
2726 << " of index " << index->name
2727 << " of table " << table->name;
2728
2729 error = DB_UNSUPPORTED;
2730 dict_mem_index_free(index);
2731 goto func_exit;
2732 } else if (index->page == FIL_NULL
2733 && !table->file_unreadable
2734 && (!(index->type & DICT_FTS))) {
2735
2736 ib::error() << "Trying to load index " << index->name
2737 << " for table " << table->name
2738 << ", but the index tree has been freed!";
2739
2740 if (ignore_err & DICT_ERR_IGNORE_INDEX_ROOT) {
2741 /* If caller can tolerate this error,
2742 we will continue to load the index and
2743 let caller deal with this error. However
2744 mark the index and table corrupted. We
2745 only need to mark such in the index
2746 dictionary cache for such metadata corruption,
2747 since we would always be able to set it
2748 when loading the dictionary cache */
2749 index->table = table;
2750 dict_set_corrupted_index_cache_only(index);
2751
2752 ib::info() << "Index is corrupt but forcing"
2753 " load into data dictionary";
2754 } else {
2755 corrupted:
2756 dict_mem_index_free(index);
2757 error = DB_CORRUPTION;
2758 goto func_exit;
2759 }
2760 } else if (!dict_index_is_clust(index)
2761 && NULL == dict_table_get_first_index(table)) {
2762
2763 ib::error() << "Trying to load index " << index->name
2764 << " for table " << table->name
2765 << ", but the first index is not clustered!";
2766
2767 goto corrupted;
2768 } else if (dict_is_sys_table(table->id)
2769 && (dict_index_is_clust(index)
2770 || ((table == dict_sys->sys_tables)
2771 && !strcmp("ID_IND", index->name)))) {
2772
2773 /* The index was created in memory already at booting
2774 of the database server */
2775 dict_mem_index_free(index);
2776 } else {
2777 dict_load_fields(index, heap);
2778
2779 error = dict_index_add_to_cache(
2780 table, index, index->page, FALSE);
2781
2782 /* The data dictionary tables should never contain
2783 invalid index definitions. If we ignored this error
2784 and simply did not load this index definition, the
2785 .frm file would disagree with the index definitions
2786 inside InnoDB. */
2787 if (UNIV_UNLIKELY(error != DB_SUCCESS)) {
2788
2789 goto func_exit;
2790 }
2791 }
2792 next_rec:
2793 btr_pcur_move_to_next_user_rec(&pcur, &mtr);
2794 }
2795
2796 ut_ad(table->fts_doc_id_index == NULL);
2797
2798 if (table->fts != NULL) {
2799 table->fts_doc_id_index = dict_table_get_index_on_name(
2800 table, FTS_DOC_ID_INDEX_NAME);
2801 }
2802
2803 /* If the table contains FTS indexes, populate table->fts->indexes */
2804 if (dict_table_has_fts_index(table)) {
2805 ut_ad(table->fts_doc_id_index != NULL);
2806 /* table->fts->indexes should have been created. */
2807 ut_a(table->fts->indexes != NULL);
2808 dict_table_get_all_fts_indexes(table, table->fts->indexes);
2809 }
2810
2811 func_exit:
2812 btr_pcur_close(&pcur);
2813 mtr_commit(&mtr);
2814
2815 return(error);
2816 }
2817
2818 /** Loads a table definition from a SYS_TABLES record to dict_table_t.
2819 Does not load any columns or indexes.
2820 @param[in] name Table name
2821 @param[in] rec SYS_TABLES record
2822 @param[out,own] table table, or NULL
2823 @return error message, or NULL on success */
2824 static
2825 const char*
dict_load_table_low(table_name_t & name,const rec_t * rec,dict_table_t ** table)2826 dict_load_table_low(
2827 table_name_t& name,
2828 const rec_t* rec,
2829 dict_table_t** table)
2830 {
2831 table_id_t table_id;
2832 ulint space_id;
2833 ulint n_cols;
2834 ulint t_num;
2835 ulint flags;
2836 ulint flags2;
2837 ulint n_v_col;
2838
2839 const char* error_text = dict_sys_tables_rec_check(rec);
2840 if (error_text != NULL) {
2841 return(error_text);
2842 }
2843
2844 dict_sys_tables_rec_read(rec, name, &table_id, &space_id,
2845 &t_num, &flags, &flags2);
2846
2847 if (flags == ULINT_UNDEFINED) {
2848 return("incorrect flags in SYS_TABLES");
2849 }
2850
2851 dict_table_decode_n_col(t_num, &n_cols, &n_v_col);
2852
2853 *table = dict_mem_table_create(
2854 name.m_name, space_id, n_cols + n_v_col, n_v_col, flags, flags2);
2855 (*table)->id = table_id;
2856 (*table)->set_file_readable();
2857
2858 return(NULL);
2859 }
2860
2861 /********************************************************************//**
2862 Using the table->heap, copy the null-terminated filepath into
2863 table->data_dir_path and replace the 'databasename/tablename.ibd'
2864 portion with 'tablename'.
2865 This allows SHOW CREATE TABLE to return the correct DATA DIRECTORY path.
2866 Make this data directory path only if it has not yet been saved. */
2867 void
dict_save_data_dir_path(dict_table_t * table,char * filepath)2868 dict_save_data_dir_path(
2869 /*====================*/
2870 dict_table_t* table, /*!< in/out: table */
2871 char* filepath) /*!< in: filepath of tablespace */
2872 {
2873 ut_ad(mutex_own(&dict_sys->mutex));
2874 ut_a(DICT_TF_HAS_DATA_DIR(table->flags));
2875
2876 ut_a(!table->data_dir_path);
2877 ut_a(filepath);
2878
2879 /* Be sure this filepath is not the default filepath. */
2880 char* default_filepath = fil_make_filepath(
2881 NULL, table->name.m_name, IBD, false);
2882 if (default_filepath) {
2883 if (0 != strcmp(filepath, default_filepath)) {
2884 ulint pathlen = strlen(filepath);
2885 ut_a(pathlen < OS_FILE_MAX_PATH);
2886 ut_a(0 == strcmp(filepath + pathlen - 4, DOT_IBD));
2887
2888 table->data_dir_path = mem_heap_strdup(
2889 table->heap, filepath);
2890 os_file_make_data_dir_path(table->data_dir_path);
2891 }
2892
2893 ut_free(default_filepath);
2894 }
2895 }
2896
2897 /** Make sure the data_dir_path is saved in dict_table_t if DATA DIRECTORY
2898 was used. Try to read it from the fil_system first, then from SYS_DATAFILES.
2899 @param[in] table Table object
2900 @param[in] dict_mutex_own true if dict_sys->mutex is owned already */
2901 void
dict_get_and_save_data_dir_path(dict_table_t * table,bool dict_mutex_own)2902 dict_get_and_save_data_dir_path(
2903 dict_table_t* table,
2904 bool dict_mutex_own)
2905 {
2906 if (DICT_TF_HAS_DATA_DIR(table->flags)
2907 && (!table->data_dir_path)) {
2908 char* path = fil_space_get_first_path(table->space);
2909
2910 if (!dict_mutex_own) {
2911 dict_mutex_enter_for_mysql();
2912 }
2913
2914 if (path == NULL) {
2915 path = dict_get_first_path(table->space);
2916 }
2917
2918 if (path != NULL) {
2919 dict_save_data_dir_path(table, path);
2920 ut_free(path);
2921 }
2922
2923 if (table->data_dir_path == NULL) {
2924 /* Since we did not set the table data_dir_path,
2925 unset the flag. This does not change SYS_DATAFILES
2926 or SYS_TABLES or FSP_FLAGS on the header page of the
2927 tablespace, but it makes dict_table_t consistent. */
2928 table->flags &= ~DICT_TF_MASK_DATA_DIR;
2929 }
2930
2931 if (!dict_mutex_own) {
2932 dict_mutex_exit_for_mysql();
2933 }
2934 }
2935 }
2936
2937 /** Make sure the tablespace name is saved in dict_table_t if the table
2938 uses a general tablespace.
2939 Try to read it from the fil_system_t first, then from SYS_TABLESPACES.
2940 @param[in] table Table object
2941 @param[in] dict_mutex_own) true if dict_sys->mutex is owned already */
2942 void
dict_get_and_save_space_name(dict_table_t * table,bool dict_mutex_own)2943 dict_get_and_save_space_name(
2944 dict_table_t* table,
2945 bool dict_mutex_own)
2946 {
2947 /* Do this only for general tablespaces. */
2948 if (!DICT_TF_HAS_SHARED_SPACE(table->flags)) {
2949 return;
2950 }
2951
2952 bool use_cache = true;
2953 if (table->tablespace != NULL) {
2954
2955 if (srv_sys_tablespaces_open
2956 && dict_table_has_temp_general_tablespace_name(
2957 table->tablespace)) {
2958 /* We previous saved the temporary name,
2959 get the real one now. */
2960 use_cache = false;
2961 } else {
2962 /* Keep and use this name */
2963 return;
2964 }
2965 }
2966
2967 if (use_cache) {
2968 fil_space_t* space = fil_space_acquire_silent(table->space);
2969
2970 if (space != NULL) {
2971 /* Use this name unless it is a temporary general
2972 tablespace name and we can now replace it. */
2973 if (!srv_sys_tablespaces_open
2974 || !dict_table_has_temp_general_tablespace_name(
2975 space->name)) {
2976
2977 /* Use this tablespace name */
2978 table->tablespace = mem_heap_strdup(
2979 table->heap, space->name);
2980
2981 fil_space_release(space);
2982 return;
2983 }
2984 fil_space_release(space);
2985 }
2986 }
2987
2988 /* Read it from the dictionary. */
2989 if (srv_sys_tablespaces_open) {
2990 if (!dict_mutex_own) {
2991 dict_mutex_enter_for_mysql();
2992 }
2993
2994 table->tablespace = dict_space_get_name(
2995 table->space, table->heap);
2996
2997 if (!dict_mutex_own) {
2998 dict_mutex_exit_for_mysql();
2999 }
3000 }
3001 }
3002
3003 /** Loads a table definition and also all its index definitions, and also
3004 the cluster definition if the table is a member in a cluster. Also loads
3005 all foreign key constraints where the foreign key is in the table or where
3006 a foreign key references columns in this table.
3007 @param[in] name Table name in the dbname/tablename format
3008 @param[in] cached true=add to cache, false=do not
3009 @param[in] ignore_err Error to be ignored when loading
3010 table and its index definition
3011 @return table, NULL if does not exist; if the table is stored in an
3012 .ibd file, but the file does not exist, then we set the file_unreadable
3013 flag in the table object we return. */
3014 dict_table_t*
dict_load_table(const char * name,bool cached,dict_err_ignore_t ignore_err)3015 dict_load_table(
3016 const char* name,
3017 bool cached,
3018 dict_err_ignore_t ignore_err)
3019 {
3020 dict_names_t fk_list;
3021 dict_table_t* result;
3022 dict_names_t::iterator i;
3023 table_name_t table_name;
3024
3025 DBUG_ENTER("dict_load_table");
3026 DBUG_PRINT("dict_load_table", ("loading table: '%s'", name));
3027
3028 ut_ad(mutex_own(&dict_sys->mutex));
3029
3030 table_name.m_name = const_cast<char*>(name);
3031
3032 result = dict_table_check_if_in_cache_low(name);
3033
3034 if (!result) {
3035 result = dict_load_table_one(table_name, cached, ignore_err,
3036 fk_list);
3037 while (!fk_list.empty()) {
3038 table_name_t fk_table_name;
3039 dict_table_t* fk_table;
3040
3041 fk_table_name.m_name =
3042 const_cast<char*>(fk_list.front());
3043 fk_table = dict_table_check_if_in_cache_low(
3044 fk_table_name.m_name);
3045 if (!fk_table) {
3046 dict_load_table_one(fk_table_name, cached,
3047 ignore_err, fk_list);
3048 }
3049 fk_list.pop_front();
3050 }
3051 }
3052
3053 DBUG_RETURN(result);
3054 }
3055
3056 /** Opens a tablespace for dict_load_table_one()
3057 @param[in,out] table A table that refers to the tablespace to open
3058 @param[in] heap A memory heap
3059 @param[in] ignore_err Whether to ignore an error. */
3060 UNIV_INLINE
3061 void
dict_load_tablespace(dict_table_t * table,mem_heap_t * heap,dict_err_ignore_t ignore_err)3062 dict_load_tablespace(
3063 dict_table_t* table,
3064 mem_heap_t* heap,
3065 dict_err_ignore_t ignore_err)
3066 {
3067 /* The system tablespace is always available. */
3068 if (is_system_tablespace(table->space)) {
3069 return;
3070 }
3071
3072 if (table->flags2 & DICT_TF2_DISCARDED) {
3073 ib::warn() << "Tablespace for table " << table->name
3074 << " is set as discarded.";
3075 table->set_file_unreadable();
3076 return;
3077 }
3078
3079 if (dict_table_is_temporary(table)) {
3080 /* Do not bother to retry opening temporary tables. */
3081 table->set_file_unreadable();
3082 return;
3083 }
3084
3085 /* A file-per-table table name is also the tablespace name.
3086 A general tablespace name is not the same as the table name.
3087 Use the general tablespace name if it can be read from the
3088 dictionary, if not use 'innodb_general_##. */
3089 char* shared_space_name = NULL;
3090 char* space_name;
3091 if (DICT_TF_HAS_SHARED_SPACE(table->flags)) {
3092 if (srv_sys_tablespaces_open) {
3093 shared_space_name =
3094 dict_space_get_name(table->space, NULL);
3095
3096 } else {
3097 /* Make the temporary tablespace name. */
3098 shared_space_name = static_cast<char*>(
3099 ut_malloc_nokey(
3100 strlen(general_space_name) + 20));
3101
3102 sprintf(shared_space_name, "%s_" ULINTPF,
3103 general_space_name,
3104 static_cast<ulint>(table->space));
3105 }
3106 space_name = shared_space_name;
3107 } else {
3108 space_name = table->name.m_name;
3109 }
3110
3111 /* The tablespace may already be open. */
3112 if (fil_space_for_table_exists_in_mem(
3113 table->space, space_name, false,
3114 true, heap, table->id)) {
3115 ut_free(shared_space_name);
3116 return;
3117 }
3118
3119 if (!(ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK)) {
3120 ib::error() << "Failed to find tablespace for table "
3121 << table->name << " in the cache. Attempting"
3122 " to load the tablespace with space id "
3123 << table->space;
3124 }
3125
3126 /* Use the remote filepath if needed. This parameter is optional
3127 in the call to fil_ibd_open(). If not supplied, it will be built
3128 from the space_name. */
3129 char* filepath = NULL;
3130 if (DICT_TF_HAS_DATA_DIR(table->flags)) {
3131 /* This will set table->data_dir_path from either
3132 fil_system or SYS_DATAFILES */
3133 dict_get_and_save_data_dir_path(table, true);
3134
3135 if (table->data_dir_path) {
3136 filepath = fil_make_filepath(
3137 table->data_dir_path,
3138 table->name.m_name, IBD, true);
3139 }
3140
3141 } else if (DICT_TF_HAS_SHARED_SPACE(table->flags)) {
3142 /* Set table->tablespace from either
3143 fil_system or SYS_TABLESPACES */
3144 dict_get_and_save_space_name(table, true);
3145
3146 /* Set the filepath from either
3147 fil_system or SYS_DATAFILES. */
3148 filepath = dict_get_first_path(table->space);
3149 if (filepath == NULL) {
3150 ib::warn() << "Could not find the filepath"
3151 " for table " << table->name <<
3152 ", space ID " << table->space;
3153 }
3154 }
3155
3156 /* Try to open the tablespace. We set the 2nd param (fix_dict) to
3157 false because we do not have an x-lock on dict_operation_lock */
3158 ulint fsp_flags = dict_tf_to_fsp_flags(table->flags,
3159 false,
3160 dict_table_is_encrypted(table));
3161
3162 Keyring_encryption_info keyring_encryption_info;
3163
3164 dberr_t err = fil_ibd_open(
3165 true, false, FIL_TYPE_TABLESPACE, table->space,
3166 fsp_flags, space_name, filepath, keyring_encryption_info);
3167
3168 if (err != DB_SUCCESS) {
3169 /* We failed to find a sensible tablespace file */
3170 table->set_file_unreadable();
3171 }
3172
3173 table->keyring_encryption_info = keyring_encryption_info;
3174
3175 ut_free(shared_space_name);
3176 ut_free(filepath);
3177 }
3178
/** Loads a table definition and also all its index definitions.

The SYS_TABLES record for the given name is looked up and turned into a
table object; then the tablespace is opened and the columns, virtual
columns and indexes are loaded.

Foreign key constraints are loaded only when the table is added to the
cache, its file is readable and all indexes were loaded successfully.
Names of related tables that still have to be loaded are appended to
fk_tables. These tables must be subsequently loaded so that all the
foreign key constraints are loaded into memory.

The caller must hold dict_sys->mutex.

@param[in]	name		Table name in the db/tablename format
@param[in]	cached		true=add to cache, false=do not
@param[in]	ignore_err	Error to be ignored when loading table
				and its index definition
@param[out]	fk_tables	Related table names that must also be
				loaded to ensure that all foreign key
				constraints are loaded.
@return table, NULL if does not exist; if the table is stored in an
.ibd file, but the file does not exist, then we set the
file_unreadable flag TRUE in the table object we return */
static
dict_table_t*
dict_load_table_one(
	table_name_t&		name,
	bool			cached,
	dict_err_ignore_t	ignore_err,
	dict_names_t&		fk_tables)
{
	dberr_t		err;
	dict_table_t*	table;
	dict_table_t*	sys_tables;
	btr_pcur_t	pcur;
	dict_index_t*	sys_index;
	dtuple_t*	tuple;
	mem_heap_t*	heap;
	dfield_t*	dfield;
	const rec_t*	rec;
	const byte*	field;
	ulint		len;
	const char*	err_msg;
	mtr_t		mtr;

	DBUG_ENTER("dict_load_table_one");
	DBUG_PRINT("dict_load_table_one", ("table: %s", name.m_name));

	ut_ad(mutex_own(&dict_sys->mutex));

	heap = mem_heap_create(32000);

	mtr_start(&mtr);

	sys_tables = dict_table_get_low("SYS_TABLES");
	sys_index = UT_LIST_GET_FIRST(sys_tables->indexes);
	ut_ad(!dict_table_is_comp(sys_tables));
	/* Debug-only sanity checks of the SYS_TABLES column layout. */
	ut_ad(name_of_col_is(sys_tables, sys_index,
			     DICT_FLD__SYS_TABLES__ID, "ID"));
	ut_ad(name_of_col_is(sys_tables, sys_index,
			     DICT_FLD__SYS_TABLES__N_COLS, "N_COLS"));
	ut_ad(name_of_col_is(sys_tables, sys_index,
			     DICT_FLD__SYS_TABLES__TYPE, "TYPE"));
	ut_ad(name_of_col_is(sys_tables, sys_index,
			     DICT_FLD__SYS_TABLES__MIX_LEN, "MIX_LEN"));
	ut_ad(name_of_col_is(sys_tables, sys_index,
			     DICT_FLD__SYS_TABLES__SPACE, "SPACE"));

	/* Build a one-field search tuple holding the table name. */
	tuple = dtuple_create(heap, 1);
	dfield = dtuple_get_nth_field(tuple, 0);

	dfield_set_data(dfield, name.m_name, ut_strlen(name.m_name));
	dict_index_copy_types(tuple, sys_index, 1);

	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
				  BTR_SEARCH_LEAF, &pcur, &mtr);
	rec = btr_pcur_get_rec(&pcur);

	if (!btr_pcur_is_on_user_rec(&pcur)
	    || rec_get_deleted_flag(rec, 0)) {
		/* Not found */
		/* err_exit is also the target of the later failure paths
		that occur while the cursor and the mini-transaction are
		still open. */
err_exit:
		btr_pcur_close(&pcur);
		mtr_commit(&mtr);
		mem_heap_free(heap);

		DBUG_RETURN(NULL);
	}

	field = rec_get_nth_field_old(
		rec, DICT_FLD__SYS_TABLES__NAME, &len);

	/* Check if the table name in record is the searched one; the
	search used PAGE_CUR_GE, so the cursor may be positioned on a
	different (greater) name. */
	if (len != ut_strlen(name.m_name)
	    || 0 != ut_memcmp(name.m_name, field, len)) {

		goto err_exit;
	}

	err_msg = dict_load_table_low(name, rec, &table);

	if (err_msg) {

		ib::error() << err_msg;
		goto err_exit;
	}

	btr_pcur_close(&pcur);
	mtr_commit(&mtr);

	dict_load_tablespace(table, heap, ignore_err);

	dict_load_columns(table, heap);

	dict_load_virtual(table, heap);

	if (cached) {
		dict_table_add_to_cache(table, TRUE, heap);
	} else {
		dict_table_add_system_columns(table, heap);
	}

	mem_heap_empty(heap);

	/* If there is no tablespace for the table then we only need to
	load the index definitions. So that we can IMPORT the tablespace
	later. When recovering table locks for resurrected incomplete
	transactions, the tablespace should exist, because DDL operations
	were not allowed while the table is being locked by a transaction. */
	dict_err_ignore_t index_load_err =
		!(ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK)
		&& table->file_unreadable
		? DICT_ERR_IGNORE_ALL
		: ignore_err;
	err = dict_load_indexes(table, heap, index_load_err);

	if (err == DB_INDEX_CORRUPT) {
		/* Refuse to load the table if the table has a corrupted
		cluster index */
		if (!srv_load_corrupted) {

			ib::error() << "Load table " << table->name
				<< " failed, the table has"
				" corrupted clustered indexes. Turn on"
				" 'innodb_force_load_corrupted' to drop it";
			dict_table_remove_from_cache(table);
			table = NULL;
			goto func_exit;
		} else {
			dict_index_t*	clust_index;
			clust_index = dict_table_get_first_index(table);

			if (dict_index_is_corrupted(clust_index)) {
				table->corrupted = TRUE;
			}
		}
	}

	/* We don't trust the table->flags2(retrieved from SYS_TABLES.MIX_LEN
	field) if the datafiles are from 3.23.52 version. To identify this
	version, we do the below check and reset the flags. */
	if (!DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)
	    && table->space == srv_sys_space.space_id()
	    && table->flags == 0) {
		table->flags2 = 0;
	}

	/* Debug injection point: force invalid flags for table test/t1. */
	DBUG_EXECUTE_IF("ib_table_invalid_flags",
			if(strcmp(table->name.m_name, "test/t1") == 0) {
				table->flags2 = 255;
				table->flags = 255;
			});

	if (!dict_tf2_is_valid(table->flags, table->flags2)) {
		ib::error() << "Table " << table->name << " in InnoDB"
			" data dictionary contains invalid flags."
			" SYS_TABLES.MIX_LEN=" << table->flags2;
		/* Clear the temporary/intrinsic bits before the table is
		removed from the cache. */
		table->flags2 &= ~(DICT_TF2_TEMPORARY|DICT_TF2_INTRINSIC);
		dict_table_remove_from_cache(table);
		table = NULL;
		err = DB_FAIL;
		goto func_exit;
	}

	/* Reset the foreign key recursion level. Its value could be
	changed when dict_load_foreigns() is called below */
	table->fk_max_recusive_level = 0;

	/* If the force recovery flag is set, we open the table irrespective
	of the error condition, since the user may want to dump data from the
	clustered index. However we load the foreign key information only if
	all indexes were loaded. */
	if (!cached || table->file_unreadable) {
		/* Don't attempt to load the foreign key constraints: the
		table is either not cached or its file is unreadable. */
	} else if (err == DB_SUCCESS) {
		err = dict_load_foreigns(table->name.m_name, NULL,
					 true, true,
					 ignore_err, fk_tables);

		if (err != DB_SUCCESS) {
			ib::warn() << "Load table " << table->name
				<< " failed, the table has missing"
				" foreign key indexes. Turn off"
				" 'foreign_key_checks' and try again.";

			dict_table_remove_from_cache(table);
			table = NULL;
		} else {
			dict_mem_table_fill_foreign_vcol_set(table);
			table->fk_max_recusive_level = 0;
		}
	} else {
		/* Loading the indexes failed for a cached table with a
		readable file. */
		dict_index_t*   index;

		/* Make sure that at least the clustered index was loaded.
		Otherwise refuse to load the table */
		index = dict_table_get_first_index(table);

		if (!srv_force_recovery
		    || !index
		    || !dict_index_is_clust(index)) {

			dict_table_remove_from_cache(table);
			table = NULL;

		} else if (dict_index_is_corrupted(index)
			   && !table->file_unreadable) {

			/* It is possible we force to load a corrupted
			clustered index if srv_load_corrupted is set.
			Mark the table as corrupted in this case */
			table->corrupted = TRUE;
		}
	}

func_exit:
	mem_heap_free(heap);

	ut_ad(!table
	      || ignore_err != DICT_ERR_IGNORE_NONE
	      || table->file_unreadable
	      || !table->corrupted);

	if (table && table->fts) {
		if (!(dict_table_has_fts_index(table)
		      || DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)
		      || DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_ADD_DOC_ID))) {
			/* the table->fts could be created in dict_load_column
			when a user defined FTS_DOC_ID is present, but no
			FTS */
			fts_optimize_remove_table(table);
			fts_free(table);
		} else {
			fts_optimize_add_table(table);
		}
	}

	/* Every path above that sets table to NULL also leaves err
	different from DB_SUCCESS, so table is dereferenced here only
	when it is non-NULL. */
	ut_ad(err != DB_SUCCESS || dict_foreign_set_validate(*table));

	DBUG_RETURN(table);
}
3435
3436 /***********************************************************************//**
3437 Loads a table object based on the table id.
3438 @return table; NULL if table does not exist */
3439 dict_table_t*
dict_load_table_on_id(table_id_t table_id,dict_err_ignore_t ignore_err)3440 dict_load_table_on_id(
3441 /*==================*/
3442 table_id_t table_id, /*!< in: table id */
3443 dict_err_ignore_t ignore_err) /*!< in: errors to ignore
3444 when loading the table */
3445 {
3446 byte id_buf[8];
3447 btr_pcur_t pcur;
3448 mem_heap_t* heap;
3449 dtuple_t* tuple;
3450 dfield_t* dfield;
3451 dict_index_t* sys_table_ids;
3452 dict_table_t* sys_tables;
3453 const rec_t* rec;
3454 const byte* field;
3455 ulint len;
3456 dict_table_t* table;
3457 mtr_t mtr;
3458
3459 ut_ad(mutex_own(&dict_sys->mutex));
3460
3461 table = NULL;
3462
3463 /* NOTE that the operation of this function is protected by
3464 the dictionary mutex, and therefore no deadlocks can occur
3465 with other dictionary operations. */
3466
3467 mtr_start(&mtr);
3468 /*---------------------------------------------------*/
3469 /* Get the secondary index based on ID for table SYS_TABLES */
3470 sys_tables = dict_sys->sys_tables;
3471 sys_table_ids = dict_table_get_next_index(
3472 dict_table_get_first_index(sys_tables));
3473 ut_ad(!dict_table_is_comp(sys_tables));
3474 ut_ad(!dict_index_is_clust(sys_table_ids));
3475 heap = mem_heap_create(256);
3476
3477 tuple = dtuple_create(heap, 1);
3478 dfield = dtuple_get_nth_field(tuple, 0);
3479
3480 /* Write the table id in byte format to id_buf */
3481 mach_write_to_8(id_buf, table_id);
3482
3483 dfield_set_data(dfield, id_buf, 8);
3484 dict_index_copy_types(tuple, sys_table_ids, 1);
3485
3486 btr_pcur_open_on_user_rec(sys_table_ids, tuple, PAGE_CUR_GE,
3487 BTR_SEARCH_LEAF, &pcur, &mtr);
3488
3489 rec = btr_pcur_get_rec(&pcur);
3490
3491 if (page_rec_is_user_rec(rec)) {
3492 /*---------------------------------------------------*/
3493 /* Now we have the record in the secondary index
3494 containing the table ID and NAME */
3495 check_rec:
3496 field = rec_get_nth_field_old(
3497 rec, DICT_FLD__SYS_TABLE_IDS__ID, &len);
3498 ut_ad(len == 8);
3499
3500 /* Check if the table id in record is the one searched for */
3501 if (table_id == mach_read_from_8(field)) {
3502 if (rec_get_deleted_flag(rec, 0)) {
3503 /* Until purge has completed, there
3504 may be delete-marked duplicate records
3505 for the same SYS_TABLES.ID, but different
3506 SYS_TABLES.NAME. */
3507 while (btr_pcur_move_to_next(&pcur, &mtr)) {
3508 rec = btr_pcur_get_rec(&pcur);
3509
3510 if (page_rec_is_user_rec(rec)) {
3511 goto check_rec;
3512 }
3513 }
3514 } else {
3515 /* Now we get the table name from the record */
3516 field = rec_get_nth_field_old(rec,
3517 DICT_FLD__SYS_TABLE_IDS__NAME, &len);
3518 /* Load the table definition to memory */
3519 char* table_name = mem_heap_strdupl(
3520 heap, (char*) field, len);
3521 table = dict_load_table(table_name, true, ignore_err);
3522 }
3523 }
3524 }
3525
3526 btr_pcur_close(&pcur);
3527 mtr_commit(&mtr);
3528 mem_heap_free(heap);
3529
3530 return(table);
3531 }
3532
3533 /********************************************************************//**
3534 This function is called when the database is booted. Loads system table
3535 index definitions except for the clustered index which is added to the
3536 dictionary cache at booting before calling this function. */
3537 void
dict_load_sys_table(dict_table_t * table)3538 dict_load_sys_table(
3539 /*================*/
3540 dict_table_t* table) /*!< in: system table */
3541 {
3542 mem_heap_t* heap;
3543
3544 ut_ad(mutex_own(&dict_sys->mutex));
3545
3546 heap = mem_heap_create(1000);
3547
3548 dict_load_indexes(table, heap, DICT_ERR_IGNORE_NONE);
3549
3550 mem_heap_free(heap);
3551 }
3552
3553 /********************************************************************//**
3554 Loads foreign key constraint col names (also for the referenced table).
3555 Members that must be set (and valid) in foreign:
3556 foreign->heap
3557 foreign->n_fields
3558 foreign->id ('\0'-terminated)
3559 Members that will be created and set by this function:
3560 foreign->foreign_col_names[i]
3561 foreign->referenced_col_names[i]
3562 (for i=0..foreign->n_fields-1) */
3563 static
3564 void
dict_load_foreign_cols(dict_foreign_t * foreign)3565 dict_load_foreign_cols(
3566 /*===================*/
3567 dict_foreign_t* foreign)/*!< in/out: foreign constraint object */
3568 {
3569 dict_table_t* sys_foreign_cols;
3570 dict_index_t* sys_index;
3571 btr_pcur_t pcur;
3572 dtuple_t* tuple;
3573 dfield_t* dfield;
3574 const rec_t* rec;
3575 const byte* field;
3576 ulint len;
3577 ulint i;
3578 mtr_t mtr;
3579 size_t id_len;
3580
3581 ut_ad(mutex_own(&dict_sys->mutex));
3582
3583 id_len = strlen(foreign->id);
3584
3585 foreign->foreign_col_names = static_cast<const char**>(
3586 mem_heap_alloc(foreign->heap,
3587 foreign->n_fields * sizeof(void*)));
3588
3589 foreign->referenced_col_names = static_cast<const char**>(
3590 mem_heap_alloc(foreign->heap,
3591 foreign->n_fields * sizeof(void*)));
3592
3593 mtr_start(&mtr);
3594
3595 sys_foreign_cols = dict_table_get_low("SYS_FOREIGN_COLS");
3596
3597 sys_index = UT_LIST_GET_FIRST(sys_foreign_cols->indexes);
3598 ut_ad(!dict_table_is_comp(sys_foreign_cols));
3599
3600 tuple = dtuple_create(foreign->heap, 1);
3601 dfield = dtuple_get_nth_field(tuple, 0);
3602
3603 dfield_set_data(dfield, foreign->id, id_len);
3604 dict_index_copy_types(tuple, sys_index, 1);
3605
3606 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
3607 BTR_SEARCH_LEAF, &pcur, &mtr);
3608 for (i = 0; i < foreign->n_fields; i++) {
3609
3610 rec = btr_pcur_get_rec(&pcur);
3611
3612 ut_a(btr_pcur_is_on_user_rec(&pcur));
3613 ut_a(!rec_get_deleted_flag(rec, 0));
3614
3615 field = rec_get_nth_field_old(
3616 rec, DICT_FLD__SYS_FOREIGN_COLS__ID, &len);
3617
3618 if (len != id_len || ut_memcmp(foreign->id, field, len) != 0) {
3619 const rec_t* pos;
3620 ulint pos_len;
3621 const rec_t* for_col_name;
3622 ulint for_col_name_len;
3623 const rec_t* ref_col_name;
3624 ulint ref_col_name_len;
3625
3626 pos = rec_get_nth_field_old(
3627 rec, DICT_FLD__SYS_FOREIGN_COLS__POS,
3628 &pos_len);
3629
3630 for_col_name = rec_get_nth_field_old(
3631 rec, DICT_FLD__SYS_FOREIGN_COLS__FOR_COL_NAME,
3632 &for_col_name_len);
3633
3634 ref_col_name = rec_get_nth_field_old(
3635 rec, DICT_FLD__SYS_FOREIGN_COLS__REF_COL_NAME,
3636 &ref_col_name_len);
3637
3638 ib::fatal sout;
3639
3640 sout << "Unable to load column names for foreign"
3641 " key '" << foreign->id
3642 << "' because it was not found in"
3643 " InnoDB internal table SYS_FOREIGN_COLS. The"
3644 " closest entry we found is:"
3645 " (ID='";
3646 sout.write(field, len);
3647 sout << "', POS=" << mach_read_from_4(pos)
3648 << ", FOR_COL_NAME='";
3649 sout.write(for_col_name, for_col_name_len);
3650 sout << "', REF_COL_NAME='";
3651 sout.write(ref_col_name, ref_col_name_len);
3652 sout << "')";
3653 }
3654
3655 field = rec_get_nth_field_old(
3656 rec, DICT_FLD__SYS_FOREIGN_COLS__POS, &len);
3657 ut_a(len == 4);
3658 ut_a(i == mach_read_from_4(field));
3659
3660 field = rec_get_nth_field_old(
3661 rec, DICT_FLD__SYS_FOREIGN_COLS__FOR_COL_NAME, &len);
3662 foreign->foreign_col_names[i] = mem_heap_strdupl(
3663 foreign->heap, (char*) field, len);
3664
3665 field = rec_get_nth_field_old(
3666 rec, DICT_FLD__SYS_FOREIGN_COLS__REF_COL_NAME, &len);
3667 foreign->referenced_col_names[i] = mem_heap_strdupl(
3668 foreign->heap, (char*) field, len);
3669
3670 btr_pcur_move_to_next_user_rec(&pcur, &mtr);
3671 }
3672
3673 btr_pcur_close(&pcur);
3674 mtr_commit(&mtr);
3675 }
3676
3677 /***********************************************************************//**
3678 Loads a foreign key constraint to the dictionary cache. If the referenced
3679 table is not yet loaded, it is added in the output parameter (fk_tables).
3680 @return DB_SUCCESS or error code */
3681 static MY_ATTRIBUTE((nonnull(1), warn_unused_result))
3682 dberr_t
dict_load_foreign(const char * id,const char ** col_names,bool check_recursive,bool check_charsets,dict_err_ignore_t ignore_err,dict_names_t & fk_tables)3683 dict_load_foreign(
3684 /*==============*/
3685 const char* id,
3686 /*!< in: foreign constraint id, must be
3687 '\0'-terminated */
3688 const char** col_names,
3689 /*!< in: column names, or NULL
3690 to use foreign->foreign_table->col_names */
3691 bool check_recursive,
3692 /*!< in: whether to record the foreign table
3693 parent count to avoid unlimited recursive
3694 load of chained foreign tables */
3695 bool check_charsets,
3696 /*!< in: whether to check charset
3697 compatibility */
3698 dict_err_ignore_t ignore_err,
3699 /*!< in: error to be ignored */
3700 dict_names_t& fk_tables)
3701 /*!< out: the foreign key constraint is added
3702 to the dictionary cache only if the referenced
3703 table is already in cache. Otherwise, the
3704 foreign key constraint is not added to cache,
3705 and the referenced table is added to this
3706 stack. */
3707 {
3708 dict_foreign_t* foreign;
3709 dict_table_t* sys_foreign;
3710 btr_pcur_t pcur;
3711 dict_index_t* sys_index;
3712 dtuple_t* tuple;
3713 mem_heap_t* heap2;
3714 dfield_t* dfield;
3715 const rec_t* rec;
3716 const byte* field;
3717 ulint len;
3718 ulint n_fields_and_type;
3719 mtr_t mtr;
3720 dict_table_t* for_table;
3721 dict_table_t* ref_table;
3722 size_t id_len;
3723
3724 DBUG_ENTER("dict_load_foreign");
3725 DBUG_PRINT("dict_load_foreign",
3726 ("id: '%s', check_recursive: %d", id, check_recursive));
3727
3728 ut_ad(mutex_own(&dict_sys->mutex));
3729
3730 id_len = strlen(id);
3731
3732 heap2 = mem_heap_create(1000);
3733
3734 mtr_start(&mtr);
3735
3736 sys_foreign = dict_table_get_low("SYS_FOREIGN");
3737
3738 sys_index = UT_LIST_GET_FIRST(sys_foreign->indexes);
3739 ut_ad(!dict_table_is_comp(sys_foreign));
3740
3741 tuple = dtuple_create(heap2, 1);
3742 dfield = dtuple_get_nth_field(tuple, 0);
3743
3744 dfield_set_data(dfield, id, id_len);
3745 dict_index_copy_types(tuple, sys_index, 1);
3746
3747 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
3748 BTR_SEARCH_LEAF, &pcur, &mtr);
3749 rec = btr_pcur_get_rec(&pcur);
3750
3751 if (!btr_pcur_is_on_user_rec(&pcur)
3752 || rec_get_deleted_flag(rec, 0)) {
3753 /* Not found */
3754
3755 ib::error() << "Cannot load foreign constraint " << id
3756 << ": could not find the relevant record in "
3757 << "SYS_FOREIGN";
3758
3759 btr_pcur_close(&pcur);
3760 mtr_commit(&mtr);
3761 mem_heap_free(heap2);
3762
3763 DBUG_RETURN(DB_ERROR);
3764 }
3765
3766 field = rec_get_nth_field_old(rec, DICT_FLD__SYS_FOREIGN__ID, &len);
3767
3768 /* Check if the id in record is the searched one */
3769 if (len != id_len || ut_memcmp(id, field, len) != 0) {
3770
3771 {
3772 ib::error err;
3773 err << "Cannot load foreign constraint " << id
3774 << ": found ";
3775 err.write(field, len);
3776 err << " instead in SYS_FOREIGN";
3777 }
3778
3779 btr_pcur_close(&pcur);
3780 mtr_commit(&mtr);
3781 mem_heap_free(heap2);
3782
3783 DBUG_RETURN(DB_ERROR);
3784 }
3785
3786 /* Read the table names and the number of columns associated
3787 with the constraint */
3788
3789 mem_heap_free(heap2);
3790
3791 foreign = dict_mem_foreign_create();
3792
3793 n_fields_and_type = mach_read_from_4(
3794 rec_get_nth_field_old(
3795 rec, DICT_FLD__SYS_FOREIGN__N_COLS, &len));
3796
3797 ut_a(len == 4);
3798
3799 /* We store the type in the bits 24..29 of n_fields_and_type. */
3800
3801 foreign->type = (unsigned int) (n_fields_and_type >> 24);
3802 foreign->n_fields = (unsigned int) (n_fields_and_type & 0x3FFUL);
3803
3804 foreign->id = mem_heap_strdupl(foreign->heap, id, id_len);
3805
3806 field = rec_get_nth_field_old(
3807 rec, DICT_FLD__SYS_FOREIGN__FOR_NAME, &len);
3808
3809 foreign->foreign_table_name = mem_heap_strdupl(
3810 foreign->heap, (char*) field, len);
3811 dict_mem_foreign_table_name_lookup_set(foreign, TRUE);
3812
3813 const ulint foreign_table_name_len = len;
3814
3815 field = rec_get_nth_field_old(
3816 rec, DICT_FLD__SYS_FOREIGN__REF_NAME, &len);
3817 foreign->referenced_table_name = mem_heap_strdupl(
3818 foreign->heap, (char*) field, len);
3819 dict_mem_referenced_table_name_lookup_set(foreign, TRUE);
3820
3821 btr_pcur_close(&pcur);
3822 mtr_commit(&mtr);
3823
3824 dict_load_foreign_cols(foreign);
3825
3826 ref_table = dict_table_check_if_in_cache_low(
3827 foreign->referenced_table_name_lookup);
3828 for_table = dict_table_check_if_in_cache_low(
3829 foreign->foreign_table_name_lookup);
3830
3831 if (!for_table) {
3832 /* To avoid recursively loading the tables related through
3833 the foreign key constraints, the child table name is saved
3834 here. The child table will be loaded later, along with its
3835 foreign key constraint. */
3836
3837 lint old_size = mem_heap_get_size(ref_table->heap);
3838
3839 ut_a(ref_table != NULL);
3840 fk_tables.push_back(
3841 mem_heap_strdupl(ref_table->heap,
3842 foreign->foreign_table_name_lookup,
3843 foreign_table_name_len));
3844
3845 lint new_size = mem_heap_get_size(ref_table->heap);
3846 dict_sys->size += new_size - old_size;
3847
3848 dict_foreign_remove_from_cache(foreign);
3849 DBUG_RETURN(DB_SUCCESS);
3850 }
3851
3852 ut_a(for_table || ref_table);
3853
3854 /* Note that there may already be a foreign constraint object in
3855 the dictionary cache for this constraint: then the following
3856 call only sets the pointers in it to point to the appropriate table
3857 and index objects and frees the newly created object foreign.
3858 Adding to the cache should always succeed since we are not creating
3859 a new foreign key constraint but loading one from the data
3860 dictionary. */
3861
3862 DBUG_RETURN(dict_foreign_add_to_cache(foreign, col_names,
3863 check_charsets,
3864 ignore_err));
3865 }
3866
/***********************************************************************//**
Loads foreign key constraints where the table is either the foreign key
holder or where the table is referenced by a foreign key. Adds these
constraints to the dictionary cache.

The indexes of SYS_FOREIGN that follow its first index are scanned one
after another for records carrying the given table name; each constraint
id found is passed to dict_load_foreign(). Names of tables that must be
loaded later are collected in the output parameter (fk_tables).

The caller must hold dict_sys->mutex.

@return DB_SUCCESS or error code; DB_ERROR if the SYS_FOREIGN table is
not available */
dberr_t
dict_load_foreigns(
/*===============*/
	const char*		table_name,	/*!< in: table name */
	const char**		col_names,	/*!< in: column names, or NULL
						to use table->col_names */
	bool			check_recursive,/*!< in: Whether to check
						recursive load of tables
						chained by FK */
	bool			check_charsets,	/*!< in: whether to check
						charset compatibility */
	dict_err_ignore_t	ignore_err,	/*!< in: error to be ignored */
	dict_names_t&		fk_tables)
						/*!< out: stack of table
						names which must be loaded
						subsequently to load all the
						foreign key constraints. */
{
	/* Stack buffer, in units of ulint, from which the one-field
	search tuple is created. */
	ulint		tuple_buf[(DTUPLE_EST_ALLOC(1) + sizeof(ulint) - 1)
				/ sizeof(ulint)];
	btr_pcur_t	pcur;
	dtuple_t*	tuple;
	dfield_t*	dfield;
	dict_index_t*	sec_index;
	dict_table_t*	sys_foreign;
	const rec_t*	rec;
	const byte*	field;
	ulint		len;
	dberr_t		err;
	mtr_t		mtr;

	DBUG_ENTER("dict_load_foreigns");

	ut_ad(mutex_own(&dict_sys->mutex));

	sys_foreign = dict_table_get_low("SYS_FOREIGN");

	if (sys_foreign == NULL) {
		/* No foreign keys defined yet in this database */

		ib::info() << "No foreign key system tables in the database";
		DBUG_RETURN(DB_ERROR);
	}

	ut_ad(!dict_table_is_comp(sys_foreign));
	mtr_start(&mtr);

	/* Get the secondary index based on FOR_NAME from table
	SYS_FOREIGN */

	sec_index = dict_table_get_next_index(
		dict_table_get_first_index(sys_foreign));
	ut_ad(!dict_index_is_clust(sec_index));
	/* start_load: begin scanning sec_index for table_name. Entered
	once per index; the mini-transaction is open at this point. */
start_load:

	tuple = dtuple_create_from_mem(tuple_buf, sizeof(tuple_buf), 1, 0);
	dfield = dtuple_get_nth_field(tuple, 0);

	dfield_set_data(dfield, table_name, ut_strlen(table_name));
	dict_index_copy_types(tuple, sec_index, 1);

	btr_pcur_open_on_user_rec(sec_index, tuple, PAGE_CUR_GE,
				  BTR_SEARCH_LEAF, &pcur, &mtr);
	/* loop: examine the record the cursor is positioned on. */
loop:
	rec = btr_pcur_get_rec(&pcur);

	if (!btr_pcur_is_on_user_rec(&pcur)) {
		/* End of index */

		goto load_next_index;
	}

	/* Now we have the record in the secondary index containing a table
	name and a foreign constraint ID */

	field = rec_get_nth_field_old(
		rec, DICT_FLD__SYS_FOREIGN_FOR_NAME__NAME, &len);

	/* Check if the table name in the record is the one searched for; the
	following call does the comparison in the latin1_swedish_ci
	charset-collation, in a case-insensitive way. */

	if (0 != cmp_data_data(dfield_get_type(dfield)->mtype,
			       dfield_get_type(dfield)->prtype,
			       static_cast<const byte*>(
				       dfield_get_data(dfield)),
			       dfield_get_len(dfield),
			       field, len)) {

		/* The cursor has moved past the records for this table
		name: this index is done. */
		goto load_next_index;
	}

	/* Skip delete-marked records. */

	if (rec_get_deleted_flag(rec, 0)) {

		goto next_rec;
	}

	/* Since table names in SYS_FOREIGN are stored in a case-insensitive
	order, we have to check that the table name matches also in a binary
	string comparison. On Unix, MySQL allows table names that only differ
	in character case.  If lower_case_table_names=2 then what is stored
	may not be the same case, but the previous comparison showed that they
	match with no-case.  */

	if ((innobase_get_lower_case_table_names() != 2)
	    && (0 != ut_memcmp(field, table_name, len))) {
		goto next_rec;
	}

	/* Now we get a foreign key constraint id */
	field = rec_get_nth_field_old(
		rec, DICT_FLD__SYS_FOREIGN_FOR_NAME__ID, &len);

	/* Copy the string because the page may be modified or evicted
	after mtr_commit() below. */
	char	fk_id[MAX_TABLE_NAME_LEN + 1];

	ut_a(len <= MAX_TABLE_NAME_LEN);
	memcpy(fk_id, field, len);
	fk_id[len] = '\0';

	/* Remember the cursor position so that the scan can be resumed
	after the mini-transaction is committed and restarted. */
	btr_pcur_store_position(&pcur, &mtr);

	mtr_commit(&mtr);

	/* Load the foreign constraint definition to the dictionary cache */

	err = dict_load_foreign(fk_id, col_names,
				check_recursive, check_charsets, ignore_err,
				fk_tables);

	if (err != DB_SUCCESS) {
		/* The mini-transaction was already committed above, so
		only the cursor is closed here. */
		btr_pcur_close(&pcur);

		DBUG_RETURN(err);
	}

	mtr_start(&mtr);

	btr_pcur_restore_position(BTR_SEARCH_LEAF, &pcur, &mtr);
	/* next_rec: advance to the following user record and examine it. */
next_rec:
	btr_pcur_move_to_next_user_rec(&pcur, &mtr);

	goto loop;

	/* load_next_index: finish the scan of the current index and
	continue with the next index of SYS_FOREIGN, if there is one. */
load_next_index:
	btr_pcur_close(&pcur);
	mtr_commit(&mtr);

	sec_index = dict_table_get_next_index(sec_index);

	if (sec_index != NULL) {

		mtr_start(&mtr);

		/* Switch to scan index on REF_NAME, fk_max_recusive_level
		already been updated when scanning FOR_NAME index, no need to
		update again */
		check_recursive = FALSE;

		goto start_load;
	}

	DBUG_RETURN(DB_SUCCESS);
}
4041
4042 /***********************************************************************//**
4043 Loads a table id based on the index id.
4044 @return true if found */
4045 static
4046 bool
dict_load_table_id_on_index_id(index_id_t index_id,table_id_t * table_id)4047 dict_load_table_id_on_index_id(
4048 /*===========================*/
4049 index_id_t index_id, /*!< in: index id */
4050 table_id_t* table_id) /*!< out: table id */
4051 {
4052 /* check hard coded indexes */
4053 switch(index_id) {
4054 case DICT_TABLES_ID:
4055 case DICT_COLUMNS_ID:
4056 case DICT_INDEXES_ID:
4057 case DICT_FIELDS_ID:
4058 *table_id = index_id;
4059 return true;
4060 case DICT_TABLE_IDS_ID:
4061 /* The following is a secondary index on SYS_TABLES */
4062 *table_id = DICT_TABLES_ID;
4063 return true;
4064 }
4065
4066 bool found = false;
4067 mtr_t mtr;
4068
4069 ut_ad(mutex_own(&(dict_sys->mutex)));
4070
4071 /* NOTE that the operation of this function is protected by
4072 the dictionary mutex, and therefore no deadlocks can occur
4073 with other dictionary operations. */
4074
4075 mtr_start(&mtr);
4076
4077 btr_pcur_t pcur;
4078 const rec_t* rec = dict_startscan_system(&pcur, &mtr, SYS_INDEXES);
4079
4080 while (rec) {
4081 ulint len;
4082 const byte* field = rec_get_nth_field_old(
4083 rec, DICT_FLD__SYS_INDEXES__ID, &len);
4084 ut_ad(len == 8);
4085
4086 /* Check if the index id is the one searched for */
4087 if (index_id == mach_read_from_8(field)) {
4088 found = true;
4089 /* Now we get the table id */
4090 const byte* field = rec_get_nth_field_old(
4091 rec,
4092 DICT_FLD__SYS_INDEXES__TABLE_ID,
4093 &len);
4094 *table_id = mach_read_from_8(field);
4095 break;
4096 }
4097 mtr_commit(&mtr);
4098 mtr_start(&mtr);
4099 rec = dict_getnext_system(&pcur, &mtr);
4100 }
4101
4102 btr_pcur_close(&pcur);
4103 mtr_commit(&mtr);
4104
4105 return(found);
4106 }
4107
4108 dict_table_t*
dict_table_open_on_index_id(index_id_t index_id,bool dict_locked)4109 dict_table_open_on_index_id(
4110 /*========================*/
4111 index_id_t index_id, /*!< in: index id */
4112 bool dict_locked) /*!< in: dict locked */
4113 {
4114 if (!dict_locked) {
4115 mutex_enter(&dict_sys->mutex);
4116 }
4117
4118 ut_ad(mutex_own(&dict_sys->mutex));
4119 table_id_t table_id;
4120 dict_table_t * table = NULL;
4121 if (dict_load_table_id_on_index_id(index_id, &table_id)) {
4122 bool local_dict_locked = true;
4123 table = dict_table_open_on_id(table_id,
4124 local_dict_locked,
4125 DICT_TABLE_OP_LOAD_TABLESPACE);
4126 }
4127
4128 if (!dict_locked) {
4129 mutex_exit(&dict_sys->mutex);
4130 }
4131 return table;
4132 }
4133
4134