1 /*****************************************************************************
2
3 Copyright (c) 1996, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2016, 2021, MariaDB Corporation.
5
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17
18 *****************************************************************************/
19
20 /**************************************************//**
21 @file dict/dict0load.cc
22 Loads to the memory cache database object definitions
23 from dictionary tables
24
25 Created 4/24/1996 Heikki Tuuri
26 *******************************************************/
27
28 #include "dict0load.h"
29
30 #include "mysql_version.h"
31 #include "btr0pcur.h"
32 #include "btr0btr.h"
33 #include "dict0boot.h"
34 #include "dict0crea.h"
35 #include "dict0dict.h"
36 #include "dict0mem.h"
37 #include "dict0priv.h"
38 #include "dict0stats.h"
39 #include "fsp0file.h"
40 #include "fts0priv.h"
41 #include "mach0data.h"
42 #include "page0page.h"
43 #include "rem0cmp.h"
44 #include "srv0start.h"
45 #include "srv0srv.h"
46 #include "fts0opt.h"
47
/** Names of the InnoDB system tables.
dict_startscan_system() indexes this array with a dict_system_id_t value
(which must be less than SYS_NUM_SYSTEM_TABLES), so the order of the
entries below must stay in sync with that enumeration. */
static const char* SYSTEM_TABLE_NAME[] = {
	"SYS_TABLES",
	"SYS_INDEXES",
	"SYS_COLUMNS",
	"SYS_FIELDS",
	"SYS_FOREIGN",
	"SYS_FOREIGN_COLS",
	"SYS_TABLESPACES",
	"SYS_DATAFILES",
	"SYS_VIRTUAL"
};
61
/** Load a table definition together with all of its index definitions.

Foreign key constraints are loaded only when their referenced table is
already in the dictionary cache. For a constraint that is not loaded, the
referenced table is pushed onto the output stack fk_tables. Those tables
must be loaded subsequently so that all the foreign key constraints end up
in memory.

@param[in]	name		table name in the db/tablename format
@param[in]	ignore_err	error to be ignored when loading the table
				and its index definitions
@param[out]	fk_tables	names of related tables that must also be
				loaded to ensure that all foreign key
				constraints are loaded
@return the table, or NULL if it does not exist. If the table is stored in
an .ibd file but the file does not exist, the file_unreadable flag is set
in the returned table object. */
static
dict_table_t*
dict_load_table_one(
	const table_name_t&	name,
	dict_err_ignore_t	ignore_err,
	dict_names_t&		fk_tables);
85
/** Build a dict_table_t from a SYS_TABLES record.
Only the table itself is created; no columns or indexes are loaded.
@param[in]	name	table name
@param[in]	rec	SYS_TABLES record
@param[out,own]	table	the created table, or NULL
@return error message
@retval NULL on success */
static const char* dict_load_table_low(const table_name_t& name,
				       const rec_t* rec, dict_table_t** table)
	MY_ATTRIBUTE((nonnull, warn_unused_result));
96
/** Load an index definition from a SYS_INDEXES record to dict_index_t.
If allocate=TRUE, a dict_index_t structure is created and filled in.
If allocate=FALSE, the dict_index_t is supplied by the caller and is
filled with the information read from the record.
@return error message
@retval NULL on success */
static
const char*
dict_load_index_low(
	byte*		table_id,	/*!< in/out: table id (8 bytes),
					an "in" value if allocate=TRUE
					and "out" when allocate=FALSE */
	mem_heap_t*	heap,		/*!< in/out: temporary memory heap */
	const rec_t*	rec,		/*!< in: SYS_INDEXES record */
	ibool		allocate,	/*!< in: TRUE=allocate *index,
					FALSE=fill in a pre-allocated
					*index */
	dict_index_t**	index);		/*!< out,own: index, or NULL */
115
/** Load a table column definition from a SYS_COLUMNS record.
The column is either added to a dict_table_t (table != NULL) or written
to a stand-alone dict_col_t (table == NULL).
@return error message
@retval NULL on success */
static
const char*
dict_load_column_low(
	dict_table_t*	table,		/*!< in/out: table, could be NULL
					if we just populate a dict_col_t
					struct with information from
					a SYS_COLUMNS record */
	mem_heap_t*	heap,		/*!< in/out: memory heap
					for temporary storage */
	dict_col_t*	column,		/*!< out: dict_col_t to fill,
					or NULL if table != NULL */
	table_id_t*	table_id,	/*!< out: table id */
	const char**	col_name,	/*!< out: column name */
	const rec_t*	rec,		/*!< in: SYS_COLUMNS record */
	ulint*		nth_v_col);	/*!< out: if not NULL, this
					records the "n" of "nth" virtual
					column */
136
/** Load the mapping of a virtual column to one of its base columns
from a SYS_VIRTUAL record.
@param[in,out]	table		table
@param[in,out]	column		dict_col_t of the mapped base column
@param[in,out]	table_id	table id
@param[in,out]	pos		virtual column position
@param[in,out]	base_pos	base column position
@param[in]	rec		SYS_VIRTUAL record
@return error message
@retval NULL on success */
static
const char*
dict_load_virtual_low(
	dict_table_t*	table,
	dict_col_t**	column,
	table_id_t*	table_id,
	ulint*		pos,
	ulint*		base_pos,
	const rec_t*	rec);
156
/** Load an index field definition from a SYS_FIELDS record.
The field is either added to a dict_index_t (index != NULL) or written
to a stand-alone dict_field_t (index == NULL).
@return error message
@retval NULL on success */
static
const char*
dict_load_field_low(
	byte*		index_id,	/*!< in/out: index id (8 bytes)
					an "in" value if index != NULL
					and "out" if index == NULL */
	dict_index_t*	index,		/*!< in/out: index, could be NULL
					if we just populate a dict_field_t
					struct with information from
					a SYS_FIELDS record */
	dict_field_t*	sys_field,	/*!< out: dict_field_t to be
					filled */
	ulint*		pos,		/*!< out: Field position */
	byte*		last_index_id,	/*!< in: last index id */
	mem_heap_t*	heap,		/*!< in/out: memory heap
					for temporary storage */
	const rec_t*	rec);		/*!< in: SYS_FIELDS record */
177
/** When this flag is TRUE, the metadata of a clustered index (and of its
table) is loaded even if the index is marked as "corrupted". */
my_bool	srv_load_corrupted;
181
#ifdef UNIV_DEBUG
/** Check whether an index field is built on the column with a given name.
Debug-only helper, used in assertions on the layout of system tables.
@param[in]	table	table that owns the index
@param[in]	index	index whose field is inspected
@param[in]	i	offset of the field within the index
@param[in]	name	column name to compare to
@return TRUE if the i'th field of index is the column called 'name' */
static
ibool
name_of_col_is(
	const dict_table_t*	table,
	const dict_index_t*	index,
	ulint			i,
	const char*		name)
{
	/* Translate the index field offset into a table column number. */
	const ulint	col_no = dict_col_get_no(
		dict_field_get_col(dict_index_get_nth_field(index, i)));
	const char*	col_name = dict_table_get_col_name(table, col_no);

	return(!strcmp(name, col_name));
}
#endif /* UNIV_DEBUG */
202
203 /********************************************************************//**
204 Finds the first table name in the given database.
205 @return own: table name, NULL if does not exist; the caller must free
206 the memory in the string! */
207 char*
dict_get_first_table_name_in_db(const char * name)208 dict_get_first_table_name_in_db(
209 /*============================*/
210 const char* name) /*!< in: database name which ends in '/' */
211 {
212 dict_table_t* sys_tables;
213 btr_pcur_t pcur;
214 dict_index_t* sys_index;
215 dtuple_t* tuple;
216 mem_heap_t* heap;
217 dfield_t* dfield;
218 const rec_t* rec;
219 const byte* field;
220 ulint len;
221 mtr_t mtr;
222
223 ut_ad(mutex_own(&dict_sys.mutex));
224
225 heap = mem_heap_create(1000);
226
227 mtr_start(&mtr);
228
229 sys_tables = dict_table_get_low("SYS_TABLES");
230 sys_index = UT_LIST_GET_FIRST(sys_tables->indexes);
231 ut_ad(!dict_table_is_comp(sys_tables));
232
233 tuple = dtuple_create(heap, 1);
234 dfield = dtuple_get_nth_field(tuple, 0);
235
236 dfield_set_data(dfield, name, strlen(name));
237 dict_index_copy_types(tuple, sys_index, 1);
238
239 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
240 BTR_SEARCH_LEAF, &pcur, &mtr);
241 loop:
242 rec = btr_pcur_get_rec(&pcur);
243
244 if (!btr_pcur_is_on_user_rec(&pcur)) {
245 /* Not found */
246
247 btr_pcur_close(&pcur);
248 mtr_commit(&mtr);
249 mem_heap_free(heap);
250
251 return(NULL);
252 }
253
254 field = rec_get_nth_field_old(
255 rec, DICT_FLD__SYS_TABLES__NAME, &len);
256
257 if (len < strlen(name)
258 || memcmp(name, field, strlen(name))) {
259 /* Not found */
260
261 btr_pcur_close(&pcur);
262 mtr_commit(&mtr);
263 mem_heap_free(heap);
264
265 return(NULL);
266 }
267
268 if (!rec_get_deleted_flag(rec, 0)) {
269
270 /* We found one */
271
272 char* table_name = mem_strdupl((char*) field, len);
273
274 btr_pcur_close(&pcur);
275 mtr_commit(&mtr);
276 mem_heap_free(heap);
277
278 return(table_name);
279 }
280
281 btr_pcur_move_to_next_user_rec(&pcur, &mtr);
282
283 goto loop;
284 }
285
286 /********************************************************************//**
287 This function gets the next system table record as it scans the table.
288 @return the next record if found, NULL if end of scan */
289 static
290 const rec_t*
dict_getnext_system_low(btr_pcur_t * pcur,mtr_t * mtr)291 dict_getnext_system_low(
292 /*====================*/
293 btr_pcur_t* pcur, /*!< in/out: persistent cursor to the
294 record*/
295 mtr_t* mtr) /*!< in: the mini-transaction */
296 {
297 rec_t* rec = NULL;
298
299 while (!rec || rec_get_deleted_flag(rec, 0)) {
300 btr_pcur_move_to_next_user_rec(pcur, mtr);
301
302 rec = btr_pcur_get_rec(pcur);
303
304 if (!btr_pcur_is_on_user_rec(pcur)) {
305 /* end of index */
306 btr_pcur_close(pcur);
307
308 return(NULL);
309 }
310 }
311
312 /* Get a record, let's save the position */
313 btr_pcur_store_position(pcur, mtr);
314
315 return(rec);
316 }
317
318 /********************************************************************//**
319 This function opens a system table, and returns the first record.
320 @return first record of the system table */
321 const rec_t*
dict_startscan_system(btr_pcur_t * pcur,mtr_t * mtr,dict_system_id_t system_id)322 dict_startscan_system(
323 /*==================*/
324 btr_pcur_t* pcur, /*!< out: persistent cursor to
325 the record */
326 mtr_t* mtr, /*!< in: the mini-transaction */
327 dict_system_id_t system_id) /*!< in: which system table to open */
328 {
329 dict_table_t* system_table;
330 dict_index_t* clust_index;
331 const rec_t* rec;
332
333 ut_a(system_id < SYS_NUM_SYSTEM_TABLES);
334
335 system_table = dict_table_get_low(SYSTEM_TABLE_NAME[system_id]);
336
337 clust_index = UT_LIST_GET_FIRST(system_table->indexes);
338
339 btr_pcur_open_at_index_side(true, clust_index, BTR_SEARCH_LEAF, pcur,
340 true, 0, mtr);
341
342 rec = dict_getnext_system_low(pcur, mtr);
343
344 return(rec);
345 }
346
347 /********************************************************************//**
348 This function gets the next system table record as it scans the table.
349 @return the next record if found, NULL if end of scan */
350 const rec_t*
dict_getnext_system(btr_pcur_t * pcur,mtr_t * mtr)351 dict_getnext_system(
352 /*================*/
353 btr_pcur_t* pcur, /*!< in/out: persistent cursor
354 to the record */
355 mtr_t* mtr) /*!< in: the mini-transaction */
356 {
357 const rec_t* rec;
358
359 /* Restore the position */
360 btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
361
362 /* Get the next record */
363 rec = dict_getnext_system_low(pcur, mtr);
364
365 return(rec);
366 }
367
368 /********************************************************************//**
369 This function processes one SYS_TABLES record and populate the dict_table_t
370 struct for the table.
371 @return error message, or NULL on success */
372 const char*
dict_process_sys_tables_rec_and_mtr_commit(mem_heap_t * heap,const rec_t * rec,dict_table_t ** table,bool cached,mtr_t * mtr)373 dict_process_sys_tables_rec_and_mtr_commit(
374 /*=======================================*/
375 mem_heap_t* heap, /*!< in/out: temporary memory heap */
376 const rec_t* rec, /*!< in: SYS_TABLES record */
377 dict_table_t** table, /*!< out: dict_table_t to fill */
378 bool cached, /*!< in: whether to load from cache */
379 mtr_t* mtr) /*!< in/out: mini-transaction,
380 will be committed */
381 {
382 ulint len;
383 const char* field;
384
385 field = (const char*) rec_get_nth_field_old(
386 rec, DICT_FLD__SYS_TABLES__NAME, &len);
387
388 ut_a(!rec_get_deleted_flag(rec, 0));
389
390 ut_ad(mtr->memo_contains_page_flagged(rec, MTR_MEMO_PAGE_S_FIX));
391
392 /* Get the table name */
393 table_name_t table_name(mem_heap_strdupl(heap, field, len));
394
395 if (cached) {
396 /* Commit before load the table again */
397 mtr_commit(mtr);
398
399 *table = dict_table_get_low(table_name.m_name);
400 return *table ? NULL : "Table not found in cache";
401 } else {
402 const char* err = dict_load_table_low(table_name, rec, table);
403 mtr_commit(mtr);
404 return err;
405 }
406 }
407
408 /********************************************************************//**
409 This function parses a SYS_INDEXES record and populate a dict_index_t
410 structure with the information from the record. For detail information
411 about SYS_INDEXES fields, please refer to dict_boot() function.
412 @return error message, or NULL on success */
413 const char*
dict_process_sys_indexes_rec(mem_heap_t * heap,const rec_t * rec,dict_index_t * index,table_id_t * table_id)414 dict_process_sys_indexes_rec(
415 /*=========================*/
416 mem_heap_t* heap, /*!< in/out: heap memory */
417 const rec_t* rec, /*!< in: current SYS_INDEXES rec */
418 dict_index_t* index, /*!< out: index to be filled */
419 table_id_t* table_id) /*!< out: index table id */
420 {
421 const char* err_msg;
422 byte* buf;
423
424 ut_d(index->is_dummy = true);
425 ut_d(index->in_instant_init = false);
426 buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
427
428 /* Parse the record, and get "dict_index_t" struct filled */
429 err_msg = dict_load_index_low(buf, heap, rec, FALSE, &index);
430
431 *table_id = mach_read_from_8(buf);
432
433 return(err_msg);
434 }
435
436 /********************************************************************//**
437 This function parses a SYS_COLUMNS record and populate a dict_column_t
438 structure with the information from the record.
439 @return error message, or NULL on success */
440 const char*
dict_process_sys_columns_rec(mem_heap_t * heap,const rec_t * rec,dict_col_t * column,table_id_t * table_id,const char ** col_name,ulint * nth_v_col)441 dict_process_sys_columns_rec(
442 /*=========================*/
443 mem_heap_t* heap, /*!< in/out: heap memory */
444 const rec_t* rec, /*!< in: current SYS_COLUMNS rec */
445 dict_col_t* column, /*!< out: dict_col_t to be filled */
446 table_id_t* table_id, /*!< out: table id */
447 const char** col_name, /*!< out: column name */
448 ulint* nth_v_col) /*!< out: if virtual col, this is
449 record's sequence number */
450 {
451 const char* err_msg;
452
453 /* Parse the record, and get "dict_col_t" struct filled */
454 err_msg = dict_load_column_low(NULL, heap, column,
455 table_id, col_name, rec, nth_v_col);
456
457 return(err_msg);
458 }
459
460 /** This function parses a SYS_VIRTUAL record and extracts virtual column
461 information
462 @param[in] rec current SYS_COLUMNS rec
463 @param[in,out] table_id table id
464 @param[in,out] pos virtual column position
465 @param[in,out] base_pos base column position
466 @return error message, or NULL on success */
467 const char*
dict_process_sys_virtual_rec(const rec_t * rec,table_id_t * table_id,ulint * pos,ulint * base_pos)468 dict_process_sys_virtual_rec(
469 const rec_t* rec,
470 table_id_t* table_id,
471 ulint* pos,
472 ulint* base_pos)
473 {
474 const char* err_msg;
475
476 /* Parse the record, and get "dict_col_t" struct filled */
477 err_msg = dict_load_virtual_low(NULL, NULL, table_id,
478 pos, base_pos, rec);
479
480 return(err_msg);
481 }
482
483 /********************************************************************//**
484 This function parses a SYS_FIELDS record and populates a dict_field_t
485 structure with the information from the record.
486 @return error message, or NULL on success */
487 const char*
dict_process_sys_fields_rec(mem_heap_t * heap,const rec_t * rec,dict_field_t * sys_field,ulint * pos,index_id_t * index_id,index_id_t last_id)488 dict_process_sys_fields_rec(
489 /*========================*/
490 mem_heap_t* heap, /*!< in/out: heap memory */
491 const rec_t* rec, /*!< in: current SYS_FIELDS rec */
492 dict_field_t* sys_field, /*!< out: dict_field_t to be
493 filled */
494 ulint* pos, /*!< out: Field position */
495 index_id_t* index_id, /*!< out: current index id */
496 index_id_t last_id) /*!< in: previous index id */
497 {
498 byte* buf;
499 byte* last_index_id;
500 const char* err_msg;
501
502 buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
503
504 last_index_id = static_cast<byte*>(mem_heap_alloc(heap, 8));
505 mach_write_to_8(last_index_id, last_id);
506
507 err_msg = dict_load_field_low(buf, NULL, sys_field,
508 pos, last_index_id, heap, rec);
509
510 *index_id = mach_read_from_8(buf);
511
512 return(err_msg);
513
514 }
515
516 /********************************************************************//**
517 This function parses a SYS_FOREIGN record and populate a dict_foreign_t
518 structure with the information from the record. For detail information
519 about SYS_FOREIGN fields, please refer to dict_load_foreign() function.
520 @return error message, or NULL on success */
521 const char*
dict_process_sys_foreign_rec(mem_heap_t * heap,const rec_t * rec,dict_foreign_t * foreign)522 dict_process_sys_foreign_rec(
523 /*=========================*/
524 mem_heap_t* heap, /*!< in/out: heap memory */
525 const rec_t* rec, /*!< in: current SYS_FOREIGN rec */
526 dict_foreign_t* foreign) /*!< out: dict_foreign_t struct
527 to be filled */
528 {
529 ulint len;
530 const byte* field;
531
532 if (rec_get_deleted_flag(rec, 0)) {
533 return("delete-marked record in SYS_FOREIGN");
534 }
535
536 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_FOREIGN) {
537 return("wrong number of columns in SYS_FOREIGN record");
538 }
539
540 field = rec_get_nth_field_old(
541 rec, DICT_FLD__SYS_FOREIGN__ID, &len);
542 if (len == 0 || len == UNIV_SQL_NULL) {
543 err_len:
544 return("incorrect column length in SYS_FOREIGN");
545 }
546
547 /* This receives a dict_foreign_t* that points to a stack variable.
548 So dict_foreign_free(foreign) is not used as elsewhere.
549 Since the heap used here is freed elsewhere, foreign->heap
550 is not assigned. */
551 foreign->id = mem_heap_strdupl(heap, (const char*) field, len);
552
553 rec_get_nth_field_offs_old(
554 rec, DICT_FLD__SYS_FOREIGN__DB_TRX_ID, &len);
555 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
556 goto err_len;
557 }
558 rec_get_nth_field_offs_old(
559 rec, DICT_FLD__SYS_FOREIGN__DB_ROLL_PTR, &len);
560 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
561 goto err_len;
562 }
563
564 /* The _lookup versions of the referenced and foreign table names
565 are not assigned since they are not used in this dict_foreign_t */
566
567 field = rec_get_nth_field_old(
568 rec, DICT_FLD__SYS_FOREIGN__FOR_NAME, &len);
569 if (len == 0 || len == UNIV_SQL_NULL) {
570 goto err_len;
571 }
572 foreign->foreign_table_name = mem_heap_strdupl(
573 heap, (const char*) field, len);
574
575 field = rec_get_nth_field_old(
576 rec, DICT_FLD__SYS_FOREIGN__REF_NAME, &len);
577 if (len == 0 || len == UNIV_SQL_NULL) {
578 goto err_len;
579 }
580 foreign->referenced_table_name = mem_heap_strdupl(
581 heap, (const char*) field, len);
582
583 field = rec_get_nth_field_old(
584 rec, DICT_FLD__SYS_FOREIGN__N_COLS, &len);
585 if (len != 4) {
586 goto err_len;
587 }
588 uint32_t n_fields_and_type = mach_read_from_4(field);
589
590 foreign->type = n_fields_and_type >> 24 & ((1U << 6) - 1);
591 foreign->n_fields = n_fields_and_type & dict_index_t::MAX_N_FIELDS;
592
593 return(NULL);
594 }
595
596 /********************************************************************//**
597 This function parses a SYS_FOREIGN_COLS record and extract necessary
598 information from the record and return to caller.
599 @return error message, or NULL on success */
600 const char*
dict_process_sys_foreign_col_rec(mem_heap_t * heap,const rec_t * rec,const char ** name,const char ** for_col_name,const char ** ref_col_name,ulint * pos)601 dict_process_sys_foreign_col_rec(
602 /*=============================*/
603 mem_heap_t* heap, /*!< in/out: heap memory */
604 const rec_t* rec, /*!< in: current SYS_FOREIGN_COLS rec */
605 const char** name, /*!< out: foreign key constraint name */
606 const char** for_col_name, /*!< out: referencing column name */
607 const char** ref_col_name, /*!< out: referenced column name
608 in referenced table */
609 ulint* pos) /*!< out: column position */
610 {
611 ulint len;
612 const byte* field;
613
614 if (rec_get_deleted_flag(rec, 0)) {
615 return("delete-marked record in SYS_FOREIGN_COLS");
616 }
617
618 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_FOREIGN_COLS) {
619 return("wrong number of columns in SYS_FOREIGN_COLS record");
620 }
621
622 field = rec_get_nth_field_old(
623 rec, DICT_FLD__SYS_FOREIGN_COLS__ID, &len);
624 if (len == 0 || len == UNIV_SQL_NULL) {
625 err_len:
626 return("incorrect column length in SYS_FOREIGN_COLS");
627 }
628 *name = mem_heap_strdupl(heap, (char*) field, len);
629
630 field = rec_get_nth_field_old(
631 rec, DICT_FLD__SYS_FOREIGN_COLS__POS, &len);
632 if (len != 4) {
633 goto err_len;
634 }
635 *pos = mach_read_from_4(field);
636
637 rec_get_nth_field_offs_old(
638 rec, DICT_FLD__SYS_FOREIGN_COLS__DB_TRX_ID, &len);
639 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
640 goto err_len;
641 }
642 rec_get_nth_field_offs_old(
643 rec, DICT_FLD__SYS_FOREIGN_COLS__DB_ROLL_PTR, &len);
644 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
645 goto err_len;
646 }
647
648 field = rec_get_nth_field_old(
649 rec, DICT_FLD__SYS_FOREIGN_COLS__FOR_COL_NAME, &len);
650 if (len == 0 || len == UNIV_SQL_NULL) {
651 goto err_len;
652 }
653 *for_col_name = mem_heap_strdupl(heap, (char*) field, len);
654
655 field = rec_get_nth_field_old(
656 rec, DICT_FLD__SYS_FOREIGN_COLS__REF_COL_NAME, &len);
657 if (len == 0 || len == UNIV_SQL_NULL) {
658 goto err_len;
659 }
660 *ref_col_name = mem_heap_strdupl(heap, (char*) field, len);
661
662 return(NULL);
663 }
664
665 /********************************************************************//**
666 This function parses a SYS_TABLESPACES record, extracts necessary
667 information from the record and returns to caller.
668 @return error message, or NULL on success */
669 const char*
dict_process_sys_tablespaces(mem_heap_t * heap,const rec_t * rec,uint32_t * space,const char ** name,ulint * flags)670 dict_process_sys_tablespaces(
671 /*=========================*/
672 mem_heap_t* heap, /*!< in/out: heap memory */
673 const rec_t* rec, /*!< in: current SYS_TABLESPACES rec */
674 uint32_t* space, /*!< out: tablespace identifier */
675 const char** name, /*!< out: tablespace name */
676 ulint* flags) /*!< out: tablespace flags */
677 {
678 ulint len;
679 const byte* field;
680
681 if (rec_get_deleted_flag(rec, 0)) {
682 return("delete-marked record in SYS_TABLESPACES");
683 }
684
685 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_TABLESPACES) {
686 return("wrong number of columns in SYS_TABLESPACES record");
687 }
688
689 field = rec_get_nth_field_old(
690 rec, DICT_FLD__SYS_TABLESPACES__SPACE, &len);
691 if (len != DICT_FLD_LEN_SPACE) {
692 err_len:
693 return("incorrect column length in SYS_TABLESPACES");
694 }
695 *space = mach_read_from_4(field);
696
697 rec_get_nth_field_offs_old(
698 rec, DICT_FLD__SYS_TABLESPACES__DB_TRX_ID, &len);
699 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
700 goto err_len;
701 }
702
703 rec_get_nth_field_offs_old(
704 rec, DICT_FLD__SYS_TABLESPACES__DB_ROLL_PTR, &len);
705 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
706 goto err_len;
707 }
708
709 field = rec_get_nth_field_old(
710 rec, DICT_FLD__SYS_TABLESPACES__NAME, &len);
711 if (len == 0 || len == UNIV_SQL_NULL) {
712 goto err_len;
713 }
714 *name = mem_heap_strdupl(heap, (char*) field, len);
715
716 field = rec_get_nth_field_old(
717 rec, DICT_FLD__SYS_TABLESPACES__FLAGS, &len);
718 if (len != DICT_FLD_LEN_FLAGS) {
719 goto err_len;
720 }
721 *flags = mach_read_from_4(field);
722
723 return(NULL);
724 }
725
726 /********************************************************************//**
727 This function parses a SYS_DATAFILES record, extracts necessary
728 information from the record and returns it to the caller.
729 @return error message, or NULL on success */
730 const char*
dict_process_sys_datafiles(mem_heap_t * heap,const rec_t * rec,uint32_t * space,const char ** path)731 dict_process_sys_datafiles(
732 /*=======================*/
733 mem_heap_t* heap, /*!< in/out: heap memory */
734 const rec_t* rec, /*!< in: current SYS_DATAFILES rec */
735 uint32_t* space, /*!< out: space id */
736 const char** path) /*!< out: datafile paths */
737 {
738 ulint len;
739 const byte* field;
740
741 if (rec_get_deleted_flag(rec, 0)) {
742 return("delete-marked record in SYS_DATAFILES");
743 }
744
745 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_DATAFILES) {
746 return("wrong number of columns in SYS_DATAFILES record");
747 }
748
749 field = rec_get_nth_field_old(
750 rec, DICT_FLD__SYS_DATAFILES__SPACE, &len);
751 if (len != DICT_FLD_LEN_SPACE) {
752 err_len:
753 return("incorrect column length in SYS_DATAFILES");
754 }
755 *space = mach_read_from_4(field);
756
757 rec_get_nth_field_offs_old(
758 rec, DICT_FLD__SYS_DATAFILES__DB_TRX_ID, &len);
759 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
760 goto err_len;
761 }
762
763 rec_get_nth_field_offs_old(
764 rec, DICT_FLD__SYS_DATAFILES__DB_ROLL_PTR, &len);
765 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
766 goto err_len;
767 }
768
769 field = rec_get_nth_field_old(
770 rec, DICT_FLD__SYS_DATAFILES__PATH, &len);
771 if (len == 0 || len == UNIV_SQL_NULL) {
772 goto err_len;
773 }
774 *path = mem_heap_strdupl(heap, (char*) field, len);
775
776 return(NULL);
777 }
778
779 /** Get the first filepath from SYS_DATAFILES for a given space_id.
780 @param[in] space_id Tablespace ID
781 @return First filepath (caller must invoke ut_free() on it)
782 @retval NULL if no SYS_DATAFILES entry was found. */
783 static char*
dict_get_first_path(ulint space_id)784 dict_get_first_path(
785 ulint space_id)
786 {
787 mtr_t mtr;
788 dict_table_t* sys_datafiles;
789 dict_index_t* sys_index;
790 dtuple_t* tuple;
791 dfield_t* dfield;
792 byte* buf;
793 btr_pcur_t pcur;
794 const rec_t* rec;
795 const byte* field;
796 ulint len;
797 char* filepath = NULL;
798 mem_heap_t* heap = mem_heap_create(1024);
799
800 ut_ad(mutex_own(&dict_sys.mutex));
801
802 mtr_start(&mtr);
803
804 sys_datafiles = dict_table_get_low("SYS_DATAFILES");
805 sys_index = UT_LIST_GET_FIRST(sys_datafiles->indexes);
806
807 ut_ad(!dict_table_is_comp(sys_datafiles));
808 ut_ad(name_of_col_is(sys_datafiles, sys_index,
809 DICT_FLD__SYS_DATAFILES__SPACE, "SPACE"));
810 ut_ad(name_of_col_is(sys_datafiles, sys_index,
811 DICT_FLD__SYS_DATAFILES__PATH, "PATH"));
812
813 tuple = dtuple_create(heap, 1);
814 dfield = dtuple_get_nth_field(tuple, DICT_FLD__SYS_DATAFILES__SPACE);
815
816 buf = static_cast<byte*>(mem_heap_alloc(heap, 4));
817 mach_write_to_4(buf, space_id);
818
819 dfield_set_data(dfield, buf, 4);
820 dict_index_copy_types(tuple, sys_index, 1);
821
822 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
823 BTR_SEARCH_LEAF, &pcur, &mtr);
824
825 rec = btr_pcur_get_rec(&pcur);
826
827 /* Get the filepath from this SYS_DATAFILES record. */
828 if (btr_pcur_is_on_user_rec(&pcur)) {
829 field = rec_get_nth_field_old(
830 rec, DICT_FLD__SYS_DATAFILES__SPACE, &len);
831 ut_a(len == 4);
832
833 if (space_id == mach_read_from_4(field)) {
834 /* A record for this space ID was found. */
835 field = rec_get_nth_field_old(
836 rec, DICT_FLD__SYS_DATAFILES__PATH, &len);
837
838 ut_ad(len > 0);
839 ut_ad(len < OS_FILE_MAX_PATH);
840
841 if (len > 0 && len < UNIV_SQL_NULL) {
842 filepath = mem_strdupl(
843 reinterpret_cast<const char*>(field),
844 len);
845 ut_ad(filepath != NULL);
846
847 /* The dictionary may have been written on
848 another OS. */
849 os_normalize_path(filepath);
850 }
851 }
852 }
853
854 btr_pcur_close(&pcur);
855 mtr_commit(&mtr);
856 mem_heap_free(heap);
857
858 return(filepath);
859 }
860
861 /** Update the record for space_id in SYS_TABLESPACES to this filepath.
862 @param[in] space_id Tablespace ID
863 @param[in] filepath Tablespace filepath
864 @return DB_SUCCESS if OK, dberr_t if the insert failed */
865 dberr_t
dict_update_filepath(ulint space_id,const char * filepath)866 dict_update_filepath(
867 ulint space_id,
868 const char* filepath)
869 {
870 if (!srv_sys_tablespaces_open) {
871 /* Startup procedure is not yet ready for updates. */
872 return(DB_SUCCESS);
873 }
874
875 dberr_t err = DB_SUCCESS;
876 trx_t* trx;
877
878 ut_d(dict_sys.assert_locked());
879
880 trx = trx_create();
881 trx->op_info = "update filepath";
882 trx->dict_operation_lock_mode = RW_X_LATCH;
883 trx_start_for_ddl(trx, TRX_DICT_OP_INDEX);
884
885 pars_info_t* info = pars_info_create();
886
887 pars_info_add_int4_literal(info, "space", space_id);
888 pars_info_add_str_literal(info, "path", filepath);
889
890 err = que_eval_sql(info,
891 "PROCEDURE UPDATE_FILEPATH () IS\n"
892 "BEGIN\n"
893 "UPDATE SYS_DATAFILES"
894 " SET PATH = :path\n"
895 " WHERE SPACE = :space;\n"
896 "END;\n", FALSE, trx);
897
898 trx_commit_for_mysql(trx);
899 trx->dict_operation_lock_mode = 0;
900 trx->free();
901
902 if (UNIV_LIKELY(err == DB_SUCCESS)) {
903 /* We just updated SYS_DATAFILES due to the contents in
904 a link file. Make a note that we did this. */
905 ib::info() << "The InnoDB data dictionary table SYS_DATAFILES"
906 " for tablespace ID " << space_id
907 << " was updated to use file " << filepath << ".";
908 } else {
909 ib::warn() << "Error occurred while updating InnoDB data"
910 " dictionary table SYS_DATAFILES for tablespace ID "
911 << space_id << " to file " << filepath << ": "
912 << err << ".";
913 }
914
915 return(err);
916 }
917
918 /** Replace records in SYS_TABLESPACES and SYS_DATAFILES associated with
919 the given space_id using an independent transaction.
920 @param[in] space_id Tablespace ID
921 @param[in] name Tablespace name
922 @param[in] filepath First filepath
923 @param[in] fsp_flags Tablespace flags
924 @return DB_SUCCESS if OK, dberr_t if the insert failed */
925 dberr_t
dict_replace_tablespace_and_filepath(ulint space_id,const char * name,const char * filepath,ulint fsp_flags)926 dict_replace_tablespace_and_filepath(
927 ulint space_id,
928 const char* name,
929 const char* filepath,
930 ulint fsp_flags)
931 {
932 if (!srv_sys_tablespaces_open) {
933 /* Startup procedure is not yet ready for updates.
934 Return success since this will likely get updated
935 later. */
936 return(DB_SUCCESS);
937 }
938
939 dberr_t err = DB_SUCCESS;
940 trx_t* trx;
941
942 DBUG_EXECUTE_IF("innodb_fail_to_update_tablespace_dict",
943 return(DB_INTERRUPTED););
944
945 ut_d(dict_sys.assert_locked());
946 ut_ad(filepath);
947
948 trx = trx_create();
949 trx->op_info = "insert tablespace and filepath";
950 trx->dict_operation_lock_mode = RW_X_LATCH;
951 trx_start_for_ddl(trx, TRX_DICT_OP_INDEX);
952
953 /* A record for this space ID was not found in
954 SYS_DATAFILES. Assume the record is also missing in
955 SYS_TABLESPACES. Insert records into them both. */
956 err = dict_replace_tablespace_in_dictionary(
957 space_id, name, fsp_flags, filepath, trx);
958
959 trx_commit_for_mysql(trx);
960 trx->dict_operation_lock_mode = 0;
961 trx->free();
962
963 return(err);
964 }
965
966 /** Check the validity of a SYS_TABLES record
967 Make sure the fields are the right length and that they
968 do not contain invalid contents.
969 @param[in] rec SYS_TABLES record
970 @return error message, or NULL on success */
971 static
972 const char*
dict_sys_tables_rec_check(const rec_t * rec)973 dict_sys_tables_rec_check(
974 const rec_t* rec)
975 {
976 const byte* field;
977 ulint len;
978
979 ut_ad(mutex_own(&dict_sys.mutex));
980
981 if (rec_get_deleted_flag(rec, 0)) {
982 return("delete-marked record in SYS_TABLES");
983 }
984
985 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_TABLES) {
986 return("wrong number of columns in SYS_TABLES record");
987 }
988
989 rec_get_nth_field_offs_old(
990 rec, DICT_FLD__SYS_TABLES__NAME, &len);
991 if (len == 0 || len == UNIV_SQL_NULL) {
992 err_len:
993 return("incorrect column length in SYS_TABLES");
994 }
995 rec_get_nth_field_offs_old(
996 rec, DICT_FLD__SYS_TABLES__DB_TRX_ID, &len);
997 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
998 goto err_len;
999 }
1000 rec_get_nth_field_offs_old(
1001 rec, DICT_FLD__SYS_TABLES__DB_ROLL_PTR, &len);
1002 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
1003 goto err_len;
1004 }
1005
1006 rec_get_nth_field_offs_old(rec, DICT_FLD__SYS_TABLES__ID, &len);
1007 if (len != 8) {
1008 goto err_len;
1009 }
1010
1011 field = rec_get_nth_field_old(
1012 rec, DICT_FLD__SYS_TABLES__N_COLS, &len);
1013 if (field == NULL || len != 4) {
1014 goto err_len;
1015 }
1016
1017 rec_get_nth_field_offs_old(rec, DICT_FLD__SYS_TABLES__TYPE, &len);
1018 if (len != 4) {
1019 goto err_len;
1020 }
1021
1022 rec_get_nth_field_offs_old(
1023 rec, DICT_FLD__SYS_TABLES__MIX_ID, &len);
1024 if (len != 8) {
1025 goto err_len;
1026 }
1027
1028 field = rec_get_nth_field_old(
1029 rec, DICT_FLD__SYS_TABLES__MIX_LEN, &len);
1030 if (field == NULL || len != 4) {
1031 goto err_len;
1032 }
1033
1034 rec_get_nth_field_offs_old(
1035 rec, DICT_FLD__SYS_TABLES__CLUSTER_ID, &len);
1036 if (len != UNIV_SQL_NULL) {
1037 goto err_len;
1038 }
1039
1040 field = rec_get_nth_field_old(
1041 rec, DICT_FLD__SYS_TABLES__SPACE, &len);
1042 if (field == NULL || len != 4) {
1043 goto err_len;
1044 }
1045
1046 return(NULL);
1047 }
1048
1049 /** Read and return the contents of a SYS_TABLESPACES record.
1050 @param[in] rec A record of SYS_TABLESPACES
1051 @param[out] id Pointer to the space_id for this table
1052 @param[in,out] name Buffer for Tablespace Name of length NAME_LEN
1053 @param[out] flags Pointer to tablespace flags
1054 @return true if the record was read correctly, false if not. */
1055 bool
dict_sys_tablespaces_rec_read(const rec_t * rec,ulint * id,char * name,ulint * flags)1056 dict_sys_tablespaces_rec_read(
1057 const rec_t* rec,
1058 ulint* id,
1059 char* name,
1060 ulint* flags)
1061 {
1062 const byte* field;
1063 ulint len;
1064
1065 field = rec_get_nth_field_old(
1066 rec, DICT_FLD__SYS_TABLESPACES__SPACE, &len);
1067 if (len != DICT_FLD_LEN_SPACE) {
1068 ib::error() << "Wrong field length in SYS_TABLESPACES.SPACE: "
1069 << len;
1070 return(false);
1071 }
1072 *id = mach_read_from_4(field);
1073
1074 field = rec_get_nth_field_old(
1075 rec, DICT_FLD__SYS_TABLESPACES__NAME, &len);
1076 if (len == 0 || len == UNIV_SQL_NULL) {
1077 ib::error() << "Wrong field length in SYS_TABLESPACES.NAME: "
1078 << len;
1079 return(false);
1080 }
1081 strncpy(name, reinterpret_cast<const char*>(field), NAME_LEN);
1082
1083 /* read the 4 byte flags from the TYPE field */
1084 field = rec_get_nth_field_old(
1085 rec, DICT_FLD__SYS_TABLESPACES__FLAGS, &len);
1086 if (len != 4) {
1087 ib::error() << "Wrong field length in SYS_TABLESPACES.FLAGS: "
1088 << len;
1089 return(false);
1090 }
1091 *flags = mach_read_from_4(field);
1092
1093 return(true);
1094 }
1095
1096 /** Check if SYS_TABLES.TYPE is valid
1097 @param[in] type SYS_TABLES.TYPE
1098 @param[in] not_redundant whether ROW_FORMAT=REDUNDANT is not used
1099 @return whether the SYS_TABLES.TYPE value is valid */
1100 static
1101 bool
dict_sys_tables_type_valid(ulint type,bool not_redundant)1102 dict_sys_tables_type_valid(ulint type, bool not_redundant)
1103 {
1104 /* The DATA_DIRECTORY flag can be assigned fully independently
1105 of all other persistent table flags. */
1106 type &= ~DICT_TF_MASK_DATA_DIR;
1107
1108 if (type == 1) {
1109 return(true); /* ROW_FORMAT=REDUNDANT or ROW_FORMAT=COMPACT */
1110 }
1111
1112 if (!(type & 1)) {
1113 /* For ROW_FORMAT=REDUNDANT and ROW_FORMAT=COMPACT,
1114 SYS_TABLES.TYPE=1. Else, it is the same as
1115 dict_table_t::flags, and the least significant bit
1116 would be set. So, the bit never can be 0. */
1117 return(false);
1118 }
1119
1120 if (!not_redundant) {
1121 /* SYS_TABLES.TYPE must be 1 or 1|DICT_TF_MASK_NO_ROLLBACK
1122 for ROW_FORMAT=REDUNDANT. */
1123 return !(type & ~(1U | DICT_TF_MASK_NO_ROLLBACK));
1124 }
1125
1126 if (type >= 1U << DICT_TF_POS_UNUSED) {
1127 /* Some unknown bits are set. */
1128 return(false);
1129 }
1130
1131 return(dict_tf_is_valid_not_redundant(type));
1132 }
1133
1134 /** Convert SYS_TABLES.TYPE to dict_table_t::flags.
1135 @param[in] type SYS_TABLES.TYPE
1136 @param[in] not_redundant whether ROW_FORMAT=REDUNDANT is not used
1137 @return table flags */
1138 static
1139 ulint
dict_sys_tables_type_to_tf(ulint type,bool not_redundant)1140 dict_sys_tables_type_to_tf(ulint type, bool not_redundant)
1141 {
1142 ut_ad(dict_sys_tables_type_valid(type, not_redundant));
1143 ulint flags = not_redundant ? 1 : 0;
1144
1145 /* ZIP_SSIZE, ATOMIC_BLOBS, DATA_DIR, PAGE_COMPRESSION,
1146 PAGE_COMPRESSION_LEVEL are the same. */
1147 flags |= type & (DICT_TF_MASK_ZIP_SSIZE
1148 | DICT_TF_MASK_ATOMIC_BLOBS
1149 | DICT_TF_MASK_DATA_DIR
1150 | DICT_TF_MASK_PAGE_COMPRESSION
1151 | DICT_TF_MASK_PAGE_COMPRESSION_LEVEL
1152 | DICT_TF_MASK_NO_ROLLBACK);
1153
1154 ut_ad(dict_tf_is_valid(flags));
1155 return(flags);
1156 }
1157
1158 /** Read and return 5 integer fields from a SYS_TABLES record.
1159 @param[in] rec A record of SYS_TABLES
1160 @param[in] name Table Name, the same as SYS_TABLES.NAME
1161 @param[out] table_id Pointer to the table_id for this table
1162 @param[out] space_id Pointer to the space_id for this table
1163 @param[out] n_cols Pointer to number of columns for this table.
1164 @param[out] flags Pointer to table flags
1165 @param[out] flags2 Pointer to table flags2
1166 @return true if the record was read correctly, false if not. */
1167 MY_ATTRIBUTE((warn_unused_result))
1168 static
1169 bool
dict_sys_tables_rec_read(const rec_t * rec,const table_name_t & table_name,table_id_t * table_id,ulint * space_id,ulint * n_cols,ulint * flags,ulint * flags2)1170 dict_sys_tables_rec_read(
1171 const rec_t* rec,
1172 const table_name_t& table_name,
1173 table_id_t* table_id,
1174 ulint* space_id,
1175 ulint* n_cols,
1176 ulint* flags,
1177 ulint* flags2)
1178 {
1179 const byte* field;
1180 ulint len;
1181 ulint type;
1182
1183 field = rec_get_nth_field_old(
1184 rec, DICT_FLD__SYS_TABLES__ID, &len);
1185 ut_ad(len == 8);
1186 *table_id = static_cast<table_id_t>(mach_read_from_8(field));
1187
1188 field = rec_get_nth_field_old(
1189 rec, DICT_FLD__SYS_TABLES__SPACE, &len);
1190 ut_ad(len == 4);
1191 *space_id = mach_read_from_4(field);
1192
1193 /* Read the 4 byte flags from the TYPE field */
1194 field = rec_get_nth_field_old(
1195 rec, DICT_FLD__SYS_TABLES__TYPE, &len);
1196 ut_a(len == 4);
1197 type = mach_read_from_4(field);
1198
1199 /* Handle MDEV-12873 InnoDB SYS_TABLES.TYPE incompatibility
1200 for PAGE_COMPRESSED=YES in MariaDB 10.2.2 to 10.2.6.
1201
1202 MariaDB 10.2.2 introduced the SHARED_SPACE flag from MySQL 5.7,
1203 shifting the flags PAGE_COMPRESSION, PAGE_COMPRESSION_LEVEL,
1204 ATOMIC_WRITES (repurposed to NO_ROLLBACK in 10.3.1) by one bit.
1205 The SHARED_SPACE flag would always
1206 be written as 0 by MariaDB, because MariaDB does not support
1207 CREATE TABLESPACE or CREATE TABLE...TABLESPACE for InnoDB.
1208
1209 So, instead of the bits AALLLLCxxxxxxx we would have
1210 AALLLLC0xxxxxxx if the table was created with MariaDB 10.2.2
1211 to 10.2.6. (AA=ATOMIC_WRITES, LLLL=PAGE_COMPRESSION_LEVEL,
1212 C=PAGE_COMPRESSED, xxxxxxx=7 bits that were not moved.)
1213
1214 The case LLLLC=00000 is not a problem. The problem is the case
1215 AALLLL10DB00001 where D is the (mostly ignored) DATA_DIRECTORY
1216 flag and B is the ATOMIC_BLOBS flag (1 for ROW_FORMAT=DYNAMIC
1217 and 0 for ROW_FORMAT=COMPACT in this case). Other low-order
1218 bits must be so, because PAGE_COMPRESSED=YES is only allowed
1219 for ROW_FORMAT=DYNAMIC and ROW_FORMAT=COMPACT, not for
1220 ROW_FORMAT=REDUNDANT or ROW_FORMAT=COMPRESSED.
1221
1222 Starting with MariaDB 10.2.4, the flags would be
1223 00LLLL10DB00001, because ATOMIC_WRITES is always written as 0.
1224
1225 We will concentrate on the PAGE_COMPRESSION_LEVEL and
1226 PAGE_COMPRESSED=YES. PAGE_COMPRESSED=NO implies
1227 PAGE_COMPRESSION_LEVEL=0, and in that case all the affected
1228 bits will be 0. For PAGE_COMPRESSED=YES, the values 1..9 are
1229 allowed for PAGE_COMPRESSION_LEVEL. That is, we must interpret
1230 the bits AALLLL10DB00001 as AALLLL1DB00001.
1231
1232 If someone created a table in MariaDB 10.2.2 or 10.2.3 with
1233 the attribute ATOMIC_WRITES=OFF (value 2) and without
1234 PAGE_COMPRESSED=YES or PAGE_COMPRESSION_LEVEL, that should be
1235 rejected. The value ATOMIC_WRITES=ON (1) would look like
1236 ATOMIC_WRITES=OFF, but it would be ignored starting with
1237 MariaDB 10.2.4. */
1238 compile_time_assert(DICT_TF_POS_PAGE_COMPRESSION == 7);
1239 compile_time_assert(DICT_TF_POS_UNUSED == 14);
1240
1241 if ((type & 0x19f) != 0x101) {
1242 /* The table cannot have been created with MariaDB
1243 10.2.2 to 10.2.6, because they would write the
1244 low-order bits of SYS_TABLES.TYPE as 0b10xx00001 for
1245 PAGE_COMPRESSED=YES. No adjustment is applicable. */
1246 } else if (type >= 3 << 13) {
1247 /* 10.2.2 and 10.2.3 write ATOMIC_WRITES less than 3,
1248 and no other flags above that can be set for the
1249 SYS_TABLES.TYPE to be in the 10.2.2..10.2.6 format.
1250 This would in any case be invalid format for 10.2 and
1251 earlier releases. */
1252 ut_ad(!dict_sys_tables_type_valid(type, true));
1253 } else {
1254 /* SYS_TABLES.TYPE is of the form AALLLL10DB00001. We
1255 must still validate that the LLLL bits are between 0
1256 and 9 before we can discard the extraneous 0 bit. */
1257 ut_ad(!DICT_TF_GET_PAGE_COMPRESSION(type));
1258
1259 if ((((type >> 9) & 0xf) - 1) < 9) {
1260 ut_ad(DICT_TF_GET_PAGE_COMPRESSION_LEVEL(type) & 1);
1261
1262 type = (type & 0x7fU) | (type >> 1 & ~0x7fU);
1263
1264 ut_ad(DICT_TF_GET_PAGE_COMPRESSION(type));
1265 ut_ad(DICT_TF_GET_PAGE_COMPRESSION_LEVEL(type) >= 1);
1266 ut_ad(DICT_TF_GET_PAGE_COMPRESSION_LEVEL(type) <= 9);
1267 } else {
1268 ut_ad(!dict_sys_tables_type_valid(type, true));
1269 }
1270 }
1271
1272 /* The low order bit of SYS_TABLES.TYPE is always set to 1. But in
1273 dict_table_t::flags the low order bit is used to determine if the
1274 ROW_FORMAT=REDUNDANT (0) or anything else (1).
1275 Read the 4 byte N_COLS field and look at the high order bit. It
1276 should be set for COMPACT and later. It should not be set for
1277 REDUNDANT. */
1278 field = rec_get_nth_field_old(
1279 rec, DICT_FLD__SYS_TABLES__N_COLS, &len);
1280 ut_a(len == 4);
1281 *n_cols = mach_read_from_4(field);
1282
1283 const bool not_redundant = 0 != (*n_cols & DICT_N_COLS_COMPACT);
1284
1285 if (!dict_sys_tables_type_valid(type, not_redundant)) {
1286 ib::error() << "Table " << table_name << " in InnoDB"
1287 " data dictionary contains invalid flags."
1288 " SYS_TABLES.TYPE=" << type <<
1289 " SYS_TABLES.N_COLS=" << *n_cols;
1290 return(false);
1291 }
1292
1293 *flags = dict_sys_tables_type_to_tf(type, not_redundant);
1294
1295 /* For tables created before MySQL 4.1, there may be
1296 garbage in SYS_TABLES.MIX_LEN where flags2 are found. Such tables
1297 would always be in ROW_FORMAT=REDUNDANT which do not have the
1298 high bit set in n_cols, and flags would be zero.
1299 MySQL 4.1 was the first version to support innodb_file_per_table,
1300 that is, *space_id != 0. */
1301 if (not_redundant || *space_id != 0 || *n_cols & DICT_N_COLS_COMPACT) {
1302
1303 /* Get flags2 from SYS_TABLES.MIX_LEN */
1304 field = rec_get_nth_field_old(
1305 rec, DICT_FLD__SYS_TABLES__MIX_LEN, &len);
1306 *flags2 = mach_read_from_4(field);
1307
1308 if (!dict_tf2_is_valid(*flags, *flags2)) {
1309 ib::error() << "Table " << table_name << " in InnoDB"
1310 " data dictionary contains invalid flags."
1311 " SYS_TABLES.TYPE=" << type
1312 << " SYS_TABLES.MIX_LEN=" << *flags2;
1313 return(false);
1314 }
1315
1316 /* DICT_TF2_FTS will be set when indexes are being loaded */
1317 *flags2 &= ~DICT_TF2_FTS;
1318
1319 /* Now that we have used this bit, unset it. */
1320 *n_cols &= ~DICT_N_COLS_COMPACT;
1321 } else {
1322 *flags2 = 0;
1323 }
1324
1325 return(true);
1326 }
1327
1328 /** Load and check each non-predefined tablespace mentioned in SYS_TABLES.
1329 Search SYS_TABLES and check each tablespace mentioned that has not
1330 already been added to the fil_system. If it is valid, add it to the
1331 file_system list.
1332 @return the highest space ID found. */
dict_check_sys_tables()1333 static ulint dict_check_sys_tables()
1334 {
1335 ulint max_space_id = 0;
1336 btr_pcur_t pcur;
1337 const rec_t* rec;
1338 mtr_t mtr;
1339
1340 DBUG_ENTER("dict_check_sys_tables");
1341
1342 ut_d(dict_sys.assert_locked());
1343
1344 mtr_start(&mtr);
1345
1346 /* Before traversing SYS_TABLES, let's make sure we have
1347 SYS_TABLESPACES and SYS_DATAFILES loaded. */
1348 dict_table_t* sys_tablespaces;
1349 dict_table_t* sys_datafiles;
1350 sys_tablespaces = dict_table_get_low("SYS_TABLESPACES");
1351 ut_a(sys_tablespaces != NULL);
1352 sys_datafiles = dict_table_get_low("SYS_DATAFILES");
1353 ut_a(sys_datafiles != NULL);
1354
1355 for (rec = dict_startscan_system(&pcur, &mtr, SYS_TABLES);
1356 rec != NULL;
1357 mtr.commit(), mtr.start(),
1358 rec = dict_getnext_system(&pcur, &mtr)) {
1359 const byte* field;
1360 ulint len;
1361 table_id_t table_id;
1362 ulint space_id;
1363 ulint n_cols;
1364 ulint flags;
1365 ulint flags2;
1366
1367 /* If a table record is not useable, ignore it and continue
1368 on to the next record. Error messages were logged. */
1369 if (dict_sys_tables_rec_check(rec) != NULL) {
1370 continue;
1371 }
1372
1373 /* Copy the table name from rec */
1374 field = rec_get_nth_field_old(
1375 rec, DICT_FLD__SYS_TABLES__NAME, &len);
1376
1377 table_name_t table_name(mem_strdupl((char*) field, len));
1378 DBUG_PRINT("dict_check_sys_tables",
1379 ("name: %p, '%s'", table_name.m_name,
1380 table_name.m_name));
1381
1382 if (!dict_sys_tables_rec_read(rec, table_name,
1383 &table_id, &space_id,
1384 &n_cols, &flags, &flags2)
1385 || space_id == TRX_SYS_SPACE) {
1386 next:
1387 ut_free(table_name.m_name);
1388 continue;
1389 }
1390
1391 if (strstr(table_name.m_name, "/" TEMP_FILE_PREFIX "-")) {
1392 /* This table will be dropped by
1393 row_mysql_drop_garbage_tables().
1394 We do not care if the file exists. */
1395 goto next;
1396 }
1397
1398 if (flags2 & DICT_TF2_DISCARDED) {
1399 ib::info() << "Ignoring tablespace for " << table_name
1400 << " because the DISCARD flag is set .";
1401 goto next;
1402 }
1403
1404 /* For tables or partitions using .ibd files, the flag
1405 DICT_TF2_USE_FILE_PER_TABLE was not set in MIX_LEN
1406 before MySQL 5.6.5. The flag should not have been
1407 introduced in persistent storage. MariaDB will keep
1408 setting the flag when writing SYS_TABLES entries for
1409 newly created or rebuilt tables or partitions, but
1410 will otherwise ignore the flag. */
1411
1412 /* Now that we have the proper name for this tablespace,
1413 look to see if it is already in the tablespace cache. */
1414 if (const fil_space_t* space
1415 = fil_space_for_table_exists_in_mem(
1416 space_id, table_name.m_name, flags)) {
1417 /* Recovery can open a datafile that does not
1418 match SYS_DATAFILES. If they don't match, update
1419 SYS_DATAFILES. */
1420 char *dict_path = dict_get_first_path(space_id);
1421 const char *fil_path = space->chain.start->name;
1422 if (dict_path
1423 && strcmp(dict_path, fil_path)) {
1424 dict_update_filepath(space_id, fil_path);
1425 }
1426 ut_free(dict_path);
1427 ut_free(table_name.m_name);
1428 continue;
1429 }
1430
1431 /* Set the expected filepath from the data dictionary.
1432 If the file is found elsewhere (from an ISL or the default
1433 location) or this path is the same file but looks different,
1434 fil_ibd_open() will update the dictionary with what is
1435 opened. */
1436 char* filepath = dict_get_first_path(space_id);
1437
1438 /* Check that the .ibd file exists. */
1439 if (!fil_ibd_open(
1440 false,
1441 !srv_read_only_mode && srv_log_file_size != 0,
1442 FIL_TYPE_TABLESPACE,
1443 space_id, dict_tf_to_fsp_flags(flags),
1444 table_name, filepath)) {
1445 ib::warn() << "Ignoring tablespace for "
1446 << table_name
1447 << " because it could not be opened.";
1448 }
1449
1450 max_space_id = ut_max(max_space_id, space_id);
1451
1452 ut_free(table_name.m_name);
1453 ut_free(filepath);
1454 }
1455
1456 mtr_commit(&mtr);
1457
1458 DBUG_RETURN(max_space_id);
1459 }
1460
1461 /** Check each tablespace found in the data dictionary.
1462 Then look at each table defined in SYS_TABLES that has a space_id > 0
1463 to find all the file-per-table tablespaces.
1464
1465 In a crash recovery we already have some tablespace objects created from
1466 processing the REDO log. Any other tablespace in SYS_TABLESPACES not
1467 previously used in recovery will be opened here. We will compare the
1468 space_id information in the data dictionary to what we find in the
1469 tablespace file. In addition, more validation will be done if recovery
1470 was needed and force_recovery is not set.
1471
1472 We also scan the biggest space id, and store it to fil_system. */
dict_check_tablespaces_and_store_max_id()1473 void dict_check_tablespaces_and_store_max_id()
1474 {
1475 mtr_t mtr;
1476
1477 DBUG_ENTER("dict_check_tablespaces_and_store_max_id");
1478
1479 dict_sys_lock();
1480
1481 /* Initialize the max space_id from sys header */
1482 mtr.start();
1483 ulint max_space_id = mach_read_from_4(DICT_HDR_MAX_SPACE_ID
1484 + DICT_HDR
1485 + dict_hdr_get(&mtr)->frame);
1486 mtr.commit();
1487
1488 fil_set_max_space_id_if_bigger(max_space_id);
1489
1490 /* Open all tablespaces referenced in SYS_TABLES.
1491 This will update SYS_TABLESPACES and SYS_DATAFILES if it
1492 finds any file-per-table tablespaces not already there. */
1493 max_space_id = dict_check_sys_tables();
1494 fil_set_max_space_id_if_bigger(max_space_id);
1495
1496 dict_sys_unlock();
1497
1498 DBUG_VOID_RETURN;
1499 }
1500
1501 /** Error message for a delete-marked record in dict_load_column_low() */
1502 static const char* dict_load_column_del = "delete-marked record in SYS_COLUMN";
1503
1504 /** Load a table column definition from a SYS_COLUMNS record to dict_table_t.
1505 @return error message
1506 @retval NULL on success */
1507 static
1508 const char*
dict_load_column_low(dict_table_t * table,mem_heap_t * heap,dict_col_t * column,table_id_t * table_id,const char ** col_name,const rec_t * rec,ulint * nth_v_col)1509 dict_load_column_low(
1510 dict_table_t* table, /*!< in/out: table, could be NULL
1511 if we just populate a dict_column_t
1512 struct with information from
1513 a SYS_COLUMNS record */
1514 mem_heap_t* heap, /*!< in/out: memory heap
1515 for temporary storage */
1516 dict_col_t* column, /*!< out: dict_column_t to fill,
1517 or NULL if table != NULL */
1518 table_id_t* table_id, /*!< out: table id */
1519 const char** col_name, /*!< out: column name */
1520 const rec_t* rec, /*!< in: SYS_COLUMNS record */
1521 ulint* nth_v_col) /*!< out: if not NULL, this
1522 records the "n" of "nth" virtual
1523 column */
1524 {
1525 char* name;
1526 const byte* field;
1527 ulint len;
1528 ulint mtype;
1529 ulint prtype;
1530 ulint col_len;
1531 ulint pos;
1532 ulint num_base;
1533
1534 ut_ad(!table == !!column);
1535
1536 if (rec_get_deleted_flag(rec, 0)) {
1537 return(dict_load_column_del);
1538 }
1539
1540 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_COLUMNS) {
1541 return("wrong number of columns in SYS_COLUMNS record");
1542 }
1543
1544 field = rec_get_nth_field_old(
1545 rec, DICT_FLD__SYS_COLUMNS__TABLE_ID, &len);
1546 if (len != 8) {
1547 err_len:
1548 return("incorrect column length in SYS_COLUMNS");
1549 }
1550
1551 if (table_id) {
1552 *table_id = mach_read_from_8(field);
1553 } else if (table->id != mach_read_from_8(field)) {
1554 return("SYS_COLUMNS.TABLE_ID mismatch");
1555 }
1556
1557 field = rec_get_nth_field_old(
1558 rec, DICT_FLD__SYS_COLUMNS__POS, &len);
1559 if (len != 4) {
1560 goto err_len;
1561 }
1562
1563 pos = mach_read_from_4(field);
1564
1565 rec_get_nth_field_offs_old(
1566 rec, DICT_FLD__SYS_COLUMNS__DB_TRX_ID, &len);
1567 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
1568 goto err_len;
1569 }
1570 rec_get_nth_field_offs_old(
1571 rec, DICT_FLD__SYS_COLUMNS__DB_ROLL_PTR, &len);
1572 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
1573 goto err_len;
1574 }
1575
1576 field = rec_get_nth_field_old(
1577 rec, DICT_FLD__SYS_COLUMNS__NAME, &len);
1578 if (len == 0 || len == UNIV_SQL_NULL) {
1579 goto err_len;
1580 }
1581
1582 name = mem_heap_strdupl(heap, (const char*) field, len);
1583
1584 if (col_name) {
1585 *col_name = name;
1586 }
1587
1588 field = rec_get_nth_field_old(
1589 rec, DICT_FLD__SYS_COLUMNS__MTYPE, &len);
1590 if (len != 4) {
1591 goto err_len;
1592 }
1593
1594 mtype = mach_read_from_4(field);
1595
1596 field = rec_get_nth_field_old(
1597 rec, DICT_FLD__SYS_COLUMNS__PRTYPE, &len);
1598 if (len != 4) {
1599 goto err_len;
1600 }
1601 prtype = mach_read_from_4(field);
1602
1603 if (dtype_get_charset_coll(prtype) == 0
1604 && dtype_is_string_type(mtype)) {
1605 /* The table was created with < 4.1.2. */
1606
1607 if (dtype_is_binary_string_type(mtype, prtype)) {
1608 /* Use the binary collation for
1609 string columns of binary type. */
1610
1611 prtype = dtype_form_prtype(
1612 prtype,
1613 DATA_MYSQL_BINARY_CHARSET_COLL);
1614 } else {
1615 /* Use the default charset for
1616 other than binary columns. */
1617
1618 prtype = dtype_form_prtype(
1619 prtype,
1620 data_mysql_default_charset_coll);
1621 }
1622 }
1623
1624 if (table && table->n_def != pos && !(prtype & DATA_VIRTUAL)) {
1625 return("SYS_COLUMNS.POS mismatch");
1626 }
1627
1628 field = rec_get_nth_field_old(
1629 rec, DICT_FLD__SYS_COLUMNS__LEN, &len);
1630 if (len != 4) {
1631 goto err_len;
1632 }
1633 col_len = mach_read_from_4(field);
1634 field = rec_get_nth_field_old(
1635 rec, DICT_FLD__SYS_COLUMNS__PREC, &len);
1636 if (len != 4) {
1637 goto err_len;
1638 }
1639 num_base = mach_read_from_4(field);
1640
1641 if (table) {
1642 if (prtype & DATA_VIRTUAL) {
1643 #ifdef UNIV_DEBUG
1644 dict_v_col_t* vcol =
1645 #endif
1646 dict_mem_table_add_v_col(
1647 table, heap, name, mtype,
1648 prtype, col_len,
1649 dict_get_v_col_mysql_pos(pos), num_base);
1650 ut_ad(vcol->v_pos == dict_get_v_col_pos(pos));
1651 } else {
1652 ut_ad(num_base == 0);
1653 dict_mem_table_add_col(table, heap, name, mtype,
1654 prtype, col_len);
1655 }
1656 } else {
1657 dict_mem_fill_column_struct(column, pos, mtype,
1658 prtype, col_len);
1659 }
1660
1661 /* Report the virtual column number */
1662 if ((prtype & DATA_VIRTUAL) && nth_v_col != NULL) {
1663 *nth_v_col = dict_get_v_col_pos(pos);
1664 }
1665
1666 return(NULL);
1667 }
1668
1669 /** Error message for a delete-marked record in dict_load_virtual_low() */
1670 static const char* dict_load_virtual_del = "delete-marked record in SYS_VIRTUAL";
1671
1672 /** Load a virtual column "mapping" (to base columns) information
1673 from a SYS_VIRTUAL record
1674 @param[in,out] table table
1675 @param[in,out] column mapped base column's dict_column_t
1676 @param[in,out] table_id table id
1677 @param[in,out] pos virtual column position
1678 @param[in,out] base_pos base column position
1679 @param[in] rec SYS_VIRTUAL record
1680 @return error message
1681 @retval NULL on success */
1682 static
1683 const char*
dict_load_virtual_low(dict_table_t * table,dict_col_t ** column,table_id_t * table_id,ulint * pos,ulint * base_pos,const rec_t * rec)1684 dict_load_virtual_low(
1685 dict_table_t* table,
1686 dict_col_t** column,
1687 table_id_t* table_id,
1688 ulint* pos,
1689 ulint* base_pos,
1690 const rec_t* rec)
1691 {
1692 const byte* field;
1693 ulint len;
1694 ulint base;
1695
1696 if (rec_get_deleted_flag(rec, 0)) {
1697 return(dict_load_virtual_del);
1698 }
1699
1700 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_VIRTUAL) {
1701 return("wrong number of columns in SYS_VIRTUAL record");
1702 }
1703
1704 field = rec_get_nth_field_old(
1705 rec, DICT_FLD__SYS_VIRTUAL__TABLE_ID, &len);
1706 if (len != 8) {
1707 err_len:
1708 return("incorrect column length in SYS_VIRTUAL");
1709 }
1710
1711 if (table_id != NULL) {
1712 *table_id = mach_read_from_8(field);
1713 } else if (table->id != mach_read_from_8(field)) {
1714 return("SYS_VIRTUAL.TABLE_ID mismatch");
1715 }
1716
1717 field = rec_get_nth_field_old(
1718 rec, DICT_FLD__SYS_VIRTUAL__POS, &len);
1719 if (len != 4) {
1720 goto err_len;
1721 }
1722
1723 if (pos != NULL) {
1724 *pos = mach_read_from_4(field);
1725 }
1726
1727 field = rec_get_nth_field_old(
1728 rec, DICT_FLD__SYS_VIRTUAL__BASE_POS, &len);
1729 if (len != 4) {
1730 goto err_len;
1731 }
1732
1733 base = mach_read_from_4(field);
1734
1735 if (base_pos != NULL) {
1736 *base_pos = base;
1737 }
1738
1739 rec_get_nth_field_offs_old(
1740 rec, DICT_FLD__SYS_VIRTUAL__DB_TRX_ID, &len);
1741 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
1742 goto err_len;
1743 }
1744
1745 rec_get_nth_field_offs_old(
1746 rec, DICT_FLD__SYS_VIRTUAL__DB_ROLL_PTR, &len);
1747 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
1748 goto err_len;
1749 }
1750
1751 if (column != NULL) {
1752 *column = dict_table_get_nth_col(table, base);
1753 }
1754
1755 return(NULL);
1756 }
1757
1758 /********************************************************************//**
1759 Loads definitions for table columns. */
1760 static
1761 void
dict_load_columns(dict_table_t * table,mem_heap_t * heap)1762 dict_load_columns(
1763 /*==============*/
1764 dict_table_t* table, /*!< in/out: table */
1765 mem_heap_t* heap) /*!< in/out: memory heap
1766 for temporary storage */
1767 {
1768 dict_table_t* sys_columns;
1769 dict_index_t* sys_index;
1770 btr_pcur_t pcur;
1771 dtuple_t* tuple;
1772 dfield_t* dfield;
1773 const rec_t* rec;
1774 byte* buf;
1775 ulint i;
1776 mtr_t mtr;
1777 ulint n_skipped = 0;
1778
1779 ut_ad(mutex_own(&dict_sys.mutex));
1780
1781 mtr_start(&mtr);
1782
1783 sys_columns = dict_table_get_low("SYS_COLUMNS");
1784 sys_index = UT_LIST_GET_FIRST(sys_columns->indexes);
1785 ut_ad(!dict_table_is_comp(sys_columns));
1786
1787 ut_ad(name_of_col_is(sys_columns, sys_index,
1788 DICT_FLD__SYS_COLUMNS__NAME, "NAME"));
1789 ut_ad(name_of_col_is(sys_columns, sys_index,
1790 DICT_FLD__SYS_COLUMNS__PREC, "PREC"));
1791
1792 tuple = dtuple_create(heap, 1);
1793 dfield = dtuple_get_nth_field(tuple, 0);
1794
1795 buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
1796 mach_write_to_8(buf, table->id);
1797
1798 dfield_set_data(dfield, buf, 8);
1799 dict_index_copy_types(tuple, sys_index, 1);
1800
1801 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
1802 BTR_SEARCH_LEAF, &pcur, &mtr);
1803
1804 ut_ad(table->n_t_cols == static_cast<ulint>(
1805 table->n_cols) + static_cast<ulint>(table->n_v_cols));
1806
1807 for (i = 0;
1808 i + DATA_N_SYS_COLS < table->n_t_cols + n_skipped;
1809 i++) {
1810 const char* err_msg;
1811 const char* name = NULL;
1812 ulint nth_v_col = ULINT_UNDEFINED;
1813
1814 rec = btr_pcur_get_rec(&pcur);
1815
1816 ut_a(btr_pcur_is_on_user_rec(&pcur));
1817
1818 err_msg = dict_load_column_low(table, heap, NULL, NULL,
1819 &name, rec, &nth_v_col);
1820
1821 if (err_msg == dict_load_column_del) {
1822 n_skipped++;
1823 goto next_rec;
1824 } else if (err_msg) {
1825 ib::fatal() << err_msg;
1826 }
1827
1828 /* Note: Currently we have one DOC_ID column that is
1829 shared by all FTS indexes on a table. And only non-virtual
1830 column can be used for FULLTEXT index */
1831 if (innobase_strcasecmp(name,
1832 FTS_DOC_ID_COL_NAME) == 0
1833 && nth_v_col == ULINT_UNDEFINED) {
1834 dict_col_t* col;
1835 /* As part of normal loading of tables the
1836 table->flag is not set for tables with FTS
1837 till after the FTS indexes are loaded. So we
1838 create the fts_t instance here if there isn't
1839 one already created.
1840
1841 This case does not arise for table create as
1842 the flag is set before the table is created. */
1843 if (table->fts == NULL) {
1844 table->fts = fts_create(table);
1845 }
1846
1847 ut_a(table->fts->doc_col == ULINT_UNDEFINED);
1848
1849 col = dict_table_get_nth_col(table, i - n_skipped);
1850
1851 ut_ad(col->len == sizeof(doc_id_t));
1852
1853 if (col->prtype & DATA_FTS_DOC_ID) {
1854 DICT_TF2_FLAG_SET(
1855 table, DICT_TF2_FTS_HAS_DOC_ID);
1856 DICT_TF2_FLAG_UNSET(
1857 table, DICT_TF2_FTS_ADD_DOC_ID);
1858 }
1859
1860 table->fts->doc_col = i - n_skipped;
1861 }
1862 next_rec:
1863 btr_pcur_move_to_next_user_rec(&pcur, &mtr);
1864 }
1865
1866 btr_pcur_close(&pcur);
1867 mtr_commit(&mtr);
1868 }
1869
1870 /** Loads SYS_VIRTUAL info for one virtual column
1871 @param[in,out] table table
1872 @param[in] nth_v_col virtual column sequence num
1873 @param[in,out] v_col virtual column
1874 @param[in,out] heap memory heap
1875 */
1876 static
1877 void
dict_load_virtual_one_col(dict_table_t * table,ulint nth_v_col,dict_v_col_t * v_col,mem_heap_t * heap)1878 dict_load_virtual_one_col(
1879 dict_table_t* table,
1880 ulint nth_v_col,
1881 dict_v_col_t* v_col,
1882 mem_heap_t* heap)
1883 {
1884 dict_table_t* sys_virtual;
1885 dict_index_t* sys_virtual_index;
1886 btr_pcur_t pcur;
1887 dtuple_t* tuple;
1888 dfield_t* dfield;
1889 const rec_t* rec;
1890 byte* buf;
1891 ulint i = 0;
1892 mtr_t mtr;
1893 ulint skipped = 0;
1894
1895 ut_ad(mutex_own(&dict_sys.mutex));
1896
1897 if (v_col->num_base == 0) {
1898 return;
1899 }
1900
1901 mtr_start(&mtr);
1902
1903 sys_virtual = dict_table_get_low("SYS_VIRTUAL");
1904 sys_virtual_index = UT_LIST_GET_FIRST(sys_virtual->indexes);
1905 ut_ad(!dict_table_is_comp(sys_virtual));
1906
1907 ut_ad(name_of_col_is(sys_virtual, sys_virtual_index,
1908 DICT_FLD__SYS_VIRTUAL__POS, "POS"));
1909
1910 tuple = dtuple_create(heap, 2);
1911
1912 /* table ID field */
1913 dfield = dtuple_get_nth_field(tuple, 0);
1914
1915 buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
1916 mach_write_to_8(buf, table->id);
1917
1918 dfield_set_data(dfield, buf, 8);
1919
1920 /* virtual column pos field */
1921 dfield = dtuple_get_nth_field(tuple, 1);
1922
1923 buf = static_cast<byte*>(mem_heap_alloc(heap, 4));
1924 ulint vcol_pos = dict_create_v_col_pos(nth_v_col, v_col->m_col.ind);
1925 mach_write_to_4(buf, vcol_pos);
1926
1927 dfield_set_data(dfield, buf, 4);
1928
1929 dict_index_copy_types(tuple, sys_virtual_index, 2);
1930
1931 btr_pcur_open_on_user_rec(sys_virtual_index, tuple, PAGE_CUR_GE,
1932 BTR_SEARCH_LEAF, &pcur, &mtr);
1933
1934 for (i = 0; i < unsigned{v_col->num_base} + skipped; i++) {
1935 const char* err_msg;
1936 ulint pos;
1937
1938 ut_ad(btr_pcur_is_on_user_rec(&pcur));
1939
1940 rec = btr_pcur_get_rec(&pcur);
1941
1942 ut_a(btr_pcur_is_on_user_rec(&pcur));
1943
1944 err_msg = dict_load_virtual_low(table,
1945 &v_col->base_col[i - skipped],
1946 NULL,
1947 &pos, NULL, rec);
1948
1949 if (err_msg) {
1950 if (err_msg != dict_load_virtual_del) {
1951 ib::fatal() << err_msg;
1952 } else {
1953 skipped++;
1954 }
1955 } else {
1956 ut_ad(pos == vcol_pos);
1957 }
1958
1959 btr_pcur_move_to_next_user_rec(&pcur, &mtr);
1960 }
1961
1962 btr_pcur_close(&pcur);
1963 mtr_commit(&mtr);
1964 }
1965
1966 /** Loads info from SYS_VIRTUAL for virtual columns.
1967 @param[in,out] table table
1968 @param[in] heap memory heap
1969 */
1970 static
1971 void
dict_load_virtual(dict_table_t * table,mem_heap_t * heap)1972 dict_load_virtual(
1973 dict_table_t* table,
1974 mem_heap_t* heap)
1975 {
1976 for (ulint i = 0; i < table->n_v_cols; i++) {
1977 dict_v_col_t* v_col = dict_table_get_nth_v_col(table, i);
1978
1979 dict_load_virtual_one_col(table, i, v_col, heap);
1980 }
1981 }
1982
/** Error message for a delete-marked record in dict_load_field_low().
dict_load_fields() recognizes this case by comparing the returned pointer
with this variable (pointer identity, not string contents), and then
skips the record instead of reporting corruption. */
static const char* dict_load_field_del = "delete-marked record in SYS_FIELDS";
1985
1986 /** Load an index field definition from a SYS_FIELDS record to dict_index_t.
1987 @return error message
1988 @retval NULL on success */
1989 static
1990 const char*
dict_load_field_low(byte * index_id,dict_index_t * index,dict_field_t * sys_field,ulint * pos,byte * last_index_id,mem_heap_t * heap,const rec_t * rec)1991 dict_load_field_low(
1992 byte* index_id, /*!< in/out: index id (8 bytes)
1993 an "in" value if index != NULL
1994 and "out" if index == NULL */
1995 dict_index_t* index, /*!< in/out: index, could be NULL
1996 if we just populate a dict_field_t
1997 struct with information from
1998 a SYS_FIELDS record */
1999 dict_field_t* sys_field, /*!< out: dict_field_t to be
2000 filled */
2001 ulint* pos, /*!< out: Field position */
2002 byte* last_index_id, /*!< in: last index id */
2003 mem_heap_t* heap, /*!< in/out: memory heap
2004 for temporary storage */
2005 const rec_t* rec) /*!< in: SYS_FIELDS record */
2006 {
2007 const byte* field;
2008 ulint len;
2009 unsigned pos_and_prefix_len;
2010 unsigned prefix_len;
2011 bool first_field;
2012 ulint position;
2013
2014 /* Either index or sys_field is supplied, not both */
2015 ut_a((!index) || (!sys_field));
2016
2017 if (rec_get_deleted_flag(rec, 0)) {
2018 return(dict_load_field_del);
2019 }
2020
2021 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_FIELDS) {
2022 return("wrong number of columns in SYS_FIELDS record");
2023 }
2024
2025 field = rec_get_nth_field_old(
2026 rec, DICT_FLD__SYS_FIELDS__INDEX_ID, &len);
2027 if (len != 8) {
2028 err_len:
2029 return("incorrect column length in SYS_FIELDS");
2030 }
2031
2032 if (!index) {
2033 ut_a(last_index_id);
2034 memcpy(index_id, (const char*) field, 8);
2035 first_field = memcmp(index_id, last_index_id, 8);
2036 } else {
2037 first_field = (index->n_def == 0);
2038 if (memcmp(field, index_id, 8)) {
2039 return("SYS_FIELDS.INDEX_ID mismatch");
2040 }
2041 }
2042
2043 /* The next field stores the field position in the index and a
2044 possible column prefix length if the index field does not
2045 contain the whole column. The storage format is like this: if
2046 there is at least one prefix field in the index, then the HIGH
2047 2 bytes contain the field number (index->n_def) and the low 2
2048 bytes the prefix length for the field. Otherwise the field
2049 number (index->n_def) is contained in the 2 LOW bytes. */
2050
2051 field = rec_get_nth_field_old(
2052 rec, DICT_FLD__SYS_FIELDS__POS, &len);
2053 if (len != 4) {
2054 goto err_len;
2055 }
2056
2057 pos_and_prefix_len = mach_read_from_4(field);
2058
2059 if (index && UNIV_UNLIKELY
2060 ((pos_and_prefix_len & 0xFFFFUL) != index->n_def
2061 && (pos_and_prefix_len >> 16 & 0xFFFF) != index->n_def)) {
2062 return("SYS_FIELDS.POS mismatch");
2063 }
2064
2065 if (first_field || pos_and_prefix_len > 0xFFFFUL) {
2066 prefix_len = pos_and_prefix_len & 0xFFFFUL;
2067 position = (pos_and_prefix_len & 0xFFFF0000UL) >> 16;
2068 } else {
2069 prefix_len = 0;
2070 position = pos_and_prefix_len & 0xFFFFUL;
2071 }
2072
2073 rec_get_nth_field_offs_old(
2074 rec, DICT_FLD__SYS_FIELDS__DB_TRX_ID, &len);
2075 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
2076 goto err_len;
2077 }
2078 rec_get_nth_field_offs_old(
2079 rec, DICT_FLD__SYS_FIELDS__DB_ROLL_PTR, &len);
2080 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
2081 goto err_len;
2082 }
2083
2084 field = rec_get_nth_field_old(
2085 rec, DICT_FLD__SYS_FIELDS__COL_NAME, &len);
2086 if (len == 0 || len == UNIV_SQL_NULL) {
2087 goto err_len;
2088 }
2089
2090 if (index) {
2091 dict_mem_index_add_field(
2092 index, mem_heap_strdupl(heap, (const char*) field, len),
2093 prefix_len);
2094 } else {
2095 ut_a(sys_field);
2096 ut_a(pos);
2097
2098 sys_field->name = mem_heap_strdupl(
2099 heap, (const char*) field, len);
2100 sys_field->prefix_len = prefix_len & ((1U << 12) - 1);
2101 *pos = position;
2102 }
2103
2104 return(NULL);
2105 }
2106
2107 /********************************************************************//**
2108 Loads definitions for index fields.
2109 @return DB_SUCCESS if ok, DB_CORRUPTION if corruption */
2110 static
2111 ulint
dict_load_fields(dict_index_t * index,mem_heap_t * heap)2112 dict_load_fields(
2113 /*=============*/
2114 dict_index_t* index, /*!< in/out: index whose fields to load */
2115 mem_heap_t* heap) /*!< in: memory heap for temporary storage */
2116 {
2117 dict_table_t* sys_fields;
2118 dict_index_t* sys_index;
2119 btr_pcur_t pcur;
2120 dtuple_t* tuple;
2121 dfield_t* dfield;
2122 const rec_t* rec;
2123 byte* buf;
2124 ulint i;
2125 mtr_t mtr;
2126 dberr_t error;
2127
2128 ut_ad(mutex_own(&dict_sys.mutex));
2129
2130 mtr_start(&mtr);
2131
2132 sys_fields = dict_table_get_low("SYS_FIELDS");
2133 sys_index = UT_LIST_GET_FIRST(sys_fields->indexes);
2134 ut_ad(!dict_table_is_comp(sys_fields));
2135 ut_ad(name_of_col_is(sys_fields, sys_index,
2136 DICT_FLD__SYS_FIELDS__COL_NAME, "COL_NAME"));
2137
2138 tuple = dtuple_create(heap, 1);
2139 dfield = dtuple_get_nth_field(tuple, 0);
2140
2141 buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
2142 mach_write_to_8(buf, index->id);
2143
2144 dfield_set_data(dfield, buf, 8);
2145 dict_index_copy_types(tuple, sys_index, 1);
2146
2147 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
2148 BTR_SEARCH_LEAF, &pcur, &mtr);
2149 for (i = 0; i < index->n_fields; i++) {
2150 const char* err_msg;
2151
2152 rec = btr_pcur_get_rec(&pcur);
2153
2154 ut_a(btr_pcur_is_on_user_rec(&pcur));
2155
2156 err_msg = dict_load_field_low(buf, index, NULL, NULL, NULL,
2157 heap, rec);
2158
2159 if (err_msg == dict_load_field_del) {
2160 /* There could be delete marked records in
2161 SYS_FIELDS because SYS_FIELDS.INDEX_ID can be
2162 updated by ALTER TABLE ADD INDEX. */
2163
2164 goto next_rec;
2165 } else if (err_msg) {
2166 ib::error() << err_msg;
2167 error = DB_CORRUPTION;
2168 goto func_exit;
2169 }
2170 next_rec:
2171 btr_pcur_move_to_next_user_rec(&pcur, &mtr);
2172 }
2173
2174 error = DB_SUCCESS;
2175 func_exit:
2176 btr_pcur_close(&pcur);
2177 mtr_commit(&mtr);
2178 return(error);
2179 }
2180
/** Error message for a delete-marked record in dict_load_index_low().
Callers identify it by pointer comparison; dict_load_indexes() skips
such records. */
static const char* dict_load_index_del = "delete-marked record in SYS_INDEXES";
/** Error message for table->id mismatch in dict_load_index_low().
Identified by pointer comparison; dict_load_indexes() takes it to mean
that there are no more index records for the table being loaded. */
static const char* dict_load_index_id_err = "SYS_INDEXES.TABLE_ID mismatch";
/** Error message for SYS_TABLES flags mismatch in dict_load_table_low(),
returned when dict_sys_tables_rec_read() fails. Identified by pointer
comparison; dict_load_table_one() does not log this particular message. */
static const char* dict_load_table_flags = "incorrect flags in SYS_TABLES";
2187
2188 /** Load an index definition from a SYS_INDEXES record to dict_index_t.
2189 If allocate=TRUE, we will create a dict_index_t structure and fill it
2190 accordingly. If allocated=FALSE, the dict_index_t will be supplied by
2191 the caller and filled with information read from the record.
2192 @return error message
2193 @retval NULL on success */
2194 static
2195 const char*
dict_load_index_low(byte * table_id,mem_heap_t * heap,const rec_t * rec,ibool allocate,dict_index_t ** index)2196 dict_load_index_low(
2197 byte* table_id, /*!< in/out: table id (8 bytes),
2198 an "in" value if allocate=TRUE
2199 and "out" when allocate=FALSE */
2200 mem_heap_t* heap, /*!< in/out: temporary memory heap */
2201 const rec_t* rec, /*!< in: SYS_INDEXES record */
2202 ibool allocate, /*!< in: TRUE=allocate *index,
2203 FALSE=fill in a pre-allocated
2204 *index */
2205 dict_index_t** index) /*!< out,own: index, or NULL */
2206 {
2207 const byte* field;
2208 ulint len;
2209 ulint name_len;
2210 char* name_buf;
2211 index_id_t id;
2212 ulint n_fields;
2213 ulint type;
2214 unsigned merge_threshold;
2215
2216 if (allocate) {
2217 /* If allocate=TRUE, no dict_index_t will
2218 be supplied. Initialize "*index" to NULL */
2219 *index = NULL;
2220 }
2221
2222 if (rec_get_deleted_flag(rec, 0)) {
2223 return(dict_load_index_del);
2224 }
2225
2226 if (rec_get_n_fields_old(rec) == DICT_NUM_FIELDS__SYS_INDEXES) {
2227 /* MERGE_THRESHOLD exists */
2228 field = rec_get_nth_field_old(
2229 rec, DICT_FLD__SYS_INDEXES__MERGE_THRESHOLD, &len);
2230 switch (len) {
2231 case 4:
2232 merge_threshold = mach_read_from_4(field);
2233 break;
2234 case UNIV_SQL_NULL:
2235 merge_threshold = DICT_INDEX_MERGE_THRESHOLD_DEFAULT;
2236 break;
2237 default:
2238 return("incorrect MERGE_THRESHOLD length"
2239 " in SYS_INDEXES");
2240 }
2241 } else if (rec_get_n_fields_old(rec)
2242 == DICT_NUM_FIELDS__SYS_INDEXES - 1) {
2243 /* MERGE_THRESHOLD doesn't exist */
2244
2245 merge_threshold = DICT_INDEX_MERGE_THRESHOLD_DEFAULT;
2246 } else {
2247 return("wrong number of columns in SYS_INDEXES record");
2248 }
2249
2250 field = rec_get_nth_field_old(
2251 rec, DICT_FLD__SYS_INDEXES__TABLE_ID, &len);
2252 if (len != 8) {
2253 err_len:
2254 return("incorrect column length in SYS_INDEXES");
2255 }
2256
2257 if (!allocate) {
2258 /* We are reading a SYS_INDEXES record. Copy the table_id */
2259 memcpy(table_id, (const char*) field, 8);
2260 } else if (memcmp(field, table_id, 8)) {
2261 /* Caller supplied table_id, verify it is the same
2262 id as on the index record */
2263 return(dict_load_index_id_err);
2264 }
2265
2266 field = rec_get_nth_field_old(
2267 rec, DICT_FLD__SYS_INDEXES__ID, &len);
2268 if (len != 8) {
2269 goto err_len;
2270 }
2271
2272 id = mach_read_from_8(field);
2273
2274 rec_get_nth_field_offs_old(
2275 rec, DICT_FLD__SYS_INDEXES__DB_TRX_ID, &len);
2276 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
2277 goto err_len;
2278 }
2279 rec_get_nth_field_offs_old(
2280 rec, DICT_FLD__SYS_INDEXES__DB_ROLL_PTR, &len);
2281 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
2282 goto err_len;
2283 }
2284
2285 field = rec_get_nth_field_old(
2286 rec, DICT_FLD__SYS_INDEXES__NAME, &name_len);
2287 if (name_len == UNIV_SQL_NULL) {
2288 goto err_len;
2289 }
2290
2291 name_buf = mem_heap_strdupl(heap, (const char*) field,
2292 name_len);
2293
2294 field = rec_get_nth_field_old(
2295 rec, DICT_FLD__SYS_INDEXES__N_FIELDS, &len);
2296 if (len != 4) {
2297 goto err_len;
2298 }
2299 n_fields = mach_read_from_4(field);
2300
2301 field = rec_get_nth_field_old(
2302 rec, DICT_FLD__SYS_INDEXES__TYPE, &len);
2303 if (len != 4) {
2304 goto err_len;
2305 }
2306 type = mach_read_from_4(field);
2307 if (type & (~0U << DICT_IT_BITS)) {
2308 return("unknown SYS_INDEXES.TYPE bits");
2309 }
2310
2311 field = rec_get_nth_field_old(
2312 rec, DICT_FLD__SYS_INDEXES__PAGE_NO, &len);
2313 if (len != 4) {
2314 goto err_len;
2315 }
2316
2317 if (allocate) {
2318 *index = dict_mem_index_create(NULL, name_buf, type, n_fields);
2319 } else {
2320 ut_a(*index);
2321
2322 dict_mem_fill_index_struct(*index, NULL, name_buf,
2323 type, n_fields);
2324 }
2325
2326 (*index)->id = id;
2327 (*index)->page = mach_read_from_4(field);
2328 ut_ad((*index)->page);
2329 (*index)->merge_threshold = merge_threshold & ((1U << 6) - 1);
2330
2331 return(NULL);
2332 }
2333
2334 /********************************************************************//**
2335 Loads definitions for table indexes. Adds them to the data dictionary
2336 cache.
2337 @return DB_SUCCESS if ok, DB_CORRUPTION if corruption of dictionary
2338 table or DB_UNSUPPORTED if table has unknown index type */
2339 static MY_ATTRIBUTE((nonnull))
2340 dberr_t
dict_load_indexes(dict_table_t * table,mem_heap_t * heap,dict_err_ignore_t ignore_err)2341 dict_load_indexes(
2342 /*==============*/
2343 dict_table_t* table, /*!< in/out: table */
2344 mem_heap_t* heap, /*!< in: memory heap for temporary storage */
2345 dict_err_ignore_t ignore_err)
2346 /*!< in: error to be ignored when
2347 loading the index definition */
2348 {
2349 dict_table_t* sys_indexes;
2350 dict_index_t* sys_index;
2351 btr_pcur_t pcur;
2352 dtuple_t* tuple;
2353 dfield_t* dfield;
2354 const rec_t* rec;
2355 byte* buf;
2356 mtr_t mtr;
2357 dberr_t error = DB_SUCCESS;
2358
2359 ut_ad(mutex_own(&dict_sys.mutex));
2360
2361 mtr_start(&mtr);
2362
2363 sys_indexes = dict_table_get_low("SYS_INDEXES");
2364 sys_index = UT_LIST_GET_FIRST(sys_indexes->indexes);
2365 ut_ad(!dict_table_is_comp(sys_indexes));
2366 ut_ad(name_of_col_is(sys_indexes, sys_index,
2367 DICT_FLD__SYS_INDEXES__NAME, "NAME"));
2368 ut_ad(name_of_col_is(sys_indexes, sys_index,
2369 DICT_FLD__SYS_INDEXES__PAGE_NO, "PAGE_NO"));
2370
2371 tuple = dtuple_create(heap, 1);
2372 dfield = dtuple_get_nth_field(tuple, 0);
2373
2374 buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
2375 mach_write_to_8(buf, table->id);
2376
2377 dfield_set_data(dfield, buf, 8);
2378 dict_index_copy_types(tuple, sys_index, 1);
2379
2380 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
2381 BTR_SEARCH_LEAF, &pcur, &mtr);
2382 for (;;) {
2383 dict_index_t* index = NULL;
2384 const char* err_msg;
2385
2386 if (!btr_pcur_is_on_user_rec(&pcur)) {
2387
2388 /* We should allow the table to open even
2389 without index when DICT_ERR_IGNORE_CORRUPT is set.
2390 DICT_ERR_IGNORE_CORRUPT is currently only set
2391 for drop table */
2392 if (dict_table_get_first_index(table) == NULL
2393 && !(ignore_err & DICT_ERR_IGNORE_CORRUPT)) {
2394 ib::warn() << "Cannot load table "
2395 << table->name
2396 << " because it has no indexes in"
2397 " InnoDB internal data dictionary.";
2398 error = DB_CORRUPTION;
2399 goto func_exit;
2400 }
2401
2402 break;
2403 }
2404
2405 rec = btr_pcur_get_rec(&pcur);
2406
2407 if ((ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK)
2408 && (rec_get_n_fields_old(rec)
2409 == DICT_NUM_FIELDS__SYS_INDEXES
2410 /* a record for older SYS_INDEXES table
2411 (missing merge_threshold column) is acceptable. */
2412 || rec_get_n_fields_old(rec)
2413 == DICT_NUM_FIELDS__SYS_INDEXES - 1)) {
2414 const byte* field;
2415 ulint len;
2416 field = rec_get_nth_field_old(
2417 rec, DICT_FLD__SYS_INDEXES__NAME, &len);
2418
2419 if (len != UNIV_SQL_NULL
2420 && static_cast<char>(*field)
2421 == static_cast<char>(*TEMP_INDEX_PREFIX_STR)) {
2422 /* Skip indexes whose name starts with
2423 TEMP_INDEX_PREFIX_STR, because they will
2424 be dropped by row_merge_drop_temp_indexes()
2425 during crash recovery. */
2426 goto next_rec;
2427 }
2428 }
2429
2430 err_msg = dict_load_index_low(buf, heap, rec, TRUE, &index);
2431 ut_ad((index == NULL && err_msg != NULL)
2432 || (index != NULL && err_msg == NULL));
2433
2434 if (err_msg == dict_load_index_id_err) {
2435 /* TABLE_ID mismatch means that we have
2436 run out of index definitions for the table. */
2437
2438 if (dict_table_get_first_index(table) == NULL
2439 && !(ignore_err & DICT_ERR_IGNORE_CORRUPT)) {
2440
2441 ib::warn() << "Failed to load the"
2442 " clustered index for table "
2443 << table->name
2444 << " because of the following error: "
2445 << err_msg << "."
2446 " Refusing to load the rest of the"
2447 " indexes (if any) and the whole table"
2448 " altogether.";
2449 error = DB_CORRUPTION;
2450 goto func_exit;
2451 }
2452
2453 break;
2454 } else if (err_msg == dict_load_index_del) {
2455 /* Skip delete-marked records. */
2456 goto next_rec;
2457 } else if (err_msg) {
2458 ib::error() << err_msg;
2459 if (ignore_err & DICT_ERR_IGNORE_CORRUPT) {
2460 goto next_rec;
2461 }
2462 error = DB_CORRUPTION;
2463 goto func_exit;
2464 }
2465
2466 ut_ad(index);
2467 ut_ad(!dict_index_is_online_ddl(index));
2468
2469 /* Check whether the index is corrupted */
2470 if (index->is_corrupted()) {
2471 ib::error() << "Index " << index->name
2472 << " of table " << table->name
2473 << " is corrupted";
2474
2475 if (!srv_load_corrupted
2476 && !(ignore_err & DICT_ERR_IGNORE_CORRUPT)
2477 && dict_index_is_clust(index)) {
2478 dict_mem_index_free(index);
2479
2480 error = DB_INDEX_CORRUPT;
2481 goto func_exit;
2482 } else {
2483 /* We will load the index if
2484 1) srv_load_corrupted is TRUE
2485 2) ignore_err is set with
2486 DICT_ERR_IGNORE_CORRUPT
2487 3) if the index corrupted is a secondary
2488 index */
2489 ib::info() << "Load corrupted index "
2490 << index->name
2491 << " of table " << table->name;
2492 }
2493 }
2494
2495 if (index->type & DICT_FTS
2496 && !dict_table_has_fts_index(table)) {
2497 /* This should have been created by now. */
2498 ut_a(table->fts != NULL);
2499 DICT_TF2_FLAG_SET(table, DICT_TF2_FTS);
2500 }
2501
2502 /* We check for unsupported types first, so that the
2503 subsequent checks are relevant for the supported types. */
2504 if (index->type & ~(DICT_CLUSTERED | DICT_UNIQUE
2505 | DICT_CORRUPT | DICT_FTS
2506 | DICT_SPATIAL | DICT_VIRTUAL)) {
2507
2508 ib::error() << "Unknown type " << index->type
2509 << " of index " << index->name
2510 << " of table " << table->name;
2511
2512 error = DB_UNSUPPORTED;
2513 dict_mem_index_free(index);
2514 goto func_exit;
2515 } else if (index->page == FIL_NULL
2516 && table->is_readable()
2517 && (!(index->type & DICT_FTS))) {
2518
2519 ib::error() << "Trying to load index " << index->name
2520 << " for table " << table->name
2521 << ", but the index tree has been freed!";
2522
2523 if (ignore_err & DICT_ERR_IGNORE_INDEX_ROOT) {
2524 /* If caller can tolerate this error,
2525 we will continue to load the index and
2526 let caller deal with this error. However
2527 mark the index and table corrupted. We
2528 only need to mark such in the index
2529 dictionary cache for such metadata corruption,
2530 since we would always be able to set it
2531 when loading the dictionary cache */
2532 index->table = table;
2533 dict_set_corrupted_index_cache_only(index);
2534
2535 ib::info() << "Index is corrupt but forcing"
2536 " load into data dictionary";
2537 } else {
2538 corrupted:
2539 dict_mem_index_free(index);
2540 error = DB_CORRUPTION;
2541 goto func_exit;
2542 }
2543 } else if (!dict_index_is_clust(index)
2544 && NULL == dict_table_get_first_index(table)) {
2545
2546 ib::error() << "Trying to load index " << index->name
2547 << " for table " << table->name
2548 << ", but the first index is not clustered!";
2549
2550 goto corrupted;
2551 } else if (dict_is_sys_table(table->id)
2552 && (dict_index_is_clust(index)
2553 || ((table == dict_sys.sys_tables)
2554 && !strcmp("ID_IND", index->name)))) {
2555
2556 /* The index was created in memory already at booting
2557 of the database server */
2558 dict_mem_index_free(index);
2559 } else {
2560 dict_load_fields(index, heap);
2561 index->table = table;
2562
2563 /* The data dictionary tables should never contain
2564 invalid index definitions. If we ignored this error
2565 and simply did not load this index definition, the
2566 .frm file would disagree with the index definitions
2567 inside InnoDB. */
2568 if ((error = dict_index_add_to_cache(index,
2569 index->page))
2570 != DB_SUCCESS) {
2571 goto func_exit;
2572 }
2573
2574 #ifdef UNIV_DEBUG
2575 // The following assertion doesn't hold for FTS indexes
2576 // as it may have prefix_len=1 with any charset
2577 if (index->type != DICT_FTS) {
2578 for (uint i = 0; i < index->n_fields; i++) {
2579 dict_field_t &f = index->fields[i];
2580 ut_ad(f.col->mbmaxlen == 0
2581 || f.prefix_len
2582 % f.col->mbmaxlen == 0);
2583 }
2584 }
2585 #endif /* UNIV_DEBUG */
2586 }
2587 next_rec:
2588 btr_pcur_move_to_next_user_rec(&pcur, &mtr);
2589 }
2590
2591 ut_ad(table->fts_doc_id_index == NULL);
2592
2593 if (table->fts != NULL) {
2594 table->fts_doc_id_index = dict_table_get_index_on_name(
2595 table, FTS_DOC_ID_INDEX_NAME);
2596 }
2597
2598 /* If the table contains FTS indexes, populate table->fts->indexes */
2599 if (dict_table_has_fts_index(table)) {
2600 ut_ad(table->fts_doc_id_index != NULL);
2601 /* table->fts->indexes should have been created. */
2602 ut_a(table->fts->indexes != NULL);
2603 dict_table_get_all_fts_indexes(table, table->fts->indexes);
2604 }
2605
2606 func_exit:
2607 btr_pcur_close(&pcur);
2608 mtr_commit(&mtr);
2609
2610 return(error);
2611 }
2612
2613 /** Load a table definition from a SYS_TABLES record to dict_table_t.
2614 Do not load any columns or indexes.
2615 @param[in] name Table name
2616 @param[in] rec SYS_TABLES record
2617 @param[out,own] table table, or NULL
2618 @return error message
2619 @retval NULL on success */
dict_load_table_low(const table_name_t & name,const rec_t * rec,dict_table_t ** table)2620 static const char* dict_load_table_low(const table_name_t& name,
2621 const rec_t* rec, dict_table_t** table)
2622 {
2623 table_id_t table_id;
2624 ulint space_id;
2625 ulint n_cols;
2626 ulint t_num;
2627 ulint flags;
2628 ulint flags2;
2629 ulint n_v_col;
2630
2631 if (const char* error_text = dict_sys_tables_rec_check(rec)) {
2632 *table = NULL;
2633 return(error_text);
2634 }
2635
2636 if (!dict_sys_tables_rec_read(rec, name, &table_id, &space_id,
2637 &t_num, &flags, &flags2)) {
2638 *table = NULL;
2639 return(dict_load_table_flags);
2640 }
2641
2642 dict_table_decode_n_col(t_num, &n_cols, &n_v_col);
2643
2644 *table = dict_mem_table_create(
2645 name.m_name, NULL, n_cols + n_v_col, n_v_col, flags, flags2);
2646 (*table)->space_id = space_id;
2647 (*table)->id = table_id;
2648 (*table)->file_unreadable = !!(flags2 & DICT_TF2_DISCARDED);
2649
2650 return(NULL);
2651 }
2652
2653 /********************************************************************//**
2654 Using the table->heap, copy the null-terminated filepath into
2655 table->data_dir_path and replace the 'databasename/tablename.ibd'
2656 portion with 'tablename'.
2657 This allows SHOW CREATE TABLE to return the correct DATA DIRECTORY path.
2658 Make this data directory path only if it has not yet been saved. */
2659 static
2660 void
dict_save_data_dir_path(dict_table_t * table,const char * filepath)2661 dict_save_data_dir_path(
2662 /*====================*/
2663 dict_table_t* table, /*!< in/out: table */
2664 const char* filepath) /*!< in: filepath of tablespace */
2665 {
2666 ut_ad(mutex_own(&dict_sys.mutex));
2667 ut_a(DICT_TF_HAS_DATA_DIR(table->flags));
2668
2669 ut_a(!table->data_dir_path);
2670 ut_a(filepath);
2671
2672 /* Be sure this filepath is not the default filepath. */
2673 char* default_filepath = fil_make_filepath(
2674 NULL, table->name.m_name, IBD, false);
2675 if (default_filepath) {
2676 if (0 != strcmp(filepath, default_filepath)) {
2677 ulint pathlen = strlen(filepath);
2678 ut_a(pathlen < OS_FILE_MAX_PATH);
2679 ut_a(0 == strcmp(filepath + pathlen - 4, DOT_IBD));
2680
2681 table->data_dir_path = mem_heap_strdup(
2682 table->heap, filepath);
2683 os_file_make_data_dir_path(table->data_dir_path);
2684 }
2685
2686 ut_free(default_filepath);
2687 }
2688 }
2689
2690 /** Make sure the data_dir_path is saved in dict_table_t if DATA DIRECTORY
2691 was used. Try to read it from the fil_system first, then from SYS_DATAFILES.
2692 @param[in] table Table object
2693 @param[in] dict_mutex_own true if dict_sys.mutex is owned already */
2694 void
dict_get_and_save_data_dir_path(dict_table_t * table,bool dict_mutex_own)2695 dict_get_and_save_data_dir_path(
2696 dict_table_t* table,
2697 bool dict_mutex_own)
2698 {
2699 ut_ad(!table->is_temporary());
2700 ut_ad(!table->space || table->space->id == table->space_id);
2701
2702 if (!table->data_dir_path && table->space_id && table->space) {
2703 if (!dict_mutex_own) {
2704 dict_mutex_enter_for_mysql();
2705 }
2706
2707 table->flags |= 1 << DICT_TF_POS_DATA_DIR
2708 & ((1U << DICT_TF_BITS) - 1);
2709 dict_save_data_dir_path(table,
2710 table->space->chain.start->name);
2711
2712 if (table->data_dir_path == NULL) {
2713 /* Since we did not set the table data_dir_path,
2714 unset the flag. This does not change SYS_DATAFILES
2715 or SYS_TABLES or FSP_SPACE_FLAGS on the header page
2716 of the tablespace, but it makes dict_table_t
2717 consistent. */
2718 table->flags &= ~DICT_TF_MASK_DATA_DIR
2719 & ((1U << DICT_TF_BITS) - 1);
2720 }
2721
2722 if (!dict_mutex_own) {
2723 dict_mutex_exit_for_mysql();
2724 }
2725 }
2726 }
2727
2728 /** Loads a table definition and also all its index definitions, and also
2729 the cluster definition if the table is a member in a cluster. Also loads
2730 all foreign key constraints where the foreign key is in the table or where
2731 a foreign key references columns in this table.
2732 @param[in] name Table name in the dbname/tablename format
2733 @param[in] ignore_err Error to be ignored when loading
2734 table and its index definition
2735 @return table, NULL if does not exist; if the table is stored in an
2736 .ibd file, but the file does not exist, then we set the file_unreadable
2737 flag in the table object we return. */
dict_load_table(const char * name,dict_err_ignore_t ignore_err)2738 dict_table_t* dict_load_table(const char* name, dict_err_ignore_t ignore_err)
2739 {
2740 dict_names_t fk_list;
2741 dict_table_t* result;
2742 dict_names_t::iterator i;
2743
2744 DBUG_ENTER("dict_load_table");
2745 DBUG_PRINT("dict_load_table", ("loading table: '%s'", name));
2746
2747 ut_ad(mutex_own(&dict_sys.mutex));
2748
2749 result = dict_table_check_if_in_cache_low(name);
2750
2751 if (!result) {
2752 result = dict_load_table_one(const_cast<char*>(name),
2753 ignore_err, fk_list);
2754 while (!fk_list.empty()) {
2755 if (!dict_table_check_if_in_cache_low(fk_list.front()))
2756 dict_load_table_one(
2757 const_cast<char*>(fk_list.front()),
2758 ignore_err, fk_list);
2759 fk_list.pop_front();
2760 }
2761 }
2762
2763 DBUG_RETURN(result);
2764 }
2765
2766 /** Opens a tablespace for dict_load_table_one()
2767 @param[in,out] table A table that refers to the tablespace to open
2768 @param[in] ignore_err Whether to ignore an error. */
2769 UNIV_INLINE
2770 void
dict_load_tablespace(dict_table_t * table,dict_err_ignore_t ignore_err)2771 dict_load_tablespace(
2772 dict_table_t* table,
2773 dict_err_ignore_t ignore_err)
2774 {
2775 ut_ad(!table->is_temporary());
2776 ut_ad(!table->space);
2777 ut_ad(table->space_id < SRV_SPACE_ID_UPPER_BOUND);
2778 ut_ad(fil_system.sys_space);
2779
2780 if (table->space_id == TRX_SYS_SPACE) {
2781 table->space = fil_system.sys_space;
2782 return;
2783 }
2784
2785 if (table->flags2 & DICT_TF2_DISCARDED) {
2786 ib::warn() << "Tablespace for table " << table->name
2787 << " is set as discarded.";
2788 table->file_unreadable = true;
2789 return;
2790 }
2791
2792 /* The tablespace may already be open. */
2793 table->space = fil_space_for_table_exists_in_mem(
2794 table->space_id, table->name.m_name, table->flags);
2795 if (table->space) {
2796 return;
2797 }
2798
2799 if (ignore_err == DICT_ERR_IGNORE_DROP) {
2800 table->file_unreadable = true;
2801 return;
2802 }
2803
2804 if (!(ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK)) {
2805 ib::error() << "Failed to find tablespace for table "
2806 << table->name << " in the cache. Attempting"
2807 " to load the tablespace with space id "
2808 << table->space_id;
2809 }
2810
2811 /* Use the remote filepath if needed. This parameter is optional
2812 in the call to fil_ibd_open(). If not supplied, it will be built
2813 from the table->name. */
2814 char* filepath = NULL;
2815 if (DICT_TF_HAS_DATA_DIR(table->flags)) {
2816 /* This will set table->data_dir_path from either
2817 fil_system or SYS_DATAFILES */
2818 dict_get_and_save_data_dir_path(table, true);
2819
2820 if (table->data_dir_path) {
2821 filepath = fil_make_filepath(
2822 table->data_dir_path,
2823 table->name.m_name, IBD, true);
2824 }
2825 }
2826
2827 /* Try to open the tablespace. We set the 2nd param (fix_dict) to
2828 false because we do not have an x-lock on dict_sys.latch */
2829 table->space = fil_ibd_open(
2830 true, false, FIL_TYPE_TABLESPACE, table->space_id,
2831 dict_tf_to_fsp_flags(table->flags),
2832 table->name, filepath);
2833
2834 if (!table->space) {
2835 /* We failed to find a sensible tablespace file */
2836 table->file_unreadable = true;
2837 }
2838
2839 ut_free(filepath);
2840 }
2841
2842 /** Loads a table definition and also all its index definitions.
2843
2844 Loads those foreign key constraints whose referenced table is already in
2845 dictionary cache. If a foreign key constraint is not loaded, then the
2846 referenced table is pushed into the output stack (fk_tables), if it is not
2847 NULL. These tables must be subsequently loaded so that all the foreign
2848 key constraints are loaded into memory.
2849
2850 @param[in] name Table name in the db/tablename format
2851 @param[in] ignore_err Error to be ignored when loading table
2852 and its index definition
2853 @param[out] fk_tables Related table names that must also be
2854 loaded to ensure that all foreign key
2855 constraints are loaded.
2856 @return table, NULL if does not exist; if the table is stored in an
2857 .ibd file, but the file does not exist, then we set the
2858 file_unreadable flag in the table object we return */
2859 static
2860 dict_table_t*
dict_load_table_one(const table_name_t & name,dict_err_ignore_t ignore_err,dict_names_t & fk_tables)2861 dict_load_table_one(
2862 const table_name_t& name,
2863 dict_err_ignore_t ignore_err,
2864 dict_names_t& fk_tables)
2865 {
2866 dberr_t err;
2867 dict_table_t* sys_tables;
2868 btr_pcur_t pcur;
2869 dict_index_t* sys_index;
2870 dtuple_t* tuple;
2871 mem_heap_t* heap;
2872 dfield_t* dfield;
2873 const rec_t* rec;
2874 const byte* field;
2875 ulint len;
2876 mtr_t mtr;
2877
2878 DBUG_ENTER("dict_load_table_one");
2879 DBUG_PRINT("dict_load_table_one", ("table: %s", name.m_name));
2880
2881 ut_ad(mutex_own(&dict_sys.mutex));
2882
2883 heap = mem_heap_create(32000);
2884
2885 mtr_start(&mtr);
2886
2887 sys_tables = dict_table_get_low("SYS_TABLES");
2888 sys_index = UT_LIST_GET_FIRST(sys_tables->indexes);
2889 ut_ad(!dict_table_is_comp(sys_tables));
2890 ut_ad(name_of_col_is(sys_tables, sys_index,
2891 DICT_FLD__SYS_TABLES__ID, "ID"));
2892 ut_ad(name_of_col_is(sys_tables, sys_index,
2893 DICT_FLD__SYS_TABLES__N_COLS, "N_COLS"));
2894 ut_ad(name_of_col_is(sys_tables, sys_index,
2895 DICT_FLD__SYS_TABLES__TYPE, "TYPE"));
2896 ut_ad(name_of_col_is(sys_tables, sys_index,
2897 DICT_FLD__SYS_TABLES__MIX_LEN, "MIX_LEN"));
2898 ut_ad(name_of_col_is(sys_tables, sys_index,
2899 DICT_FLD__SYS_TABLES__SPACE, "SPACE"));
2900
2901 tuple = dtuple_create(heap, 1);
2902 dfield = dtuple_get_nth_field(tuple, 0);
2903
2904 dfield_set_data(dfield, name.m_name, strlen(name.m_name));
2905 dict_index_copy_types(tuple, sys_index, 1);
2906
2907 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
2908 BTR_SEARCH_LEAF, &pcur, &mtr);
2909 rec = btr_pcur_get_rec(&pcur);
2910
2911 if (!btr_pcur_is_on_user_rec(&pcur)
2912 || rec_get_deleted_flag(rec, 0)) {
2913 /* Not found */
2914 err_exit:
2915 btr_pcur_close(&pcur);
2916 mtr_commit(&mtr);
2917 mem_heap_free(heap);
2918
2919 DBUG_RETURN(NULL);
2920 }
2921
2922 field = rec_get_nth_field_old(
2923 rec, DICT_FLD__SYS_TABLES__NAME, &len);
2924
2925 /* Check if the table name in record is the searched one */
2926 if (len != strlen(name.m_name)
2927 || memcmp(name.m_name, field, len)) {
2928
2929 goto err_exit;
2930 }
2931
2932 dict_table_t* table;
2933 if (const char* err_msg = dict_load_table_low(name, rec, &table)) {
2934 if (err_msg != dict_load_table_flags) {
2935 ib::error() << err_msg;
2936 }
2937 goto err_exit;
2938 }
2939
2940 btr_pcur_close(&pcur);
2941 mtr_commit(&mtr);
2942
2943 dict_load_tablespace(table, ignore_err);
2944
2945 dict_load_columns(table, heap);
2946
2947 dict_load_virtual(table, heap);
2948
2949 dict_table_add_system_columns(table, heap);
2950
2951 table->can_be_evicted = true;
2952 table->add_to_cache();
2953
2954 mem_heap_empty(heap);
2955
2956 ut_ad(dict_tf2_is_valid(table->flags, table->flags2));
2957
2958 /* If there is no tablespace for the table then we only need to
2959 load the index definitions. So that we can IMPORT the tablespace
2960 later. When recovering table locks for resurrected incomplete
2961 transactions, the tablespace should exist, because DDL operations
2962 were not allowed while the table is being locked by a transaction. */
2963 dict_err_ignore_t index_load_err =
2964 !(ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK)
2965 && !table->is_readable()
2966 ? DICT_ERR_IGNORE_ALL
2967 : ignore_err;
2968
2969 err = dict_load_indexes(table, heap, index_load_err);
2970
2971 if (err == DB_INDEX_CORRUPT) {
2972 /* Refuse to load the table if the table has a corrupted
2973 cluster index */
2974 if (!srv_load_corrupted) {
2975
2976 ib::error() << "Load table " << table->name
2977 << " failed, the table has"
2978 " corrupted clustered indexes. Turn on"
2979 " 'innodb_force_load_corrupted' to drop it";
2980 dict_sys.remove(table);
2981 table = NULL;
2982 goto func_exit;
2983 } else {
2984 if (table->indexes.start->is_corrupted()) {
2985 table->corrupted = true;
2986 }
2987 }
2988 }
2989
2990 if (err == DB_SUCCESS && table->is_readable()) {
2991 const auto root = dict_table_get_first_index(table)->page;
2992
2993 if (root >= table->space->get_size()) {
2994 corrupted:
2995 table->corrupted = true;
2996 table->file_unreadable = true;
2997 err = DB_CORRUPTION;
2998 } else {
2999 const page_id_t page_id(table->space->id, root);
3000 mtr.start();
3001 buf_block_t* block = buf_page_get(
3002 page_id, table->space->zip_size(),
3003 RW_S_LATCH, &mtr);
3004 const bool corrupted = !block
3005 || page_get_space_id(block->frame)
3006 != page_id.space()
3007 || page_get_page_no(block->frame)
3008 != page_id.page_no()
3009 || (mach_read_from_2(FIL_PAGE_TYPE
3010 + block->frame)
3011 != FIL_PAGE_INDEX
3012 && mach_read_from_2(FIL_PAGE_TYPE
3013 + block->frame)
3014 != FIL_PAGE_TYPE_INSTANT);
3015 mtr.commit();
3016 if (corrupted) {
3017 goto corrupted;
3018 }
3019
3020 if (table->supports_instant()) {
3021 err = btr_cur_instant_init(table);
3022 }
3023 }
3024 }
3025
3026 /* Initialize table foreign_child value. Its value could be
3027 changed when dict_load_foreigns() is called below */
3028 table->fk_max_recusive_level = 0;
3029
3030 /* If the force recovery flag is set, we open the table irrespective
3031 of the error condition, since the user may want to dump data from the
3032 clustered index. However we load the foreign key information only if
3033 all indexes were loaded. */
3034 if (!table->is_readable()) {
3035 /* Don't attempt to load the indexes from disk. */
3036 } else if (err == DB_SUCCESS) {
3037 err = dict_load_foreigns(table->name.m_name, NULL,
3038 true, true,
3039 ignore_err, fk_tables);
3040
3041 if (err != DB_SUCCESS) {
3042 ib::warn() << "Load table " << table->name
3043 << " failed, the table has missing"
3044 " foreign key indexes. Turn off"
3045 " 'foreign_key_checks' and try again.";
3046
3047 dict_sys.remove(table);
3048 table = NULL;
3049 } else {
3050 dict_mem_table_fill_foreign_vcol_set(table);
3051 table->fk_max_recusive_level = 0;
3052 }
3053 } else {
3054 dict_index_t* index;
3055
3056 /* Make sure that at least the clustered index was loaded.
3057 Otherwise refuse to load the table */
3058 index = dict_table_get_first_index(table);
3059
3060 if (!srv_force_recovery
3061 || !index
3062 || !index->is_primary()) {
3063 dict_sys.remove(table);
3064 table = NULL;
3065 } else if (index->is_corrupted()
3066 && table->is_readable()) {
3067 /* It is possible we force to load a corrupted
3068 clustered index if srv_load_corrupted is set.
3069 Mark the table as corrupted in this case */
3070 table->corrupted = true;
3071 }
3072 }
3073
3074 func_exit:
3075 mem_heap_free(heap);
3076
3077 ut_ad(!table
3078 || (ignore_err & ~DICT_ERR_IGNORE_FK_NOKEY)
3079 || !table->is_readable()
3080 || !table->corrupted);
3081
3082 if (table && table->fts) {
3083 if (!(dict_table_has_fts_index(table)
3084 || DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)
3085 || DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_ADD_DOC_ID))) {
3086 /* the table->fts could be created in dict_load_column
3087 when a user defined FTS_DOC_ID is present, but no
3088 FTS */
3089 fts_free(table);
3090 } else if (fts_optimize_wq) {
3091 fts_optimize_add_table(table);
3092 } else if (table->can_be_evicted) {
3093 /* fts_optimize_thread is not started yet.
3094 So make the table as non-evictable from cache. */
3095 dict_sys.prevent_eviction(table);
3096 }
3097 }
3098
3099 ut_ad(err != DB_SUCCESS || dict_foreign_set_validate(*table));
3100
3101 DBUG_RETURN(table);
3102 }
3103
3104 /***********************************************************************//**
3105 Loads a table object based on the table id.
3106 @return table; NULL if table does not exist */
3107 dict_table_t*
dict_load_table_on_id(table_id_t table_id,dict_err_ignore_t ignore_err)3108 dict_load_table_on_id(
3109 /*==================*/
3110 table_id_t table_id, /*!< in: table id */
3111 dict_err_ignore_t ignore_err) /*!< in: errors to ignore
3112 when loading the table */
3113 {
3114 byte id_buf[8];
3115 btr_pcur_t pcur;
3116 mem_heap_t* heap;
3117 dtuple_t* tuple;
3118 dfield_t* dfield;
3119 dict_index_t* sys_table_ids;
3120 dict_table_t* sys_tables;
3121 const rec_t* rec;
3122 const byte* field;
3123 ulint len;
3124 dict_table_t* table;
3125 mtr_t mtr;
3126
3127 ut_ad(mutex_own(&dict_sys.mutex));
3128
3129 table = NULL;
3130
3131 /* NOTE that the operation of this function is protected by
3132 the dictionary mutex, and therefore no deadlocks can occur
3133 with other dictionary operations. */
3134
3135 mtr_start(&mtr);
3136 /*---------------------------------------------------*/
3137 /* Get the secondary index based on ID for table SYS_TABLES */
3138 sys_tables = dict_sys.sys_tables;
3139 sys_table_ids = dict_table_get_next_index(
3140 dict_table_get_first_index(sys_tables));
3141 ut_ad(!dict_table_is_comp(sys_tables));
3142 ut_ad(!dict_index_is_clust(sys_table_ids));
3143 heap = mem_heap_create(256);
3144
3145 tuple = dtuple_create(heap, 1);
3146 dfield = dtuple_get_nth_field(tuple, 0);
3147
3148 /* Write the table id in byte format to id_buf */
3149 mach_write_to_8(id_buf, table_id);
3150
3151 dfield_set_data(dfield, id_buf, 8);
3152 dict_index_copy_types(tuple, sys_table_ids, 1);
3153
3154 btr_pcur_open_on_user_rec(sys_table_ids, tuple, PAGE_CUR_GE,
3155 BTR_SEARCH_LEAF, &pcur, &mtr);
3156
3157 rec = btr_pcur_get_rec(&pcur);
3158
3159 if (page_rec_is_user_rec(rec)) {
3160 /*---------------------------------------------------*/
3161 /* Now we have the record in the secondary index
3162 containing the table ID and NAME */
3163 check_rec:
3164 field = rec_get_nth_field_old(
3165 rec, DICT_FLD__SYS_TABLE_IDS__ID, &len);
3166 ut_ad(len == 8);
3167
3168 /* Check if the table id in record is the one searched for */
3169 if (table_id == mach_read_from_8(field)) {
3170 if (rec_get_deleted_flag(rec, 0)) {
3171 /* Until purge has completed, there
3172 may be delete-marked duplicate records
3173 for the same SYS_TABLES.ID, but different
3174 SYS_TABLES.NAME. */
3175 while (btr_pcur_move_to_next(&pcur, &mtr)) {
3176 rec = btr_pcur_get_rec(&pcur);
3177
3178 if (page_rec_is_user_rec(rec)) {
3179 goto check_rec;
3180 }
3181 }
3182 } else {
3183 /* Now we get the table name from the record */
3184 field = rec_get_nth_field_old(rec,
3185 DICT_FLD__SYS_TABLE_IDS__NAME, &len);
3186 /* Load the table definition to memory */
3187 char* table_name = mem_heap_strdupl(
3188 heap, (char*) field, len);
3189 table = dict_load_table(table_name, ignore_err);
3190 }
3191 }
3192 }
3193
3194 btr_pcur_close(&pcur);
3195 mtr_commit(&mtr);
3196 mem_heap_free(heap);
3197
3198 return(table);
3199 }
3200
3201 /********************************************************************//**
3202 This function is called when the database is booted. Loads system table
3203 index definitions except for the clustered index which is added to the
3204 dictionary cache at booting before calling this function. */
3205 void
dict_load_sys_table(dict_table_t * table)3206 dict_load_sys_table(
3207 /*================*/
3208 dict_table_t* table) /*!< in: system table */
3209 {
3210 mem_heap_t* heap;
3211
3212 ut_ad(mutex_own(&dict_sys.mutex));
3213
3214 heap = mem_heap_create(1000);
3215
3216 dict_load_indexes(table, heap, DICT_ERR_IGNORE_NONE);
3217
3218 mem_heap_free(heap);
3219 }
3220
3221 /********************************************************************//**
3222 Loads foreign key constraint col names (also for the referenced table).
3223 Members that must be set (and valid) in foreign:
3224 foreign->heap
3225 foreign->n_fields
3226 foreign->id ('\0'-terminated)
3227 Members that will be created and set by this function:
3228 foreign->foreign_col_names[i]
3229 foreign->referenced_col_names[i]
3230 (for i=0..foreign->n_fields-1) */
3231 static
3232 void
dict_load_foreign_cols(dict_foreign_t * foreign)3233 dict_load_foreign_cols(
3234 /*===================*/
3235 dict_foreign_t* foreign)/*!< in/out: foreign constraint object */
3236 {
3237 dict_table_t* sys_foreign_cols;
3238 dict_index_t* sys_index;
3239 btr_pcur_t pcur;
3240 dtuple_t* tuple;
3241 dfield_t* dfield;
3242 const rec_t* rec;
3243 const byte* field;
3244 ulint len;
3245 ulint i;
3246 mtr_t mtr;
3247 size_t id_len;
3248
3249 ut_ad(mutex_own(&dict_sys.mutex));
3250
3251 id_len = strlen(foreign->id);
3252
3253 foreign->foreign_col_names = static_cast<const char**>(
3254 mem_heap_alloc(foreign->heap,
3255 foreign->n_fields * sizeof(void*)));
3256
3257 foreign->referenced_col_names = static_cast<const char**>(
3258 mem_heap_alloc(foreign->heap,
3259 foreign->n_fields * sizeof(void*)));
3260
3261 mtr_start(&mtr);
3262
3263 sys_foreign_cols = dict_table_get_low("SYS_FOREIGN_COLS");
3264
3265 sys_index = UT_LIST_GET_FIRST(sys_foreign_cols->indexes);
3266 ut_ad(!dict_table_is_comp(sys_foreign_cols));
3267
3268 tuple = dtuple_create(foreign->heap, 1);
3269 dfield = dtuple_get_nth_field(tuple, 0);
3270
3271 dfield_set_data(dfield, foreign->id, id_len);
3272 dict_index_copy_types(tuple, sys_index, 1);
3273
3274 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
3275 BTR_SEARCH_LEAF, &pcur, &mtr);
3276 for (i = 0; i < foreign->n_fields; i++) {
3277
3278 rec = btr_pcur_get_rec(&pcur);
3279
3280 ut_a(btr_pcur_is_on_user_rec(&pcur));
3281 ut_a(!rec_get_deleted_flag(rec, 0));
3282
3283 field = rec_get_nth_field_old(
3284 rec, DICT_FLD__SYS_FOREIGN_COLS__ID, &len);
3285
3286 if (len != id_len || memcmp(foreign->id, field, len)) {
3287 const rec_t* pos;
3288 ulint pos_len;
3289 const rec_t* for_col_name;
3290 ulint for_col_name_len;
3291 const rec_t* ref_col_name;
3292 ulint ref_col_name_len;
3293
3294 pos = rec_get_nth_field_old(
3295 rec, DICT_FLD__SYS_FOREIGN_COLS__POS,
3296 &pos_len);
3297
3298 for_col_name = rec_get_nth_field_old(
3299 rec, DICT_FLD__SYS_FOREIGN_COLS__FOR_COL_NAME,
3300 &for_col_name_len);
3301
3302 ref_col_name = rec_get_nth_field_old(
3303 rec, DICT_FLD__SYS_FOREIGN_COLS__REF_COL_NAME,
3304 &ref_col_name_len);
3305
3306 ib::fatal sout;
3307
3308 sout << "Unable to load column names for foreign"
3309 " key '" << foreign->id
3310 << "' because it was not found in"
3311 " InnoDB internal table SYS_FOREIGN_COLS. The"
3312 " closest entry we found is:"
3313 " (ID='";
3314 sout.write(field, len);
3315 sout << "', POS=" << mach_read_from_4(pos)
3316 << ", FOR_COL_NAME='";
3317 sout.write(for_col_name, for_col_name_len);
3318 sout << "', REF_COL_NAME='";
3319 sout.write(ref_col_name, ref_col_name_len);
3320 sout << "')";
3321 }
3322
3323 field = rec_get_nth_field_old(
3324 rec, DICT_FLD__SYS_FOREIGN_COLS__POS, &len);
3325 ut_a(len == 4);
3326 ut_a(i == mach_read_from_4(field));
3327
3328 field = rec_get_nth_field_old(
3329 rec, DICT_FLD__SYS_FOREIGN_COLS__FOR_COL_NAME, &len);
3330 foreign->foreign_col_names[i] = mem_heap_strdupl(
3331 foreign->heap, (char*) field, len);
3332
3333 field = rec_get_nth_field_old(
3334 rec, DICT_FLD__SYS_FOREIGN_COLS__REF_COL_NAME, &len);
3335 foreign->referenced_col_names[i] = mem_heap_strdupl(
3336 foreign->heap, (char*) field, len);
3337
3338 btr_pcur_move_to_next_user_rec(&pcur, &mtr);
3339 }
3340
3341 btr_pcur_close(&pcur);
3342 mtr_commit(&mtr);
3343 }
3344
3345 /***********************************************************************//**
3346 Loads a foreign key constraint to the dictionary cache. If the referenced
3347 table is not yet loaded, it is added in the output parameter (fk_tables).
3348 @return DB_SUCCESS or error code */
3349 static MY_ATTRIBUTE((nonnull(1), warn_unused_result))
3350 dberr_t
dict_load_foreign(const char * id,const char ** col_names,bool check_recursive,bool check_charsets,dict_err_ignore_t ignore_err,dict_names_t & fk_tables)3351 dict_load_foreign(
3352 /*==============*/
3353 const char* id,
3354 /*!< in: foreign constraint id, must be
3355 '\0'-terminated */
3356 const char** col_names,
3357 /*!< in: column names, or NULL
3358 to use foreign->foreign_table->col_names */
3359 bool check_recursive,
3360 /*!< in: whether to record the foreign table
3361 parent count to avoid unlimited recursive
3362 load of chained foreign tables */
3363 bool check_charsets,
3364 /*!< in: whether to check charset
3365 compatibility */
3366 dict_err_ignore_t ignore_err,
3367 /*!< in: error to be ignored */
3368 dict_names_t& fk_tables)
3369 /*!< out: the foreign key constraint is added
3370 to the dictionary cache only if the referenced
3371 table is already in cache. Otherwise, the
3372 foreign key constraint is not added to cache,
3373 and the referenced table is added to this
3374 stack. */
3375 {
3376 dict_foreign_t* foreign;
3377 dict_table_t* sys_foreign;
3378 btr_pcur_t pcur;
3379 dict_index_t* sys_index;
3380 dtuple_t* tuple;
3381 mem_heap_t* heap2;
3382 dfield_t* dfield;
3383 const rec_t* rec;
3384 const byte* field;
3385 ulint len;
3386 mtr_t mtr;
3387 dict_table_t* for_table;
3388 dict_table_t* ref_table;
3389 size_t id_len;
3390
3391 DBUG_ENTER("dict_load_foreign");
3392 DBUG_PRINT("dict_load_foreign",
3393 ("id: '%s', check_recursive: %d", id, check_recursive));
3394
3395 ut_ad(mutex_own(&dict_sys.mutex));
3396
3397 id_len = strlen(id);
3398
3399 heap2 = mem_heap_create(1000);
3400
3401 mtr_start(&mtr);
3402
3403 sys_foreign = dict_table_get_low("SYS_FOREIGN");
3404
3405 sys_index = UT_LIST_GET_FIRST(sys_foreign->indexes);
3406 ut_ad(!dict_table_is_comp(sys_foreign));
3407
3408 tuple = dtuple_create(heap2, 1);
3409 dfield = dtuple_get_nth_field(tuple, 0);
3410
3411 dfield_set_data(dfield, id, id_len);
3412 dict_index_copy_types(tuple, sys_index, 1);
3413
3414 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
3415 BTR_SEARCH_LEAF, &pcur, &mtr);
3416 rec = btr_pcur_get_rec(&pcur);
3417
3418 if (!btr_pcur_is_on_user_rec(&pcur)
3419 || rec_get_deleted_flag(rec, 0)) {
3420 /* Not found */
3421
3422 ib::error() << "Cannot load foreign constraint " << id
3423 << ": could not find the relevant record in "
3424 << "SYS_FOREIGN";
3425
3426 btr_pcur_close(&pcur);
3427 mtr_commit(&mtr);
3428 mem_heap_free(heap2);
3429
3430 DBUG_RETURN(DB_ERROR);
3431 }
3432
3433 field = rec_get_nth_field_old(rec, DICT_FLD__SYS_FOREIGN__ID, &len);
3434
3435 /* Check if the id in record is the searched one */
3436 if (len != id_len || memcmp(id, field, len)) {
3437 {
3438 ib::error err;
3439 err << "Cannot load foreign constraint " << id
3440 << ": found ";
3441 err.write(field, len);
3442 err << " instead in SYS_FOREIGN";
3443 }
3444
3445 btr_pcur_close(&pcur);
3446 mtr_commit(&mtr);
3447 mem_heap_free(heap2);
3448
3449 DBUG_RETURN(DB_ERROR);
3450 }
3451
3452 /* Read the table names and the number of columns associated
3453 with the constraint */
3454
3455 mem_heap_free(heap2);
3456
3457 foreign = dict_mem_foreign_create();
3458
3459 uint32_t n_fields_and_type = mach_read_from_4(
3460 rec_get_nth_field_old(
3461 rec, DICT_FLD__SYS_FOREIGN__N_COLS, &len));
3462
3463 ut_a(len == 4);
3464
3465 /* We store the type in the bits 24..29 of n_fields_and_type. */
3466
3467 foreign->type = (n_fields_and_type >> 24) & ((1U << 6) - 1);
3468 foreign->n_fields = n_fields_and_type & dict_index_t::MAX_N_FIELDS;
3469
3470 foreign->id = mem_heap_strdupl(foreign->heap, id, id_len);
3471
3472 field = rec_get_nth_field_old(
3473 rec, DICT_FLD__SYS_FOREIGN__FOR_NAME, &len);
3474
3475 foreign->foreign_table_name = mem_heap_strdupl(
3476 foreign->heap, (char*) field, len);
3477 dict_mem_foreign_table_name_lookup_set(foreign, TRUE);
3478
3479 const ulint foreign_table_name_len = len;
3480
3481 field = rec_get_nth_field_old(
3482 rec, DICT_FLD__SYS_FOREIGN__REF_NAME, &len);
3483 foreign->referenced_table_name = mem_heap_strdupl(
3484 foreign->heap, (char*) field, len);
3485 dict_mem_referenced_table_name_lookup_set(foreign, TRUE);
3486
3487 btr_pcur_close(&pcur);
3488 mtr_commit(&mtr);
3489
3490 dict_load_foreign_cols(foreign);
3491
3492 ref_table = dict_table_check_if_in_cache_low(
3493 foreign->referenced_table_name_lookup);
3494 for_table = dict_table_check_if_in_cache_low(
3495 foreign->foreign_table_name_lookup);
3496
3497 if (!for_table) {
3498 /* To avoid recursively loading the tables related through
3499 the foreign key constraints, the child table name is saved
3500 here. The child table will be loaded later, along with its
3501 foreign key constraint. */
3502
3503 ut_a(ref_table != NULL);
3504 fk_tables.push_back(
3505 mem_heap_strdupl(ref_table->heap,
3506 foreign->foreign_table_name_lookup,
3507 foreign_table_name_len));
3508
3509 dict_foreign_remove_from_cache(foreign);
3510 DBUG_RETURN(DB_SUCCESS);
3511 }
3512
3513 ut_a(for_table || ref_table);
3514
3515 /* Note that there may already be a foreign constraint object in
3516 the dictionary cache for this constraint: then the following
3517 call only sets the pointers in it to point to the appropriate table
3518 and index objects and frees the newly created object foreign.
3519 Adding to the cache should always succeed since we are not creating
3520 a new foreign key constraint but loading one from the data
3521 dictionary. */
3522
3523 DBUG_RETURN(dict_foreign_add_to_cache(foreign, col_names,
3524 check_charsets,
3525 ignore_err));
3526 }
3527
3528 /***********************************************************************//**
3529 Loads foreign key constraints where the table is either the foreign key
3530 holder or where the table is referenced by a foreign key. Adds these
3531 constraints to the data dictionary.
3532
3533 The foreign key constraint is loaded only if the referenced table is also
3534 in the dictionary cache. If the referenced table is not in dictionary
3535 cache, then it is added to the output parameter (fk_tables).
3536
3537 @return DB_SUCCESS or error code */
3538 dberr_t
dict_load_foreigns(const char * table_name,const char ** col_names,bool check_recursive,bool check_charsets,dict_err_ignore_t ignore_err,dict_names_t & fk_tables)3539 dict_load_foreigns(
3540 const char* table_name, /*!< in: table name */
3541 const char** col_names, /*!< in: column names, or NULL
3542 to use table->col_names */
3543 bool check_recursive,/*!< in: Whether to check
3544 recursive load of tables
3545 chained by FK */
3546 bool check_charsets, /*!< in: whether to check
3547 charset compatibility */
3548 dict_err_ignore_t ignore_err, /*!< in: error to be ignored */
3549 dict_names_t& fk_tables)
3550 /*!< out: stack of table
3551 names which must be loaded
3552 subsequently to load all the
3553 foreign key constraints. */
3554 {
3555 ulint tuple_buf[(DTUPLE_EST_ALLOC(1) + sizeof(ulint) - 1)
3556 / sizeof(ulint)];
3557 btr_pcur_t pcur;
3558 dtuple_t* tuple;
3559 dfield_t* dfield;
3560 dict_index_t* sec_index;
3561 dict_table_t* sys_foreign;
3562 const rec_t* rec;
3563 const byte* field;
3564 ulint len;
3565 dberr_t err;
3566 mtr_t mtr;
3567
3568 DBUG_ENTER("dict_load_foreigns");
3569
3570 ut_ad(mutex_own(&dict_sys.mutex));
3571
3572 sys_foreign = dict_table_get_low("SYS_FOREIGN");
3573
3574 if (sys_foreign == NULL) {
3575 /* No foreign keys defined yet in this database */
3576
3577 ib::info() << "No foreign key system tables in the database";
3578 DBUG_RETURN(DB_ERROR);
3579 }
3580
3581 ut_ad(!dict_table_is_comp(sys_foreign));
3582 mtr_start(&mtr);
3583
3584 /* Get the secondary index based on FOR_NAME from table
3585 SYS_FOREIGN */
3586
3587 sec_index = dict_table_get_next_index(
3588 dict_table_get_first_index(sys_foreign));
3589 ut_ad(!dict_index_is_clust(sec_index));
3590 start_load:
3591
3592 tuple = dtuple_create_from_mem(tuple_buf, sizeof(tuple_buf), 1, 0);
3593 dfield = dtuple_get_nth_field(tuple, 0);
3594
3595 dfield_set_data(dfield, table_name, strlen(table_name));
3596 dict_index_copy_types(tuple, sec_index, 1);
3597
3598 btr_pcur_open_on_user_rec(sec_index, tuple, PAGE_CUR_GE,
3599 BTR_SEARCH_LEAF, &pcur, &mtr);
3600 loop:
3601 rec = btr_pcur_get_rec(&pcur);
3602
3603 if (!btr_pcur_is_on_user_rec(&pcur)) {
3604 /* End of index */
3605
3606 goto load_next_index;
3607 }
3608
3609 /* Now we have the record in the secondary index containing a table
3610 name and a foreign constraint ID */
3611
3612 field = rec_get_nth_field_old(
3613 rec, DICT_FLD__SYS_FOREIGN_FOR_NAME__NAME, &len);
3614
3615 /* Check if the table name in the record is the one searched for; the
3616 following call does the comparison in the latin1_swedish_ci
3617 charset-collation, in a case-insensitive way. */
3618
3619 if (0 != cmp_data_data(dfield_get_type(dfield)->mtype,
3620 dfield_get_type(dfield)->prtype,
3621 static_cast<const byte*>(
3622 dfield_get_data(dfield)),
3623 dfield_get_len(dfield),
3624 field, len)) {
3625
3626 goto load_next_index;
3627 }
3628
3629 /* Since table names in SYS_FOREIGN are stored in a case-insensitive
3630 order, we have to check that the table name matches also in a binary
3631 string comparison. On Unix, MySQL allows table names that only differ
3632 in character case. If lower_case_table_names=2 then what is stored
3633 may not be the same case, but the previous comparison showed that they
3634 match with no-case. */
3635
3636 if (rec_get_deleted_flag(rec, 0)) {
3637 goto next_rec;
3638 }
3639
3640 if (innobase_get_lower_case_table_names() != 2
3641 && memcmp(field, table_name, len)) {
3642 goto next_rec;
3643 }
3644
3645 /* Now we get a foreign key constraint id */
3646 field = rec_get_nth_field_old(
3647 rec, DICT_FLD__SYS_FOREIGN_FOR_NAME__ID, &len);
3648
3649 /* Copy the string because the page may be modified or evicted
3650 after mtr_commit() below. */
3651 char fk_id[MAX_TABLE_NAME_LEN + 1];
3652
3653 ut_a(len <= MAX_TABLE_NAME_LEN);
3654 memcpy(fk_id, field, len);
3655 fk_id[len] = '\0';
3656
3657 btr_pcur_store_position(&pcur, &mtr);
3658
3659 mtr_commit(&mtr);
3660
3661 /* Load the foreign constraint definition to the dictionary cache */
3662
3663 err = dict_load_foreign(fk_id, col_names,
3664 check_recursive, check_charsets, ignore_err,
3665 fk_tables);
3666
3667 if (err != DB_SUCCESS) {
3668 btr_pcur_close(&pcur);
3669
3670 DBUG_RETURN(err);
3671 }
3672
3673 mtr_start(&mtr);
3674
3675 btr_pcur_restore_position(BTR_SEARCH_LEAF, &pcur, &mtr);
3676 next_rec:
3677 btr_pcur_move_to_next_user_rec(&pcur, &mtr);
3678
3679 goto loop;
3680
3681 load_next_index:
3682 btr_pcur_close(&pcur);
3683 mtr_commit(&mtr);
3684
3685 sec_index = dict_table_get_next_index(sec_index);
3686
3687 if (sec_index != NULL) {
3688
3689 mtr_start(&mtr);
3690
3691 /* Switch to scan index on REF_NAME, fk_max_recusive_level
3692 already been updated when scanning FOR_NAME index, no need to
3693 update again */
3694 check_recursive = FALSE;
3695
3696 goto start_load;
3697 }
3698
3699 DBUG_RETURN(DB_SUCCESS);
3700 }
3701