1 /*****************************************************************************
2 
3 Copyright (c) 1996, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2016, 2020, MariaDB Corporation.
5 
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9 
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13 
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17 
18 *****************************************************************************/
19 
20 /**************************************************//**
21 @file dict/dict0load.cc
22 Loads to the memory cache database object definitions
23 from dictionary tables
24 
25 Created 4/24/1996 Heikki Tuuri
26 *******************************************************/
27 
28 #include "dict0load.h"
29 
30 #include "mysql_version.h"
31 #include "btr0pcur.h"
32 #include "btr0btr.h"
33 #include "dict0boot.h"
34 #include "dict0crea.h"
35 #include "dict0dict.h"
36 #include "dict0mem.h"
37 #include "dict0priv.h"
38 #include "dict0stats.h"
39 #include "fsp0file.h"
40 #include "fts0priv.h"
41 #include "mach0data.h"
42 #include "page0page.h"
43 #include "rem0cmp.h"
44 #include "srv0start.h"
45 #include "srv0srv.h"
46 #include "fts0opt.h"
47 
48 /** Following are the InnoDB system tables. The positions in
49 this array are referenced by enum dict_system_table_id. */
50 static const char* SYSTEM_TABLE_NAME[] = {
51 	"SYS_TABLES",
52 	"SYS_INDEXES",
53 	"SYS_COLUMNS",
54 	"SYS_FIELDS",
55 	"SYS_FOREIGN",
56 	"SYS_FOREIGN_COLS",
57 	"SYS_TABLESPACES",
58 	"SYS_DATAFILES",
59 	"SYS_VIRTUAL"
60 };
61 
62 /** Loads a table definition and also all its index definitions.
63 
64 Loads those foreign key constraints whose referenced table is already in
65 dictionary cache.  If a foreign key constraint is not loaded, then the
66 referenced table is pushed into the output stack (fk_tables), if it is not
67 NULL.  These tables must be subsequently loaded so that all the foreign
68 key constraints are loaded into memory.
69 
70 @param[in]	name		Table name in the db/tablename format
71 @param[in]	ignore_err	Error to be ignored when loading table
72 				and its index definition
73 @param[out]	fk_tables	Related table names that must also be
74 				loaded to ensure that all foreign key
75 				constraints are loaded.
76 @return table, NULL if does not exist; if the table is stored in an
77 .ibd file, but the file does not exist, then we set the
78 file_unreadable flag in the table object we return */
79 static
80 dict_table_t*
81 dict_load_table_one(
82 	const table_name_t&	name,
83 	dict_err_ignore_t	ignore_err,
84 	dict_names_t&		fk_tables);
85 
86 /** Load a table definition from a SYS_TABLES record to dict_table_t.
87 Do not load any columns or indexes.
88 @param[in]	name		Table name
89 @param[in]	rec		SYS_TABLES record
90 @param[out,own]	table		table, or NULL
91 @return	error message
92 @retval	NULL on success */
93 static const char* dict_load_table_low(const table_name_t& name,
94 				       const rec_t* rec, dict_table_t** table)
95 	MY_ATTRIBUTE((nonnull, warn_unused_result));
96 
97 /** Load an index definition from a SYS_INDEXES record to dict_index_t.
98 If allocate=TRUE, we will create a dict_index_t structure and fill it
99 accordingly. If allocated=FALSE, the dict_index_t will be supplied by
100 the caller and filled with information read from the record.
101 @return	error message
102 @retval	NULL on success */
103 static
104 const char*
105 dict_load_index_low(
106 	byte*		table_id,	/*!< in/out: table id (8 bytes),
107 					an "in" value if allocate=TRUE
108 					and "out" when allocate=FALSE */
109 	mem_heap_t*	heap,		/*!< in/out: temporary memory heap */
110 	const rec_t*	rec,		/*!< in: SYS_INDEXES record */
111 	ibool		allocate,	/*!< in: TRUE=allocate *index,
112 					FALSE=fill in a pre-allocated
113 					*index */
114 	dict_index_t**	index);		/*!< out,own: index, or NULL */
115 
116 /** Load a table column definition from a SYS_COLUMNS record to dict_table_t.
117 @return	error message
118 @retval	NULL on success */
119 static
120 const char*
121 dict_load_column_low(
122 	dict_table_t*	table,		/*!< in/out: table, could be NULL
123 					if we just populate a dict_column_t
124 					struct with information from
125 					a SYS_COLUMNS record */
126 	mem_heap_t*	heap,		/*!< in/out: memory heap
127 					for temporary storage */
128 	dict_col_t*	column,		/*!< out: dict_column_t to fill,
129 					or NULL if table != NULL */
130 	table_id_t*	table_id,	/*!< out: table id */
131 	const char**	col_name,	/*!< out: column name */
132 	const rec_t*	rec,		/*!< in: SYS_COLUMNS record */
133 	ulint*		nth_v_col);	/*!< out: if not NULL, this
134 					records the "n" of "nth" virtual
135 					column */
136 
137 /** Load a virtual column "mapping" (to base columns) information
138 from a SYS_VIRTUAL record
139 @param[in,out]	table		table
140 @param[in,out]	column		mapped base column's dict_column_t
141 @param[in,out]	table_id	table id
142 @param[in,out]	pos		virtual column position
143 @param[in,out]	base_pos	base column position
144 @param[in]	rec		SYS_VIRTUAL record
145 @return	error message
146 @retval	NULL on success */
147 static
148 const char*
149 dict_load_virtual_low(
150 	dict_table_t*	table,
151 	dict_col_t**	column,
152 	table_id_t*	table_id,
153 	ulint*		pos,
154 	ulint*		base_pos,
155 	const rec_t*	rec);
156 
157 /** Load an index field definition from a SYS_FIELDS record to dict_index_t.
158 @return	error message
159 @retval	NULL on success */
160 static
161 const char*
162 dict_load_field_low(
163 	byte*		index_id,	/*!< in/out: index id (8 bytes)
164 					an "in" value if index != NULL
165 					and "out" if index == NULL */
166 	dict_index_t*	index,		/*!< in/out: index, could be NULL
167 					if we just populate a dict_field_t
168 					struct with information from
169 					a SYS_FIELDS record */
170 	dict_field_t*	sys_field,	/*!< out: dict_field_t to be
171 					filled */
172 	ulint*		pos,		/*!< out: Field position */
173 	byte*		last_index_id,	/*!< in: last index id */
174 	mem_heap_t*	heap,		/*!< in/out: memory heap
175 					for temporary storage */
176 	const rec_t*	rec);		/*!< in: SYS_FIELDS record */
177 
178 /* If this flag is TRUE, then we will load the cluster index's (and tables')
179 metadata even if it is marked as "corrupted". */
180 my_bool     srv_load_corrupted;
181 
182 #ifdef UNIV_DEBUG
183 /****************************************************************//**
184 Compare the name of an index column.
185 @return TRUE if the i'th column of index is 'name'. */
186 static
187 ibool
188 name_of_col_is(
189 /*===========*/
190 	const dict_table_t*	table,	/*!< in: table */
191 	const dict_index_t*	index,	/*!< in: index */
192 	ulint			i,	/*!< in: index field offset */
193 	const char*		name)	/*!< in: name to compare to */
194 {
195 	ulint	tmp = dict_col_get_no(dict_field_get_col(
196 					      dict_index_get_nth_field(
197 						      index, i)));
198 
199 	return(strcmp(name, dict_table_get_col_name(table, tmp)) == 0);
200 }
201 #endif /* UNIV_DEBUG */
202 
203 /********************************************************************//**
204 Finds the first table name in the given database.
205 @return own: table name, NULL if does not exist; the caller must free
206 the memory in the string! */
207 char*
208 dict_get_first_table_name_in_db(
209 /*============================*/
210 	const char*	name)	/*!< in: database name which ends in '/' */
211 {
212 	dict_table_t*	sys_tables;
213 	btr_pcur_t	pcur;
214 	dict_index_t*	sys_index;
215 	dtuple_t*	tuple;
216 	mem_heap_t*	heap;
217 	dfield_t*	dfield;
218 	const rec_t*	rec;
219 	const byte*	field;
220 	ulint		len;
221 	mtr_t		mtr;
222 
223 	ut_ad(mutex_own(&dict_sys->mutex));
224 
225 	heap = mem_heap_create(1000);
226 
227 	mtr_start(&mtr);
228 
229 	sys_tables = dict_table_get_low("SYS_TABLES");
230 	sys_index = UT_LIST_GET_FIRST(sys_tables->indexes);
231 	ut_ad(!dict_table_is_comp(sys_tables));
232 
233 	tuple = dtuple_create(heap, 1);
234 	dfield = dtuple_get_nth_field(tuple, 0);
235 
236 	dfield_set_data(dfield, name, ut_strlen(name));
237 	dict_index_copy_types(tuple, sys_index, 1);
238 
239 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
240 				  BTR_SEARCH_LEAF, &pcur, &mtr);
241 loop:
242 	rec = btr_pcur_get_rec(&pcur);
243 
244 	if (!btr_pcur_is_on_user_rec(&pcur)) {
245 		/* Not found */
246 
247 		btr_pcur_close(&pcur);
248 		mtr_commit(&mtr);
249 		mem_heap_free(heap);
250 
251 		return(NULL);
252 	}
253 
254 	field = rec_get_nth_field_old(
255 		rec, DICT_FLD__SYS_TABLES__NAME, &len);
256 
257 	if (len < strlen(name)
258 	    || ut_memcmp(name, field, strlen(name)) != 0) {
259 		/* Not found */
260 
261 		btr_pcur_close(&pcur);
262 		mtr_commit(&mtr);
263 		mem_heap_free(heap);
264 
265 		return(NULL);
266 	}
267 
268 	if (!rec_get_deleted_flag(rec, 0)) {
269 
270 		/* We found one */
271 
272 		char*	table_name = mem_strdupl((char*) field, len);
273 
274 		btr_pcur_close(&pcur);
275 		mtr_commit(&mtr);
276 		mem_heap_free(heap);
277 
278 		return(table_name);
279 	}
280 
281 	btr_pcur_move_to_next_user_rec(&pcur, &mtr);
282 
283 	goto loop;
284 }
285 
286 /********************************************************************//**
287 This function gets the next system table record as it scans the table.
288 @return the next record if found, NULL if end of scan */
289 static
290 const rec_t*
291 dict_getnext_system_low(
292 /*====================*/
293 	btr_pcur_t*	pcur,		/*!< in/out: persistent cursor to the
294 					record*/
295 	mtr_t*		mtr)		/*!< in: the mini-transaction */
296 {
297 	rec_t*	rec = NULL;
298 
299 	while (!rec || rec_get_deleted_flag(rec, 0)) {
300 		btr_pcur_move_to_next_user_rec(pcur, mtr);
301 
302 		rec = btr_pcur_get_rec(pcur);
303 
304 		if (!btr_pcur_is_on_user_rec(pcur)) {
305 			/* end of index */
306 			btr_pcur_close(pcur);
307 
308 			return(NULL);
309 		}
310 	}
311 
312 	/* Get a record, let's save the position */
313 	btr_pcur_store_position(pcur, mtr);
314 
315 	return(rec);
316 }
317 
318 /********************************************************************//**
319 This function opens a system table, and returns the first record.
320 @return first record of the system table */
321 const rec_t*
322 dict_startscan_system(
323 /*==================*/
324 	btr_pcur_t*	pcur,		/*!< out: persistent cursor to
325 					the record */
326 	mtr_t*		mtr,		/*!< in: the mini-transaction */
327 	dict_system_id_t system_id)	/*!< in: which system table to open */
328 {
329 	dict_table_t*	system_table;
330 	dict_index_t*	clust_index;
331 	const rec_t*	rec;
332 
333 	ut_a(system_id < SYS_NUM_SYSTEM_TABLES);
334 
335 	system_table = dict_table_get_low(SYSTEM_TABLE_NAME[system_id]);
336 
337 	clust_index = UT_LIST_GET_FIRST(system_table->indexes);
338 
339 	btr_pcur_open_at_index_side(true, clust_index, BTR_SEARCH_LEAF, pcur,
340 				    true, 0, mtr);
341 
342 	rec = dict_getnext_system_low(pcur, mtr);
343 
344 	return(rec);
345 }
346 
347 /********************************************************************//**
348 This function gets the next system table record as it scans the table.
349 @return the next record if found, NULL if end of scan */
350 const rec_t*
351 dict_getnext_system(
352 /*================*/
353 	btr_pcur_t*	pcur,		/*!< in/out: persistent cursor
354 					to the record */
355 	mtr_t*		mtr)		/*!< in: the mini-transaction */
356 {
357 	const rec_t*	rec;
358 
359 	/* Restore the position */
360 	btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
361 
362 	/* Get the next record */
363 	rec = dict_getnext_system_low(pcur, mtr);
364 
365 	return(rec);
366 }
367 
368 /********************************************************************//**
369 This function processes one SYS_TABLES record and populate the dict_table_t
370 struct for the table.
371 @return error message, or NULL on success */
372 const char*
373 dict_process_sys_tables_rec_and_mtr_commit(
374 /*=======================================*/
375 	mem_heap_t*	heap,		/*!< in/out: temporary memory heap */
376 	const rec_t*	rec,		/*!< in: SYS_TABLES record */
377 	dict_table_t**	table,		/*!< out: dict_table_t to fill */
378 	bool		cached,		/*!< in: whether to load from cache */
379 	mtr_t*		mtr)		/*!< in/out: mini-transaction,
380 					will be committed */
381 {
382 	ulint		len;
383 	const char*	field;
384 
385 	field = (const char*) rec_get_nth_field_old(
386 		rec, DICT_FLD__SYS_TABLES__NAME, &len);
387 
388 	ut_a(!rec_get_deleted_flag(rec, 0));
389 
390 	ut_ad(mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_S_FIX));
391 
392 	/* Get the table name */
393 	table_name_t table_name(mem_heap_strdupl(heap, field, len));
394 
395 	if (cached) {
396 		/* Commit before load the table again */
397 		mtr_commit(mtr);
398 
399 		*table = dict_table_get_low(table_name.m_name);
400 		return *table ? NULL : "Table not found in cache";
401 	} else {
402 		const char* err = dict_load_table_low(table_name, rec, table);
403 		mtr_commit(mtr);
404 		return err;
405 	}
406 }
407 
408 /********************************************************************//**
409 This function parses a SYS_INDEXES record and populate a dict_index_t
410 structure with the information from the record. For detail information
411 about SYS_INDEXES fields, please refer to dict_boot() function.
412 @return error message, or NULL on success */
413 const char*
414 dict_process_sys_indexes_rec(
415 /*=========================*/
416 	mem_heap_t*	heap,		/*!< in/out: heap memory */
417 	const rec_t*	rec,		/*!< in: current SYS_INDEXES rec */
418 	dict_index_t*	index,		/*!< out: index to be filled */
419 	table_id_t*	table_id)	/*!< out: index table id */
420 {
421 	const char*	err_msg;
422 	byte*		buf;
423 
424 	buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
425 
426 	/* Parse the record, and get "dict_index_t" struct filled */
427 	err_msg = dict_load_index_low(buf, heap, rec, FALSE, &index);
428 
429 	*table_id = mach_read_from_8(buf);
430 
431 	return(err_msg);
432 }
433 
434 /********************************************************************//**
435 This function parses a SYS_COLUMNS record and populate a dict_column_t
436 structure with the information from the record.
437 @return error message, or NULL on success */
438 const char*
439 dict_process_sys_columns_rec(
440 /*=========================*/
441 	mem_heap_t*	heap,		/*!< in/out: heap memory */
442 	const rec_t*	rec,		/*!< in: current SYS_COLUMNS rec */
443 	dict_col_t*	column,		/*!< out: dict_col_t to be filled */
444 	table_id_t*	table_id,	/*!< out: table id */
445 	const char**	col_name,	/*!< out: column name */
446 	ulint*		nth_v_col)	/*!< out: if virtual col, this is
447 					record's sequence number */
448 {
449 	const char*	err_msg;
450 
451 	/* Parse the record, and get "dict_col_t" struct filled */
452 	err_msg = dict_load_column_low(NULL, heap, column,
453 				       table_id, col_name, rec, nth_v_col);
454 
455 	return(err_msg);
456 }
457 
458 /** This function parses a SYS_VIRTUAL record and extracts virtual column
459 information
460 @param[in]	rec		current SYS_COLUMNS rec
461 @param[in,out]	table_id	table id
462 @param[in,out]	pos		virtual column position
463 @param[in,out]	base_pos	base column position
464 @return error message, or NULL on success */
465 const char*
466 dict_process_sys_virtual_rec(
467 	const rec_t*	rec,
468 	table_id_t*	table_id,
469 	ulint*		pos,
470 	ulint*		base_pos)
471 {
472 	const char*	err_msg;
473 
474 	/* Parse the record, and get "dict_col_t" struct filled */
475 	err_msg = dict_load_virtual_low(NULL, NULL, table_id,
476 					pos, base_pos, rec);
477 
478 	return(err_msg);
479 }
480 
481 /********************************************************************//**
482 This function parses a SYS_FIELDS record and populates a dict_field_t
483 structure with the information from the record.
484 @return error message, or NULL on success */
485 const char*
486 dict_process_sys_fields_rec(
487 /*========================*/
488 	mem_heap_t*	heap,		/*!< in/out: heap memory */
489 	const rec_t*	rec,		/*!< in: current SYS_FIELDS rec */
490 	dict_field_t*	sys_field,	/*!< out: dict_field_t to be
491 					filled */
492 	ulint*		pos,		/*!< out: Field position */
493 	index_id_t*	index_id,	/*!< out: current index id */
494 	index_id_t	last_id)	/*!< in: previous index id */
495 {
496 	byte*		buf;
497 	byte*		last_index_id;
498 	const char*	err_msg;
499 
500 	buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
501 
502 	last_index_id = static_cast<byte*>(mem_heap_alloc(heap, 8));
503 	mach_write_to_8(last_index_id, last_id);
504 
505 	err_msg = dict_load_field_low(buf, NULL, sys_field,
506 				      pos, last_index_id, heap, rec);
507 
508 	*index_id = mach_read_from_8(buf);
509 
510 	return(err_msg);
511 
512 }
513 
514 /********************************************************************//**
515 This function parses a SYS_FOREIGN record and populate a dict_foreign_t
516 structure with the information from the record. For detail information
517 about SYS_FOREIGN fields, please refer to dict_load_foreign() function.
518 @return error message, or NULL on success */
519 const char*
520 dict_process_sys_foreign_rec(
521 /*=========================*/
522 	mem_heap_t*	heap,		/*!< in/out: heap memory */
523 	const rec_t*	rec,		/*!< in: current SYS_FOREIGN rec */
524 	dict_foreign_t*	foreign)	/*!< out: dict_foreign_t struct
525 					to be filled */
526 {
527 	ulint		len;
528 	const byte*	field;
529 	ulint		n_fields_and_type;
530 
531 	if (rec_get_deleted_flag(rec, 0)) {
532 		return("delete-marked record in SYS_FOREIGN");
533 	}
534 
535 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_FOREIGN) {
536 		return("wrong number of columns in SYS_FOREIGN record");
537 	}
538 
539 	field = rec_get_nth_field_old(
540 		rec, DICT_FLD__SYS_FOREIGN__ID, &len);
541 	if (len == 0 || len == UNIV_SQL_NULL) {
542 err_len:
543 		return("incorrect column length in SYS_FOREIGN");
544 	}
545 
546 	/* This receives a dict_foreign_t* that points to a stack variable.
547 	So dict_foreign_free(foreign) is not used as elsewhere.
548 	Since the heap used here is freed elsewhere, foreign->heap
549 	is not assigned. */
550 	foreign->id = mem_heap_strdupl(heap, (const char*) field, len);
551 
552 	rec_get_nth_field_offs_old(
553 		rec, DICT_FLD__SYS_FOREIGN__DB_TRX_ID, &len);
554 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
555 		goto err_len;
556 	}
557 	rec_get_nth_field_offs_old(
558 		rec, DICT_FLD__SYS_FOREIGN__DB_ROLL_PTR, &len);
559 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
560 		goto err_len;
561 	}
562 
563 	/* The _lookup versions of the referenced and foreign table names
564 	 are not assigned since they are not used in this dict_foreign_t */
565 
566 	field = rec_get_nth_field_old(
567 		rec, DICT_FLD__SYS_FOREIGN__FOR_NAME, &len);
568 	if (len == 0 || len == UNIV_SQL_NULL) {
569 		goto err_len;
570 	}
571 	foreign->foreign_table_name = mem_heap_strdupl(
572 		heap, (const char*) field, len);
573 
574 	field = rec_get_nth_field_old(
575 		rec, DICT_FLD__SYS_FOREIGN__REF_NAME, &len);
576 	if (len == 0 || len == UNIV_SQL_NULL) {
577 		goto err_len;
578 	}
579 	foreign->referenced_table_name = mem_heap_strdupl(
580 		heap, (const char*) field, len);
581 
582 	field = rec_get_nth_field_old(
583 		rec, DICT_FLD__SYS_FOREIGN__N_COLS, &len);
584 	if (len != 4) {
585 		goto err_len;
586 	}
587 	n_fields_and_type = mach_read_from_4(field);
588 
589 	foreign->type = (unsigned int) (n_fields_and_type >> 24);
590 	foreign->n_fields = (unsigned int) (n_fields_and_type & 0x3FFUL);
591 
592 	return(NULL);
593 }
594 
595 /********************************************************************//**
596 This function parses a SYS_FOREIGN_COLS record and extract necessary
597 information from the record and return to caller.
598 @return error message, or NULL on success */
599 const char*
600 dict_process_sys_foreign_col_rec(
601 /*=============================*/
602 	mem_heap_t*	heap,		/*!< in/out: heap memory */
603 	const rec_t*	rec,		/*!< in: current SYS_FOREIGN_COLS rec */
604 	const char**	name,		/*!< out: foreign key constraint name */
605 	const char**	for_col_name,	/*!< out: referencing column name */
606 	const char**	ref_col_name,	/*!< out: referenced column name
607 					in referenced table */
608 	ulint*		pos)		/*!< out: column position */
609 {
610 	ulint		len;
611 	const byte*	field;
612 
613 	if (rec_get_deleted_flag(rec, 0)) {
614 		return("delete-marked record in SYS_FOREIGN_COLS");
615 	}
616 
617 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_FOREIGN_COLS) {
618 		return("wrong number of columns in SYS_FOREIGN_COLS record");
619 	}
620 
621 	field = rec_get_nth_field_old(
622 		rec, DICT_FLD__SYS_FOREIGN_COLS__ID, &len);
623 	if (len == 0 || len == UNIV_SQL_NULL) {
624 err_len:
625 		return("incorrect column length in SYS_FOREIGN_COLS");
626 	}
627 	*name = mem_heap_strdupl(heap, (char*) field, len);
628 
629 	field = rec_get_nth_field_old(
630 		rec, DICT_FLD__SYS_FOREIGN_COLS__POS, &len);
631 	if (len != 4) {
632 		goto err_len;
633 	}
634 	*pos = mach_read_from_4(field);
635 
636 	rec_get_nth_field_offs_old(
637 		rec, DICT_FLD__SYS_FOREIGN_COLS__DB_TRX_ID, &len);
638 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
639 		goto err_len;
640 	}
641 	rec_get_nth_field_offs_old(
642 		rec, DICT_FLD__SYS_FOREIGN_COLS__DB_ROLL_PTR, &len);
643 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
644 		goto err_len;
645 	}
646 
647 	field = rec_get_nth_field_old(
648 		rec, DICT_FLD__SYS_FOREIGN_COLS__FOR_COL_NAME, &len);
649 	if (len == 0 || len == UNIV_SQL_NULL) {
650 		goto err_len;
651 	}
652 	*for_col_name = mem_heap_strdupl(heap, (char*) field, len);
653 
654 	field = rec_get_nth_field_old(
655 		rec, DICT_FLD__SYS_FOREIGN_COLS__REF_COL_NAME, &len);
656 	if (len == 0 || len == UNIV_SQL_NULL) {
657 		goto err_len;
658 	}
659 	*ref_col_name = mem_heap_strdupl(heap, (char*) field, len);
660 
661 	return(NULL);
662 }
663 
664 /********************************************************************//**
665 This function parses a SYS_TABLESPACES record, extracts necessary
666 information from the record and returns to caller.
667 @return error message, or NULL on success */
668 const char*
669 dict_process_sys_tablespaces(
670 /*=========================*/
671 	mem_heap_t*	heap,		/*!< in/out: heap memory */
672 	const rec_t*	rec,		/*!< in: current SYS_TABLESPACES rec */
673 	ulint*		space,		/*!< out: space id */
674 	const char**	name,		/*!< out: tablespace name */
675 	ulint*		flags)		/*!< out: tablespace flags */
676 {
677 	ulint		len;
678 	const byte*	field;
679 
680 	/* Initialize the output values */
681 	*space = ULINT_UNDEFINED;
682 	*name = NULL;
683 	*flags = ULINT_UNDEFINED;
684 
685 	if (rec_get_deleted_flag(rec, 0)) {
686 		return("delete-marked record in SYS_TABLESPACES");
687 	}
688 
689 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_TABLESPACES) {
690 		return("wrong number of columns in SYS_TABLESPACES record");
691 	}
692 
693 	field = rec_get_nth_field_old(
694 		rec, DICT_FLD__SYS_TABLESPACES__SPACE, &len);
695 	if (len != DICT_FLD_LEN_SPACE) {
696 err_len:
697 		return("incorrect column length in SYS_TABLESPACES");
698 	}
699 	*space = mach_read_from_4(field);
700 
701 	rec_get_nth_field_offs_old(
702 		rec, DICT_FLD__SYS_TABLESPACES__DB_TRX_ID, &len);
703 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
704 		goto err_len;
705 	}
706 
707 	rec_get_nth_field_offs_old(
708 		rec, DICT_FLD__SYS_TABLESPACES__DB_ROLL_PTR, &len);
709 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
710 		goto err_len;
711 	}
712 
713 	field = rec_get_nth_field_old(
714 		rec, DICT_FLD__SYS_TABLESPACES__NAME, &len);
715 	if (len == 0 || len == UNIV_SQL_NULL) {
716 		goto err_len;
717 	}
718 	*name = mem_heap_strdupl(heap, (char*) field, len);
719 
720 	field = rec_get_nth_field_old(
721 		rec, DICT_FLD__SYS_TABLESPACES__FLAGS, &len);
722 	if (len != DICT_FLD_LEN_FLAGS) {
723 		goto err_len;
724 	}
725 	*flags = mach_read_from_4(field);
726 
727 	return(NULL);
728 }
729 
730 /********************************************************************//**
731 This function parses a SYS_DATAFILES record, extracts necessary
732 information from the record and returns it to the caller.
733 @return error message, or NULL on success */
734 const char*
735 dict_process_sys_datafiles(
736 /*=======================*/
737 	mem_heap_t*	heap,		/*!< in/out: heap memory */
738 	const rec_t*	rec,		/*!< in: current SYS_DATAFILES rec */
739 	ulint*		space,		/*!< out: space id */
740 	const char**	path)		/*!< out: datafile paths */
741 {
742 	ulint		len;
743 	const byte*	field;
744 
745 	if (rec_get_deleted_flag(rec, 0)) {
746 		return("delete-marked record in SYS_DATAFILES");
747 	}
748 
749 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_DATAFILES) {
750 		return("wrong number of columns in SYS_DATAFILES record");
751 	}
752 
753 	field = rec_get_nth_field_old(
754 		rec, DICT_FLD__SYS_DATAFILES__SPACE, &len);
755 	if (len != DICT_FLD_LEN_SPACE) {
756 err_len:
757 		return("incorrect column length in SYS_DATAFILES");
758 	}
759 	*space = mach_read_from_4(field);
760 
761 	rec_get_nth_field_offs_old(
762 		rec, DICT_FLD__SYS_DATAFILES__DB_TRX_ID, &len);
763 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
764 		goto err_len;
765 	}
766 
767 	rec_get_nth_field_offs_old(
768 		rec, DICT_FLD__SYS_DATAFILES__DB_ROLL_PTR, &len);
769 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
770 		goto err_len;
771 	}
772 
773 	field = rec_get_nth_field_old(
774 		rec, DICT_FLD__SYS_DATAFILES__PATH, &len);
775 	if (len == 0 || len == UNIV_SQL_NULL) {
776 		goto err_len;
777 	}
778 	*path = mem_heap_strdupl(heap, (char*) field, len);
779 
780 	return(NULL);
781 }
782 
783 /** Get the first filepath from SYS_DATAFILES for a given space_id.
784 @param[in]	space_id	Tablespace ID
785 @return First filepath (caller must invoke ut_free() on it)
786 @retval NULL if no SYS_DATAFILES entry was found. */
787 static char*
788 dict_get_first_path(
789 	ulint	space_id)
790 {
791 	mtr_t		mtr;
792 	dict_table_t*	sys_datafiles;
793 	dict_index_t*	sys_index;
794 	dtuple_t*	tuple;
795 	dfield_t*	dfield;
796 	byte*		buf;
797 	btr_pcur_t	pcur;
798 	const rec_t*	rec;
799 	const byte*	field;
800 	ulint		len;
801 	char*		filepath = NULL;
802 	mem_heap_t*	heap = mem_heap_create(1024);
803 
804 	ut_ad(mutex_own(&dict_sys->mutex));
805 
806 	mtr_start(&mtr);
807 
808 	sys_datafiles = dict_table_get_low("SYS_DATAFILES");
809 	sys_index = UT_LIST_GET_FIRST(sys_datafiles->indexes);
810 
811 	ut_ad(!dict_table_is_comp(sys_datafiles));
812 	ut_ad(name_of_col_is(sys_datafiles, sys_index,
813 			     DICT_FLD__SYS_DATAFILES__SPACE, "SPACE"));
814 	ut_ad(name_of_col_is(sys_datafiles, sys_index,
815 			     DICT_FLD__SYS_DATAFILES__PATH, "PATH"));
816 
817 	tuple = dtuple_create(heap, 1);
818 	dfield = dtuple_get_nth_field(tuple, DICT_FLD__SYS_DATAFILES__SPACE);
819 
820 	buf = static_cast<byte*>(mem_heap_alloc(heap, 4));
821 	mach_write_to_4(buf, space_id);
822 
823 	dfield_set_data(dfield, buf, 4);
824 	dict_index_copy_types(tuple, sys_index, 1);
825 
826 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
827 				  BTR_SEARCH_LEAF, &pcur, &mtr);
828 
829 	rec = btr_pcur_get_rec(&pcur);
830 
831 	/* Get the filepath from this SYS_DATAFILES record. */
832 	if (btr_pcur_is_on_user_rec(&pcur)) {
833 		field = rec_get_nth_field_old(
834 			rec, DICT_FLD__SYS_DATAFILES__SPACE, &len);
835 		ut_a(len == 4);
836 
837 		if (space_id == mach_read_from_4(field)) {
838 			/* A record for this space ID was found. */
839 			field = rec_get_nth_field_old(
840 				rec, DICT_FLD__SYS_DATAFILES__PATH, &len);
841 
842 			ut_ad(len > 0);
843 			ut_ad(len < OS_FILE_MAX_PATH);
844 
845 			if (len > 0 && len < UNIV_SQL_NULL) {
846 				filepath = mem_strdupl(
847 					reinterpret_cast<const char*>(field),
848 					len);
849 				ut_ad(filepath != NULL);
850 
851 				/* The dictionary may have been written on
852 				another OS. */
853 				os_normalize_path(filepath);
854 			}
855 		}
856 	}
857 
858 	btr_pcur_close(&pcur);
859 	mtr_commit(&mtr);
860 	mem_heap_free(heap);
861 
862 	return(filepath);
863 }
864 
865 /** Update the record for space_id in SYS_TABLESPACES to this filepath.
866 @param[in]	space_id	Tablespace ID
867 @param[in]	filepath	Tablespace filepath
868 @return DB_SUCCESS if OK, dberr_t if the insert failed */
869 dberr_t
870 dict_update_filepath(
871 	ulint		space_id,
872 	const char*	filepath)
873 {
874 	if (!srv_sys_tablespaces_open) {
875 		/* Startup procedure is not yet ready for updates. */
876 		return(DB_SUCCESS);
877 	}
878 
879 	dberr_t		err = DB_SUCCESS;
880 	trx_t*		trx;
881 
882 	ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_X));
883 	ut_ad(mutex_own(&dict_sys->mutex));
884 
885 	trx = trx_create();
886 	trx->op_info = "update filepath";
887 	trx->dict_operation_lock_mode = RW_X_LATCH;
888 	trx_start_for_ddl(trx, TRX_DICT_OP_INDEX);
889 
890 	pars_info_t*	info = pars_info_create();
891 
892 	pars_info_add_int4_literal(info, "space", space_id);
893 	pars_info_add_str_literal(info, "path", filepath);
894 
895 	err = que_eval_sql(info,
896 			   "PROCEDURE UPDATE_FILEPATH () IS\n"
897 			   "BEGIN\n"
898 			   "UPDATE SYS_DATAFILES"
899 			   " SET PATH = :path\n"
900 			   " WHERE SPACE = :space;\n"
901 			   "END;\n", FALSE, trx);
902 
903 	trx_commit_for_mysql(trx);
904 	trx->dict_operation_lock_mode = 0;
905 	trx->free();
906 
907 	if (UNIV_LIKELY(err == DB_SUCCESS)) {
908 		/* We just updated SYS_DATAFILES due to the contents in
909 		a link file.  Make a note that we did this. */
910 		ib::info() << "The InnoDB data dictionary table SYS_DATAFILES"
911 			" for tablespace ID " << space_id
912 			<< " was updated to use file " << filepath << ".";
913 	} else {
914 		ib::warn() << "Error occurred while updating InnoDB data"
915 			" dictionary table SYS_DATAFILES for tablespace ID "
916 			<< space_id << " to file " << filepath << ": "
917 			<< err << ".";
918 	}
919 
920 	return(err);
921 }
922 
923 /** Replace records in SYS_TABLESPACES and SYS_DATAFILES associated with
924 the given space_id using an independent transaction.
925 @param[in]	space_id	Tablespace ID
926 @param[in]	name		Tablespace name
927 @param[in]	filepath	First filepath
928 @param[in]	fsp_flags	Tablespace flags
929 @return DB_SUCCESS if OK, dberr_t if the insert failed */
930 dberr_t
931 dict_replace_tablespace_and_filepath(
932 	ulint		space_id,
933 	const char*	name,
934 	const char*	filepath,
935 	ulint		fsp_flags)
936 {
937 	if (!srv_sys_tablespaces_open) {
938 		/* Startup procedure is not yet ready for updates.
939 		Return success since this will likely get updated
940 		later. */
941 		return(DB_SUCCESS);
942 	}
943 
944 	dberr_t		err = DB_SUCCESS;
945 	trx_t*		trx;
946 
947 	DBUG_EXECUTE_IF("innodb_fail_to_update_tablespace_dict",
948 			return(DB_INTERRUPTED););
949 
950 	ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_X));
951 	ut_ad(mutex_own(&dict_sys->mutex));
952 	ut_ad(filepath);
953 
954 	trx = trx_create();
955 	trx->op_info = "insert tablespace and filepath";
956 	trx->dict_operation_lock_mode = RW_X_LATCH;
957 	trx_start_for_ddl(trx, TRX_DICT_OP_INDEX);
958 
959 	/* A record for this space ID was not found in
960 	SYS_DATAFILES. Assume the record is also missing in
961 	SYS_TABLESPACES.  Insert records into them both. */
962 	err = dict_replace_tablespace_in_dictionary(
963 		space_id, name, fsp_flags, filepath, trx);
964 
965 	trx_commit_for_mysql(trx);
966 	trx->dict_operation_lock_mode = 0;
967 	trx->free();
968 
969 	return(err);
970 }
971 
972 /** Check the validity of a SYS_TABLES record
973 Make sure the fields are the right length and that they
974 do not contain invalid contents.
975 @param[in]	rec	SYS_TABLES record
976 @return error message, or NULL on success */
977 static
978 const char*
979 dict_sys_tables_rec_check(
980 	const rec_t*	rec)
981 {
982 	const byte*	field;
983 	ulint		len;
984 
985 	ut_ad(mutex_own(&dict_sys->mutex));
986 
987 	if (rec_get_deleted_flag(rec, 0)) {
988 		return("delete-marked record in SYS_TABLES");
989 	}
990 
991 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_TABLES) {
992 		return("wrong number of columns in SYS_TABLES record");
993 	}
994 
995 	rec_get_nth_field_offs_old(
996 		rec, DICT_FLD__SYS_TABLES__NAME, &len);
997 	if (len == 0 || len == UNIV_SQL_NULL) {
998 err_len:
999 		return("incorrect column length in SYS_TABLES");
1000 	}
1001 	rec_get_nth_field_offs_old(
1002 		rec, DICT_FLD__SYS_TABLES__DB_TRX_ID, &len);
1003 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
1004 		goto err_len;
1005 	}
1006 	rec_get_nth_field_offs_old(
1007 		rec, DICT_FLD__SYS_TABLES__DB_ROLL_PTR, &len);
1008 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
1009 		goto err_len;
1010 	}
1011 
1012 	rec_get_nth_field_offs_old(rec, DICT_FLD__SYS_TABLES__ID, &len);
1013 	if (len != 8) {
1014 		goto err_len;
1015 	}
1016 
1017 	field = rec_get_nth_field_old(
1018 		rec, DICT_FLD__SYS_TABLES__N_COLS, &len);
1019 	if (field == NULL || len != 4) {
1020 		goto err_len;
1021 	}
1022 
1023 	rec_get_nth_field_offs_old(rec, DICT_FLD__SYS_TABLES__TYPE, &len);
1024 	if (len != 4) {
1025 		goto err_len;
1026 	}
1027 
1028 	rec_get_nth_field_offs_old(
1029 		rec, DICT_FLD__SYS_TABLES__MIX_ID, &len);
1030 	if (len != 8) {
1031 		goto err_len;
1032 	}
1033 
1034 	field = rec_get_nth_field_old(
1035 		rec, DICT_FLD__SYS_TABLES__MIX_LEN, &len);
1036 	if (field == NULL || len != 4) {
1037 		goto err_len;
1038 	}
1039 
1040 	rec_get_nth_field_offs_old(
1041 		rec, DICT_FLD__SYS_TABLES__CLUSTER_ID, &len);
1042 	if (len != UNIV_SQL_NULL) {
1043 		goto err_len;
1044 	}
1045 
1046 	field = rec_get_nth_field_old(
1047 		rec, DICT_FLD__SYS_TABLES__SPACE, &len);
1048 	if (field == NULL || len != 4) {
1049 		goto err_len;
1050 	}
1051 
1052 	return(NULL);
1053 }
1054 
1055 /** Read and return the contents of a SYS_TABLESPACES record.
1056 @param[in]	rec	A record of SYS_TABLESPACES
1057 @param[out]	id	Pointer to the space_id for this table
1058 @param[in,out]	name	Buffer for Tablespace Name of length NAME_LEN
1059 @param[out]	flags	Pointer to tablespace flags
1060 @return true if the record was read correctly, false if not. */
1061 bool
1062 dict_sys_tablespaces_rec_read(
1063 	const rec_t*	rec,
1064 	ulint*		id,
1065 	char*		name,
1066 	ulint*		flags)
1067 {
1068 	const byte*	field;
1069 	ulint		len;
1070 
1071 	field = rec_get_nth_field_old(
1072 		rec, DICT_FLD__SYS_TABLESPACES__SPACE, &len);
1073 	if (len != DICT_FLD_LEN_SPACE) {
1074 		ib::error() << "Wrong field length in SYS_TABLESPACES.SPACE: "
1075 		<< len;
1076 		return(false);
1077 	}
1078 	*id = mach_read_from_4(field);
1079 
1080 	field = rec_get_nth_field_old(
1081 		rec, DICT_FLD__SYS_TABLESPACES__NAME, &len);
1082 	if (len == 0 || len == UNIV_SQL_NULL) {
1083 		ib::error() << "Wrong field length in SYS_TABLESPACES.NAME: "
1084 			<< len;
1085 		return(false);
1086 	}
1087 	strncpy(name, reinterpret_cast<const char*>(field), NAME_LEN);
1088 
1089 	/* read the 4 byte flags from the TYPE field */
1090 	field = rec_get_nth_field_old(
1091 		rec, DICT_FLD__SYS_TABLESPACES__FLAGS, &len);
1092 	if (len != 4) {
1093 		ib::error() << "Wrong field length in SYS_TABLESPACES.FLAGS: "
1094 			<< len;
1095 		return(false);
1096 	}
1097 	*flags = mach_read_from_4(field);
1098 
1099 	return(true);
1100 }
1101 
1102 /** Check if SYS_TABLES.TYPE is valid
1103 @param[in]	type		SYS_TABLES.TYPE
1104 @param[in]	not_redundant	whether ROW_FORMAT=REDUNDANT is not used
1105 @return	whether the SYS_TABLES.TYPE value is valid */
1106 static
1107 bool
1108 dict_sys_tables_type_valid(ulint type, bool not_redundant)
1109 {
1110 	/* The DATA_DIRECTORY flag can be assigned fully independently
1111 	of all other persistent table flags. */
1112 	type &= ~DICT_TF_MASK_DATA_DIR;
1113 
1114 	if (type == 1) {
1115 		return(true); /* ROW_FORMAT=REDUNDANT or ROW_FORMAT=COMPACT */
1116 	}
1117 
1118 	if (!(type & 1)) {
1119 		/* For ROW_FORMAT=REDUNDANT and ROW_FORMAT=COMPACT,
1120 		SYS_TABLES.TYPE=1. Else, it is the same as
1121 		dict_table_t::flags, and the least significant bit
1122 		would be set. So, the bit never can be 0. */
1123 		return(false);
1124 	}
1125 
1126 	if (!not_redundant) {
1127 		/* SYS_TABLES.TYPE must be 1 or 1|DICT_TF_MASK_NO_ROLLBACK
1128 		for ROW_FORMAT=REDUNDANT. */
1129 		return !(type & ~(1U | DICT_TF_MASK_NO_ROLLBACK));
1130 	}
1131 
1132 	if (type >= 1U << DICT_TF_POS_UNUSED) {
1133 		/* Some unknown bits are set. */
1134 		return(false);
1135 	}
1136 
1137 	return(dict_tf_is_valid_not_redundant(type));
1138 }
1139 
1140 /** Convert SYS_TABLES.TYPE to dict_table_t::flags.
1141 @param[in]	type		SYS_TABLES.TYPE
1142 @param[in]	not_redundant	whether ROW_FORMAT=REDUNDANT is not used
1143 @return	table flags */
1144 static
1145 ulint
1146 dict_sys_tables_type_to_tf(ulint type, bool not_redundant)
1147 {
1148 	ut_ad(dict_sys_tables_type_valid(type, not_redundant));
1149 	ulint	flags = not_redundant ? 1 : 0;
1150 
1151 	/* ZIP_SSIZE, ATOMIC_BLOBS, DATA_DIR, PAGE_COMPRESSION,
1152 	PAGE_COMPRESSION_LEVEL are the same. */
1153 	flags |= type & (DICT_TF_MASK_ZIP_SSIZE
1154 			 | DICT_TF_MASK_ATOMIC_BLOBS
1155 			 | DICT_TF_MASK_DATA_DIR
1156 			 | DICT_TF_MASK_PAGE_COMPRESSION
1157 			 | DICT_TF_MASK_PAGE_COMPRESSION_LEVEL
1158 			 | DICT_TF_MASK_NO_ROLLBACK);
1159 
1160 	ut_ad(dict_tf_is_valid(flags));
1161 	return(flags);
1162 }
1163 
1164 /** Read and return 5 integer fields from a SYS_TABLES record.
1165 @param[in]	rec		A record of SYS_TABLES
1166 @param[in]	name		Table Name, the same as SYS_TABLES.NAME
1167 @param[out]	table_id	Pointer to the table_id for this table
1168 @param[out]	space_id	Pointer to the space_id for this table
1169 @param[out]	n_cols		Pointer to number of columns for this table.
1170 @param[out]	flags		Pointer to table flags
1171 @param[out]	flags2		Pointer to table flags2
1172 @return true if the record was read correctly, false if not. */
1173 MY_ATTRIBUTE((warn_unused_result))
1174 static
1175 bool
1176 dict_sys_tables_rec_read(
1177 	const rec_t*		rec,
1178 	const table_name_t&	table_name,
1179 	table_id_t*		table_id,
1180 	ulint*			space_id,
1181 	ulint*			n_cols,
1182 	ulint*			flags,
1183 	ulint*			flags2)
1184 {
1185 	const byte*	field;
1186 	ulint		len;
1187 	ulint		type;
1188 
1189 	field = rec_get_nth_field_old(
1190 		rec, DICT_FLD__SYS_TABLES__ID, &len);
1191 	ut_ad(len == 8);
1192 	*table_id = static_cast<table_id_t>(mach_read_from_8(field));
1193 
1194 	field = rec_get_nth_field_old(
1195 		rec, DICT_FLD__SYS_TABLES__SPACE, &len);
1196 	ut_ad(len == 4);
1197 	*space_id = mach_read_from_4(field);
1198 
1199 	/* Read the 4 byte flags from the TYPE field */
1200 	field = rec_get_nth_field_old(
1201 		rec, DICT_FLD__SYS_TABLES__TYPE, &len);
1202 	ut_a(len == 4);
1203 	type = mach_read_from_4(field);
1204 
1205 	/* Handle MDEV-12873 InnoDB SYS_TABLES.TYPE incompatibility
1206 	for PAGE_COMPRESSED=YES in MariaDB 10.2.2 to 10.2.6.
1207 
1208 	MariaDB 10.2.2 introduced the SHARED_SPACE flag from MySQL 5.7,
1209 	shifting the flags PAGE_COMPRESSION, PAGE_COMPRESSION_LEVEL,
1210 	ATOMIC_WRITES (repurposed to NO_ROLLBACK in 10.3.1) by one bit.
1211 	The SHARED_SPACE flag would always
1212 	be written as 0 by MariaDB, because MariaDB does not support
1213 	CREATE TABLESPACE or CREATE TABLE...TABLESPACE for InnoDB.
1214 
1215 	So, instead of the bits AALLLLCxxxxxxx we would have
1216 	AALLLLC0xxxxxxx if the table was created with MariaDB 10.2.2
1217 	to 10.2.6. (AA=ATOMIC_WRITES, LLLL=PAGE_COMPRESSION_LEVEL,
1218 	C=PAGE_COMPRESSED, xxxxxxx=7 bits that were not moved.)
1219 
1220 	The case LLLLC=00000 is not a problem. The problem is the case
1221 	AALLLL10DB00001 where D is the (mostly ignored) DATA_DIRECTORY
1222 	flag and B is the ATOMIC_BLOBS flag (1 for ROW_FORMAT=DYNAMIC
1223 	and 0 for ROW_FORMAT=COMPACT in this case). Other low-order
1224 	bits must be so, because PAGE_COMPRESSED=YES is only allowed
1225 	for ROW_FORMAT=DYNAMIC and ROW_FORMAT=COMPACT, not for
1226 	ROW_FORMAT=REDUNDANT or ROW_FORMAT=COMPRESSED.
1227 
1228 	Starting with MariaDB 10.2.4, the flags would be
1229 	00LLLL10DB00001, because ATOMIC_WRITES is always written as 0.
1230 
1231 	We will concentrate on the PAGE_COMPRESSION_LEVEL and
1232 	PAGE_COMPRESSED=YES. PAGE_COMPRESSED=NO implies
1233 	PAGE_COMPRESSION_LEVEL=0, and in that case all the affected
1234 	bits will be 0. For PAGE_COMPRESSED=YES, the values 1..9 are
1235 	allowed for PAGE_COMPRESSION_LEVEL. That is, we must interpret
1236 	the bits AALLLL10DB00001 as AALLLL1DB00001.
1237 
1238 	If someone created a table in MariaDB 10.2.2 or 10.2.3 with
1239 	the attribute ATOMIC_WRITES=OFF (value 2) and without
1240 	PAGE_COMPRESSED=YES or PAGE_COMPRESSION_LEVEL, that should be
1241 	rejected. The value ATOMIC_WRITES=ON (1) would look like
1242 	ATOMIC_WRITES=OFF, but it would be ignored starting with
1243 	MariaDB 10.2.4. */
1244 	compile_time_assert(DICT_TF_POS_PAGE_COMPRESSION == 7);
1245 	compile_time_assert(DICT_TF_POS_UNUSED == 14);
1246 
1247 	if ((type & 0x19f) != 0x101) {
1248 		/* The table cannot have been created with MariaDB
1249 		10.2.2 to 10.2.6, because they would write the
1250 		low-order bits of SYS_TABLES.TYPE as 0b10xx00001 for
1251 		PAGE_COMPRESSED=YES. No adjustment is applicable. */
1252 	} else if (type >= 3 << 13) {
1253 		/* 10.2.2 and 10.2.3 write ATOMIC_WRITES less than 3,
1254 		and no other flags above that can be set for the
1255 		SYS_TABLES.TYPE to be in the 10.2.2..10.2.6 format.
1256 		This would in any case be invalid format for 10.2 and
1257 		earlier releases. */
1258 		ut_ad(!dict_sys_tables_type_valid(type, true));
1259 	} else {
1260 		/* SYS_TABLES.TYPE is of the form AALLLL10DB00001.  We
1261 		must still validate that the LLLL bits are between 0
1262 		and 9 before we can discard the extraneous 0 bit. */
1263 		ut_ad(!DICT_TF_GET_PAGE_COMPRESSION(type));
1264 
1265 		if ((((type >> 9) & 0xf) - 1) < 9) {
1266 			ut_ad(DICT_TF_GET_PAGE_COMPRESSION_LEVEL(type) & 1);
1267 
1268 			type = (type & 0x7fU) | (type >> 1 & ~0x7fU);
1269 
1270 			ut_ad(DICT_TF_GET_PAGE_COMPRESSION(type));
1271 			ut_ad(DICT_TF_GET_PAGE_COMPRESSION_LEVEL(type) >= 1);
1272 			ut_ad(DICT_TF_GET_PAGE_COMPRESSION_LEVEL(type) <= 9);
1273 		} else {
1274 			ut_ad(!dict_sys_tables_type_valid(type, true));
1275 		}
1276 	}
1277 
1278 	/* The low order bit of SYS_TABLES.TYPE is always set to 1. But in
1279 	dict_table_t::flags the low order bit is used to determine if the
1280 	ROW_FORMAT=REDUNDANT (0) or anything else (1).
1281 	Read the 4 byte N_COLS field and look at the high order bit.  It
1282 	should be set for COMPACT and later.  It should not be set for
1283 	REDUNDANT. */
1284 	field = rec_get_nth_field_old(
1285 		rec, DICT_FLD__SYS_TABLES__N_COLS, &len);
1286 	ut_a(len == 4);
1287 	*n_cols = mach_read_from_4(field);
1288 
1289 	const bool not_redundant = 0 != (*n_cols & DICT_N_COLS_COMPACT);
1290 
1291 	if (!dict_sys_tables_type_valid(type, not_redundant)) {
1292 		ib::error() << "Table " << table_name << " in InnoDB"
1293 			" data dictionary contains invalid flags."
1294 			" SYS_TABLES.TYPE=" << type <<
1295 			" SYS_TABLES.N_COLS=" << *n_cols;
1296 		return(false);
1297 	}
1298 
1299 	*flags = dict_sys_tables_type_to_tf(type, not_redundant);
1300 
1301 	/* For tables created before MySQL 4.1, there may be
1302 	garbage in SYS_TABLES.MIX_LEN where flags2 are found. Such tables
1303 	would always be in ROW_FORMAT=REDUNDANT which do not have the
1304 	high bit set in n_cols, and flags would be zero.
1305 	MySQL 4.1 was the first version to support innodb_file_per_table,
1306 	that is, *space_id != 0. */
1307 	if (not_redundant || *space_id != 0 || *n_cols & DICT_N_COLS_COMPACT) {
1308 
1309 		/* Get flags2 from SYS_TABLES.MIX_LEN */
1310 		field = rec_get_nth_field_old(
1311 			rec, DICT_FLD__SYS_TABLES__MIX_LEN, &len);
1312 		*flags2 = mach_read_from_4(field);
1313 
1314 		if (!dict_tf2_is_valid(*flags, *flags2)) {
1315 			ib::error() << "Table " << table_name << " in InnoDB"
1316 				" data dictionary contains invalid flags."
1317 				" SYS_TABLES.TYPE=" << type
1318 				<< " SYS_TABLES.MIX_LEN=" << *flags2;
1319 			return(false);
1320 		}
1321 
1322 		/* DICT_TF2_FTS will be set when indexes are being loaded */
1323 		*flags2 &= ~DICT_TF2_FTS;
1324 
1325 		/* Now that we have used this bit, unset it. */
1326 		*n_cols &= ~DICT_N_COLS_COMPACT;
1327 	} else {
1328 		*flags2 = 0;
1329 	}
1330 
1331 	return(true);
1332 }
1333 
1334 /** Load and check each non-predefined tablespace mentioned in SYS_TABLES.
1335 Search SYS_TABLES and check each tablespace mentioned that has not
1336 already been added to the fil_system.  If it is valid, add it to the
1337 file_system list.
1338 @return the highest space ID found. */
1339 static ulint dict_check_sys_tables()
1340 {
1341 	ulint		max_space_id = 0;
1342 	btr_pcur_t	pcur;
1343 	const rec_t*	rec;
1344 	mtr_t		mtr;
1345 
1346 	DBUG_ENTER("dict_check_sys_tables");
1347 
1348 	ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_X));
1349 	ut_ad(mutex_own(&dict_sys->mutex));
1350 
1351 	mtr_start(&mtr);
1352 
1353 	/* Before traversing SYS_TABLES, let's make sure we have
1354 	SYS_TABLESPACES and SYS_DATAFILES loaded. */
1355 	dict_table_t*	sys_tablespaces;
1356 	dict_table_t*	sys_datafiles;
1357 	sys_tablespaces = dict_table_get_low("SYS_TABLESPACES");
1358 	ut_a(sys_tablespaces != NULL);
1359 	sys_datafiles = dict_table_get_low("SYS_DATAFILES");
1360 	ut_a(sys_datafiles != NULL);
1361 
1362 	for (rec = dict_startscan_system(&pcur, &mtr, SYS_TABLES);
1363 	     rec != NULL;
1364 	     mtr.commit(), mtr.start(),
1365 	     rec = dict_getnext_system(&pcur, &mtr)) {
1366 		const byte*	field;
1367 		ulint		len;
1368 		table_id_t	table_id;
1369 		ulint		space_id;
1370 		ulint		n_cols;
1371 		ulint		flags;
1372 		ulint		flags2;
1373 
1374 		/* If a table record is not useable, ignore it and continue
1375 		on to the next record. Error messages were logged. */
1376 		if (dict_sys_tables_rec_check(rec) != NULL) {
1377 			continue;
1378 		}
1379 
1380 		/* Copy the table name from rec */
1381 		field = rec_get_nth_field_old(
1382 			rec, DICT_FLD__SYS_TABLES__NAME, &len);
1383 
1384 		table_name_t table_name(mem_strdupl((char*) field, len));
1385 		DBUG_PRINT("dict_check_sys_tables",
1386 			   ("name: %p, '%s'", table_name.m_name,
1387 			    table_name.m_name));
1388 
1389 		if (!dict_sys_tables_rec_read(rec, table_name,
1390 					      &table_id, &space_id,
1391 					      &n_cols, &flags, &flags2)
1392 		    || space_id == TRX_SYS_SPACE) {
1393 next:
1394 			ut_free(table_name.m_name);
1395 			continue;
1396 		}
1397 
1398 		if (strstr(table_name.m_name, "/" TEMP_FILE_PREFIX "-")) {
1399 			/* This table will be dropped by
1400 			row_mysql_drop_garbage_tables().
1401 			We do not care if the file exists. */
1402 			goto next;
1403 		}
1404 
1405 		if (flags2 & DICT_TF2_DISCARDED) {
1406 			ib::info() << "Ignoring tablespace for " << table_name
1407 				<< " because the DISCARD flag is set .";
1408 			goto next;
1409 		}
1410 
1411 		/* For tables or partitions using .ibd files, the flag
1412 		DICT_TF2_USE_FILE_PER_TABLE was not set in MIX_LEN
1413 		before MySQL 5.6.5. The flag should not have been
1414 		introduced in persistent storage. MariaDB will keep
1415 		setting the flag when writing SYS_TABLES entries for
1416 		newly created or rebuilt tables or partitions, but
1417 		will otherwise ignore the flag. */
1418 
1419 		/* Now that we have the proper name for this tablespace,
1420 		look to see if it is already in the tablespace cache. */
1421 		if (const fil_space_t* space
1422 		    = fil_space_for_table_exists_in_mem(
1423 			    space_id, table_name.m_name, flags)) {
1424 			/* Recovery can open a datafile that does not
1425 			match SYS_DATAFILES.  If they don't match, update
1426 			SYS_DATAFILES. */
1427 			char *dict_path = dict_get_first_path(space_id);
1428 			const char *fil_path = space->chain.start->name;
1429 			if (dict_path
1430 			    && strcmp(dict_path, fil_path)) {
1431 				dict_update_filepath(space_id, fil_path);
1432 			}
1433 			ut_free(dict_path);
1434 			ut_free(table_name.m_name);
1435 			continue;
1436 		}
1437 
1438 		/* Set the expected filepath from the data dictionary.
1439 		If the file is found elsewhere (from an ISL or the default
1440 		location) or this path is the same file but looks different,
1441 		fil_ibd_open() will update the dictionary with what is
1442 		opened. */
1443 		char*	filepath = dict_get_first_path(space_id);
1444 
1445 		/* Check that the .ibd file exists. */
1446 		if (!fil_ibd_open(
1447 			    false,
1448 			    !srv_read_only_mode && srv_log_file_size != 0,
1449 			    FIL_TYPE_TABLESPACE,
1450 			    space_id, dict_tf_to_fsp_flags(flags),
1451 			    table_name, filepath)) {
1452 			ib::warn() << "Ignoring tablespace for "
1453 				<< table_name
1454 				<< " because it could not be opened.";
1455 		}
1456 
1457 		max_space_id = ut_max(max_space_id, space_id);
1458 
1459 		ut_free(table_name.m_name);
1460 		ut_free(filepath);
1461 	}
1462 
1463 	mtr_commit(&mtr);
1464 
1465 	DBUG_RETURN(max_space_id);
1466 }
1467 
1468 /** Check each tablespace found in the data dictionary.
1469 Then look at each table defined in SYS_TABLES that has a space_id > 0
1470 to find all the file-per-table tablespaces.
1471 
1472 In a crash recovery we already have some tablespace objects created from
1473 processing the REDO log.  Any other tablespace in SYS_TABLESPACES not
1474 previously used in recovery will be opened here.  We will compare the
1475 space_id information in the data dictionary to what we find in the
1476 tablespace file. In addition, more validation will be done if recovery
1477 was needed and force_recovery is not set.
1478 
1479 We also scan the biggest space id, and store it to fil_system. */
1480 void dict_check_tablespaces_and_store_max_id()
1481 {
1482 	mtr_t	mtr;
1483 
1484 	DBUG_ENTER("dict_check_tablespaces_and_store_max_id");
1485 
1486 	rw_lock_x_lock(&dict_operation_lock);
1487 	mutex_enter(&dict_sys->mutex);
1488 
1489 	/* Initialize the max space_id from sys header */
1490 	mtr_start(&mtr);
1491 	ulint	max_space_id = mtr_read_ulint(
1492 		dict_hdr_get(&mtr) + DICT_HDR_MAX_SPACE_ID,
1493 		MLOG_4BYTES, &mtr);
1494 	mtr_commit(&mtr);
1495 
1496 	fil_set_max_space_id_if_bigger(max_space_id);
1497 
1498 	/* Open all tablespaces referenced in SYS_TABLES.
1499 	This will update SYS_TABLESPACES and SYS_DATAFILES if it
1500 	finds any file-per-table tablespaces not already there. */
1501 	max_space_id = dict_check_sys_tables();
1502 	fil_set_max_space_id_if_bigger(max_space_id);
1503 
1504 	mutex_exit(&dict_sys->mutex);
1505 	rw_lock_x_unlock(&dict_operation_lock);
1506 
1507 	DBUG_VOID_RETURN;
1508 }
1509 
1510 /** Error message for a delete-marked record in dict_load_column_low() */
1511 static const char* dict_load_column_del = "delete-marked record in SYS_COLUMN";
1512 
1513 /** Load a table column definition from a SYS_COLUMNS record to dict_table_t.
1514 @return	error message
1515 @retval	NULL on success */
1516 static
1517 const char*
1518 dict_load_column_low(
1519 	dict_table_t*	table,		/*!< in/out: table, could be NULL
1520 					if we just populate a dict_column_t
1521 					struct with information from
1522 					a SYS_COLUMNS record */
1523 	mem_heap_t*	heap,		/*!< in/out: memory heap
1524 					for temporary storage */
1525 	dict_col_t*	column,		/*!< out: dict_column_t to fill,
1526 					or NULL if table != NULL */
1527 	table_id_t*	table_id,	/*!< out: table id */
1528 	const char**	col_name,	/*!< out: column name */
1529 	const rec_t*	rec,		/*!< in: SYS_COLUMNS record */
1530 	ulint*		nth_v_col)	/*!< out: if not NULL, this
1531 					records the "n" of "nth" virtual
1532 					column */
1533 {
1534 	char*		name;
1535 	const byte*	field;
1536 	ulint		len;
1537 	ulint		mtype;
1538 	ulint		prtype;
1539 	ulint		col_len;
1540 	ulint		pos;
1541 	ulint		num_base;
1542 
1543 	ut_ad(!table == !!column);
1544 
1545 	if (rec_get_deleted_flag(rec, 0)) {
1546 		return(dict_load_column_del);
1547 	}
1548 
1549 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_COLUMNS) {
1550 		return("wrong number of columns in SYS_COLUMNS record");
1551 	}
1552 
1553 	field = rec_get_nth_field_old(
1554 		rec, DICT_FLD__SYS_COLUMNS__TABLE_ID, &len);
1555 	if (len != 8) {
1556 err_len:
1557 		return("incorrect column length in SYS_COLUMNS");
1558 	}
1559 
1560 	if (table_id) {
1561 		*table_id = mach_read_from_8(field);
1562 	} else if (table->id != mach_read_from_8(field)) {
1563 		return("SYS_COLUMNS.TABLE_ID mismatch");
1564 	}
1565 
1566 	field = rec_get_nth_field_old(
1567 		rec, DICT_FLD__SYS_COLUMNS__POS, &len);
1568 	if (len != 4) {
1569 		goto err_len;
1570 	}
1571 
1572 	pos = mach_read_from_4(field);
1573 
1574 	rec_get_nth_field_offs_old(
1575 		rec, DICT_FLD__SYS_COLUMNS__DB_TRX_ID, &len);
1576 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
1577 		goto err_len;
1578 	}
1579 	rec_get_nth_field_offs_old(
1580 		rec, DICT_FLD__SYS_COLUMNS__DB_ROLL_PTR, &len);
1581 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
1582 		goto err_len;
1583 	}
1584 
1585 	field = rec_get_nth_field_old(
1586 		rec, DICT_FLD__SYS_COLUMNS__NAME, &len);
1587 	if (len == 0 || len == UNIV_SQL_NULL) {
1588 		goto err_len;
1589 	}
1590 
1591 	name = mem_heap_strdupl(heap, (const char*) field, len);
1592 
1593 	if (col_name) {
1594 		*col_name = name;
1595 	}
1596 
1597 	field = rec_get_nth_field_old(
1598 		rec, DICT_FLD__SYS_COLUMNS__MTYPE, &len);
1599 	if (len != 4) {
1600 		goto err_len;
1601 	}
1602 
1603 	mtype = mach_read_from_4(field);
1604 
1605 	field = rec_get_nth_field_old(
1606 		rec, DICT_FLD__SYS_COLUMNS__PRTYPE, &len);
1607 	if (len != 4) {
1608 		goto err_len;
1609 	}
1610 	prtype = mach_read_from_4(field);
1611 
1612 	if (dtype_get_charset_coll(prtype) == 0
1613 	    && dtype_is_string_type(mtype)) {
1614 		/* The table was created with < 4.1.2. */
1615 
1616 		if (dtype_is_binary_string_type(mtype, prtype)) {
1617 			/* Use the binary collation for
1618 			string columns of binary type. */
1619 
1620 			prtype = dtype_form_prtype(
1621 				prtype,
1622 				DATA_MYSQL_BINARY_CHARSET_COLL);
1623 		} else {
1624 			/* Use the default charset for
1625 			other than binary columns. */
1626 
1627 			prtype = dtype_form_prtype(
1628 				prtype,
1629 				data_mysql_default_charset_coll);
1630 		}
1631 	}
1632 
1633 	if (table && table->n_def != pos && !(prtype & DATA_VIRTUAL)) {
1634 		return("SYS_COLUMNS.POS mismatch");
1635 	}
1636 
1637 	field = rec_get_nth_field_old(
1638 		rec, DICT_FLD__SYS_COLUMNS__LEN, &len);
1639 	if (len != 4) {
1640 		goto err_len;
1641 	}
1642 	col_len = mach_read_from_4(field);
1643 	field = rec_get_nth_field_old(
1644 		rec, DICT_FLD__SYS_COLUMNS__PREC, &len);
1645 	if (len != 4) {
1646 		goto err_len;
1647 	}
1648 	num_base = mach_read_from_4(field);
1649 
1650 	if (table) {
1651 		if (prtype & DATA_VIRTUAL) {
1652 #ifdef UNIV_DEBUG
1653 			dict_v_col_t*	vcol =
1654 #endif
1655 			dict_mem_table_add_v_col(
1656 				table, heap, name, mtype,
1657 				prtype, col_len,
1658 				dict_get_v_col_mysql_pos(pos), num_base);
1659 			ut_ad(vcol->v_pos == dict_get_v_col_pos(pos));
1660 		} else {
1661 			ut_ad(num_base == 0);
1662 			dict_mem_table_add_col(table, heap, name, mtype,
1663 					       prtype, col_len);
1664 		}
1665 	} else {
1666 		dict_mem_fill_column_struct(column, pos, mtype,
1667 					    prtype, col_len);
1668 	}
1669 
1670 	/* Report the virtual column number */
1671 	if ((prtype & DATA_VIRTUAL) && nth_v_col != NULL) {
1672 		*nth_v_col = dict_get_v_col_pos(pos);
1673 	}
1674 
1675 	return(NULL);
1676 }
1677 
1678 /** Error message for a delete-marked record in dict_load_virtual_low() */
1679 static const char* dict_load_virtual_del = "delete-marked record in SYS_VIRTUAL";
1680 
1681 /** Load a virtual column "mapping" (to base columns) information
1682 from a SYS_VIRTUAL record
1683 @param[in,out]	table		table
1684 @param[in,out]	column		mapped base column's dict_column_t
1685 @param[in,out]	table_id	table id
1686 @param[in,out]	pos		virtual column position
1687 @param[in,out]	base_pos	base column position
1688 @param[in]	rec		SYS_VIRTUAL record
1689 @return	error message
1690 @retval	NULL on success */
1691 static
1692 const char*
1693 dict_load_virtual_low(
1694 	dict_table_t*	table,
1695 	dict_col_t**	column,
1696 	table_id_t*	table_id,
1697 	ulint*		pos,
1698 	ulint*		base_pos,
1699 	const rec_t*	rec)
1700 {
1701 	const byte*	field;
1702 	ulint		len;
1703 	ulint		base;
1704 
1705 	if (rec_get_deleted_flag(rec, 0)) {
1706 		return(dict_load_virtual_del);
1707 	}
1708 
1709 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_VIRTUAL) {
1710 		return("wrong number of columns in SYS_VIRTUAL record");
1711 	}
1712 
1713 	field = rec_get_nth_field_old(
1714 		rec, DICT_FLD__SYS_VIRTUAL__TABLE_ID, &len);
1715 	if (len != 8) {
1716 err_len:
1717 		return("incorrect column length in SYS_VIRTUAL");
1718 	}
1719 
1720 	if (table_id != NULL) {
1721 		*table_id = mach_read_from_8(field);
1722 	} else if (table->id != mach_read_from_8(field)) {
1723 		return("SYS_VIRTUAL.TABLE_ID mismatch");
1724 	}
1725 
1726 	field = rec_get_nth_field_old(
1727 		rec, DICT_FLD__SYS_VIRTUAL__POS, &len);
1728 	if (len != 4) {
1729 		goto err_len;
1730 	}
1731 
1732 	if (pos != NULL) {
1733 		*pos = mach_read_from_4(field);
1734 	}
1735 
1736 	field = rec_get_nth_field_old(
1737 		rec, DICT_FLD__SYS_VIRTUAL__BASE_POS, &len);
1738 	if (len != 4) {
1739 		goto err_len;
1740 	}
1741 
1742 	base = mach_read_from_4(field);
1743 
1744 	if (base_pos != NULL) {
1745 		*base_pos = base;
1746 	}
1747 
1748 	rec_get_nth_field_offs_old(
1749 		rec, DICT_FLD__SYS_VIRTUAL__DB_TRX_ID, &len);
1750 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
1751 		goto err_len;
1752 	}
1753 
1754 	rec_get_nth_field_offs_old(
1755 		rec, DICT_FLD__SYS_VIRTUAL__DB_ROLL_PTR, &len);
1756 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
1757 		goto err_len;
1758 	}
1759 
1760 	if (column != NULL) {
1761 		*column = dict_table_get_nth_col(table, base);
1762 	}
1763 
1764 	return(NULL);
1765 }
1766 
1767 /********************************************************************//**
1768 Loads definitions for table columns. */
1769 static
1770 void
1771 dict_load_columns(
1772 /*==============*/
1773 	dict_table_t*	table,	/*!< in/out: table */
1774 	mem_heap_t*	heap)	/*!< in/out: memory heap
1775 				for temporary storage */
1776 {
1777 	dict_table_t*	sys_columns;
1778 	dict_index_t*	sys_index;
1779 	btr_pcur_t	pcur;
1780 	dtuple_t*	tuple;
1781 	dfield_t*	dfield;
1782 	const rec_t*	rec;
1783 	byte*		buf;
1784 	ulint		i;
1785 	mtr_t		mtr;
1786 	ulint		n_skipped = 0;
1787 
1788 	ut_ad(mutex_own(&dict_sys->mutex));
1789 
1790 	mtr_start(&mtr);
1791 
1792 	sys_columns = dict_table_get_low("SYS_COLUMNS");
1793 	sys_index = UT_LIST_GET_FIRST(sys_columns->indexes);
1794 	ut_ad(!dict_table_is_comp(sys_columns));
1795 
1796 	ut_ad(name_of_col_is(sys_columns, sys_index,
1797 			     DICT_FLD__SYS_COLUMNS__NAME, "NAME"));
1798 	ut_ad(name_of_col_is(sys_columns, sys_index,
1799 			     DICT_FLD__SYS_COLUMNS__PREC, "PREC"));
1800 
1801 	tuple = dtuple_create(heap, 1);
1802 	dfield = dtuple_get_nth_field(tuple, 0);
1803 
1804 	buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
1805 	mach_write_to_8(buf, table->id);
1806 
1807 	dfield_set_data(dfield, buf, 8);
1808 	dict_index_copy_types(tuple, sys_index, 1);
1809 
1810 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
1811 				  BTR_SEARCH_LEAF, &pcur, &mtr);
1812 
1813 	ut_ad(table->n_t_cols == static_cast<ulint>(
1814 	      table->n_cols) + static_cast<ulint>(table->n_v_cols));
1815 
1816 	for (i = 0;
1817 	     i + DATA_N_SYS_COLS < table->n_t_cols + n_skipped;
1818 	     i++) {
1819 		const char*	err_msg;
1820 		const char*	name = NULL;
1821 		ulint		nth_v_col = ULINT_UNDEFINED;
1822 
1823 		rec = btr_pcur_get_rec(&pcur);
1824 
1825 		ut_a(btr_pcur_is_on_user_rec(&pcur));
1826 
1827 		err_msg = dict_load_column_low(table, heap, NULL, NULL,
1828 					       &name, rec, &nth_v_col);
1829 
1830 		if (err_msg == dict_load_column_del) {
1831 			n_skipped++;
1832 			goto next_rec;
1833 		} else if (err_msg) {
1834 			ib::fatal() << err_msg;
1835 		}
1836 
1837 		/* Note: Currently we have one DOC_ID column that is
1838 		shared by all FTS indexes on a table. And only non-virtual
1839 		column can be used for FULLTEXT index */
1840 		if (innobase_strcasecmp(name,
1841 					FTS_DOC_ID_COL_NAME) == 0
1842 		    && nth_v_col == ULINT_UNDEFINED) {
1843 			dict_col_t*	col;
1844 			/* As part of normal loading of tables the
1845 			table->flag is not set for tables with FTS
1846 			till after the FTS indexes are loaded. So we
1847 			create the fts_t instance here if there isn't
1848 			one already created.
1849 
1850 			This case does not arise for table create as
1851 			the flag is set before the table is created. */
1852 			if (table->fts == NULL) {
1853 				table->fts = fts_create(table);
1854 				fts_optimize_add_table(table);
1855 			}
1856 
1857 			ut_a(table->fts->doc_col == ULINT_UNDEFINED);
1858 
1859 			col = dict_table_get_nth_col(table, i - n_skipped);
1860 
1861 			ut_ad(col->len == sizeof(doc_id_t));
1862 
1863 			if (col->prtype & DATA_FTS_DOC_ID) {
1864 				DICT_TF2_FLAG_SET(
1865 					table, DICT_TF2_FTS_HAS_DOC_ID);
1866 				DICT_TF2_FLAG_UNSET(
1867 					table, DICT_TF2_FTS_ADD_DOC_ID);
1868 			}
1869 
1870 			table->fts->doc_col = i - n_skipped;
1871 		}
1872 next_rec:
1873 		btr_pcur_move_to_next_user_rec(&pcur, &mtr);
1874 	}
1875 
1876 	btr_pcur_close(&pcur);
1877 	mtr_commit(&mtr);
1878 }
1879 
1880 /** Loads SYS_VIRTUAL info for one virtual column
1881 @param[in,out]	table		table
1882 @param[in]	nth_v_col	virtual column sequence num
1883 @param[in,out]	v_col		virtual column
1884 @param[in,out]	heap		memory heap
1885 */
1886 static
1887 void
1888 dict_load_virtual_one_col(
1889 	dict_table_t*	table,
1890 	ulint		nth_v_col,
1891 	dict_v_col_t*	v_col,
1892 	mem_heap_t*	heap)
1893 {
1894 	dict_table_t*	sys_virtual;
1895 	dict_index_t*	sys_virtual_index;
1896 	btr_pcur_t	pcur;
1897 	dtuple_t*	tuple;
1898 	dfield_t*	dfield;
1899 	const rec_t*	rec;
1900 	byte*		buf;
1901 	ulint		i = 0;
1902 	mtr_t		mtr;
1903 	ulint		skipped = 0;
1904 
1905 	ut_ad(mutex_own(&dict_sys->mutex));
1906 
1907 	if (v_col->num_base == 0) {
1908 		return;
1909 	}
1910 
1911 	mtr_start(&mtr);
1912 
1913 	sys_virtual = dict_table_get_low("SYS_VIRTUAL");
1914 	sys_virtual_index = UT_LIST_GET_FIRST(sys_virtual->indexes);
1915 	ut_ad(!dict_table_is_comp(sys_virtual));
1916 
1917 	ut_ad(name_of_col_is(sys_virtual, sys_virtual_index,
1918 			     DICT_FLD__SYS_VIRTUAL__POS, "POS"));
1919 
1920 	tuple = dtuple_create(heap, 2);
1921 
1922 	/* table ID field */
1923 	dfield = dtuple_get_nth_field(tuple, 0);
1924 
1925 	buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
1926 	mach_write_to_8(buf, table->id);
1927 
1928 	dfield_set_data(dfield, buf, 8);
1929 
1930 	/* virtual column pos field */
1931 	dfield = dtuple_get_nth_field(tuple, 1);
1932 
1933 	buf = static_cast<byte*>(mem_heap_alloc(heap, 4));
1934 	ulint	vcol_pos = dict_create_v_col_pos(nth_v_col, v_col->m_col.ind);
1935 	mach_write_to_4(buf, vcol_pos);
1936 
1937 	dfield_set_data(dfield, buf, 4);
1938 
1939 	dict_index_copy_types(tuple, sys_virtual_index, 2);
1940 
1941 	btr_pcur_open_on_user_rec(sys_virtual_index, tuple, PAGE_CUR_GE,
1942 				  BTR_SEARCH_LEAF, &pcur, &mtr);
1943 
1944 	for (i = 0; i < v_col->num_base + skipped; i++) {
1945 		const char*	err_msg;
1946 		ulint		pos;
1947 
1948 		ut_ad(btr_pcur_is_on_user_rec(&pcur));
1949 
1950 		rec = btr_pcur_get_rec(&pcur);
1951 
1952 		ut_a(btr_pcur_is_on_user_rec(&pcur));
1953 
1954 		err_msg = dict_load_virtual_low(table,
1955 						&v_col->base_col[i - skipped],
1956 						NULL,
1957 					        &pos, NULL, rec);
1958 
1959 		if (err_msg) {
1960 			if (err_msg != dict_load_virtual_del) {
1961 				ib::fatal() << err_msg;
1962 			} else {
1963 				skipped++;
1964 			}
1965 		} else {
1966 			ut_ad(pos == vcol_pos);
1967 		}
1968 
1969 		btr_pcur_move_to_next_user_rec(&pcur, &mtr);
1970 	}
1971 
1972 	btr_pcur_close(&pcur);
1973 	mtr_commit(&mtr);
1974 }
1975 
1976 /** Loads info from SYS_VIRTUAL for virtual columns.
1977 @param[in,out]	table	table
1978 @param[in]	heap	memory heap
1979 */
1980 static
1981 void
1982 dict_load_virtual(
1983 	dict_table_t*	table,
1984 	mem_heap_t*	heap)
1985 {
1986 	for (ulint i = 0; i < table->n_v_cols; i++) {
1987 		dict_v_col_t*	v_col = dict_table_get_nth_v_col(table, i);
1988 
1989 		dict_load_virtual_one_col(table, i, v_col, heap);
1990 	}
1991 }
1992 
1993 /** Error message for a delete-marked record in dict_load_field_low() */
1994 static const char* dict_load_field_del = "delete-marked record in SYS_FIELDS";
1995 
1996 /** Load an index field definition from a SYS_FIELDS record to dict_index_t.
1997 @return	error message
1998 @retval	NULL on success */
1999 static
2000 const char*
2001 dict_load_field_low(
2002 	byte*		index_id,	/*!< in/out: index id (8 bytes)
2003 					an "in" value if index != NULL
2004 					and "out" if index == NULL */
2005 	dict_index_t*	index,		/*!< in/out: index, could be NULL
2006 					if we just populate a dict_field_t
2007 					struct with information from
2008 					a SYS_FIELDS record */
2009 	dict_field_t*	sys_field,	/*!< out: dict_field_t to be
2010 					filled */
2011 	ulint*		pos,		/*!< out: Field position */
2012 	byte*		last_index_id,	/*!< in: last index id */
2013 	mem_heap_t*	heap,		/*!< in/out: memory heap
2014 					for temporary storage */
2015 	const rec_t*	rec)		/*!< in: SYS_FIELDS record */
2016 {
2017 	const byte*	field;
2018 	ulint		len;
2019 	unsigned	pos_and_prefix_len;
2020 	unsigned	prefix_len;
2021 	bool		first_field;
2022 	ulint		position;
2023 
2024 	/* Either index or sys_field is supplied, not both */
2025 	ut_a((!index) || (!sys_field));
2026 
2027 	if (rec_get_deleted_flag(rec, 0)) {
2028 		return(dict_load_field_del);
2029 	}
2030 
2031 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_FIELDS) {
2032 		return("wrong number of columns in SYS_FIELDS record");
2033 	}
2034 
2035 	field = rec_get_nth_field_old(
2036 		rec, DICT_FLD__SYS_FIELDS__INDEX_ID, &len);
2037 	if (len != 8) {
2038 err_len:
2039 		return("incorrect column length in SYS_FIELDS");
2040 	}
2041 
2042 	if (!index) {
2043 		ut_a(last_index_id);
2044 		memcpy(index_id, (const char*) field, 8);
2045 		first_field = memcmp(index_id, last_index_id, 8);
2046 	} else {
2047 		first_field = (index->n_def == 0);
2048 		if (memcmp(field, index_id, 8)) {
2049 			return("SYS_FIELDS.INDEX_ID mismatch");
2050 		}
2051 	}
2052 
2053 	/* The next field stores the field position in the index and a
2054 	possible column prefix length if the index field does not
2055 	contain the whole column. The storage format is like this: if
2056 	there is at least one prefix field in the index, then the HIGH
2057 	2 bytes contain the field number (index->n_def) and the low 2
2058 	bytes the prefix length for the field. Otherwise the field
2059 	number (index->n_def) is contained in the 2 LOW bytes. */
2060 
2061 	field = rec_get_nth_field_old(
2062 		rec, DICT_FLD__SYS_FIELDS__POS, &len);
2063 	if (len != 4) {
2064 		goto err_len;
2065 	}
2066 
2067 	pos_and_prefix_len = mach_read_from_4(field);
2068 
2069 	if (index && UNIV_UNLIKELY
2070 	    ((pos_and_prefix_len & 0xFFFFUL) != index->n_def
2071 	     && (pos_and_prefix_len >> 16 & 0xFFFF) != index->n_def)) {
2072 		return("SYS_FIELDS.POS mismatch");
2073 	}
2074 
2075 	if (first_field || pos_and_prefix_len > 0xFFFFUL) {
2076 		prefix_len = pos_and_prefix_len & 0xFFFFUL;
2077 		position = (pos_and_prefix_len & 0xFFFF0000UL)  >> 16;
2078 	} else {
2079 		prefix_len = 0;
2080 		position = pos_and_prefix_len & 0xFFFFUL;
2081 	}
2082 
2083 	rec_get_nth_field_offs_old(
2084 		rec, DICT_FLD__SYS_FIELDS__DB_TRX_ID, &len);
2085 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
2086 		goto err_len;
2087 	}
2088 	rec_get_nth_field_offs_old(
2089 		rec, DICT_FLD__SYS_FIELDS__DB_ROLL_PTR, &len);
2090 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
2091 		goto err_len;
2092 	}
2093 
2094 	field = rec_get_nth_field_old(
2095 		rec, DICT_FLD__SYS_FIELDS__COL_NAME, &len);
2096 	if (len == 0 || len == UNIV_SQL_NULL) {
2097 		goto err_len;
2098 	}
2099 
2100 	if (index) {
2101 		dict_mem_index_add_field(
2102 			index, mem_heap_strdupl(heap, (const char*) field, len),
2103 			prefix_len);
2104 	} else {
2105 		ut_a(sys_field);
2106 		ut_a(pos);
2107 
2108 		sys_field->name = mem_heap_strdupl(
2109 			heap, (const char*) field, len);
2110 		sys_field->prefix_len = prefix_len;
2111 		*pos = position;
2112 	}
2113 
2114 	return(NULL);
2115 }
2116 
2117 /********************************************************************//**
2118 Loads definitions for index fields.
2119 @return DB_SUCCESS if ok, DB_CORRUPTION if corruption */
2120 static
2121 ulint
2122 dict_load_fields(
2123 /*=============*/
2124 	dict_index_t*	index,	/*!< in/out: index whose fields to load */
2125 	mem_heap_t*	heap)	/*!< in: memory heap for temporary storage */
2126 {
2127 	dict_table_t*	sys_fields;
2128 	dict_index_t*	sys_index;
2129 	btr_pcur_t	pcur;
2130 	dtuple_t*	tuple;
2131 	dfield_t*	dfield;
2132 	const rec_t*	rec;
2133 	byte*		buf;
2134 	ulint		i;
2135 	mtr_t		mtr;
2136 	dberr_t		error;
2137 
2138 	ut_ad(mutex_own(&dict_sys->mutex));
2139 
2140 	mtr_start(&mtr);
2141 
2142 	sys_fields = dict_table_get_low("SYS_FIELDS");
2143 	sys_index = UT_LIST_GET_FIRST(sys_fields->indexes);
2144 	ut_ad(!dict_table_is_comp(sys_fields));
2145 	ut_ad(name_of_col_is(sys_fields, sys_index,
2146 			     DICT_FLD__SYS_FIELDS__COL_NAME, "COL_NAME"));
2147 
2148 	tuple = dtuple_create(heap, 1);
2149 	dfield = dtuple_get_nth_field(tuple, 0);
2150 
2151 	buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
2152 	mach_write_to_8(buf, index->id);
2153 
2154 	dfield_set_data(dfield, buf, 8);
2155 	dict_index_copy_types(tuple, sys_index, 1);
2156 
2157 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
2158 				  BTR_SEARCH_LEAF, &pcur, &mtr);
2159 	for (i = 0; i < index->n_fields; i++) {
2160 		const char* err_msg;
2161 
2162 		rec = btr_pcur_get_rec(&pcur);
2163 
2164 		ut_a(btr_pcur_is_on_user_rec(&pcur));
2165 
2166 		err_msg = dict_load_field_low(buf, index, NULL, NULL, NULL,
2167 					      heap, rec);
2168 
2169 		if (err_msg == dict_load_field_del) {
2170 			/* There could be delete marked records in
2171 			SYS_FIELDS because SYS_FIELDS.INDEX_ID can be
2172 			updated by ALTER TABLE ADD INDEX. */
2173 
2174 			goto next_rec;
2175 		} else if (err_msg) {
2176 			ib::error() << err_msg;
2177 			error = DB_CORRUPTION;
2178 			goto func_exit;
2179 		}
2180 next_rec:
2181 		btr_pcur_move_to_next_user_rec(&pcur, &mtr);
2182 	}
2183 
2184 	error = DB_SUCCESS;
2185 func_exit:
2186 	btr_pcur_close(&pcur);
2187 	mtr_commit(&mtr);
2188 	return(error);
2189 }
2190 
2191 /** Error message for a delete-marked record in dict_load_index_low() */
2192 static const char* dict_load_index_del = "delete-marked record in SYS_INDEXES";
2193 /** Error message for table->id mismatch in dict_load_index_low() */
2194 static const char* dict_load_index_id_err = "SYS_INDEXES.TABLE_ID mismatch";
2195 /** Error message for SYS_TABLES flags mismatch in dict_load_table_low() */
2196 static const char* dict_load_table_flags = "incorrect flags in SYS_TABLES";
2197 
2198 /** Load an index definition from a SYS_INDEXES record to dict_index_t.
2199 If allocate=TRUE, we will create a dict_index_t structure and fill it
2200 accordingly. If allocated=FALSE, the dict_index_t will be supplied by
2201 the caller and filled with information read from the record.
2202 @return	error message
2203 @retval	NULL on success */
2204 static
2205 const char*
2206 dict_load_index_low(
2207 	byte*		table_id,	/*!< in/out: table id (8 bytes),
2208 					an "in" value if allocate=TRUE
2209 					and "out" when allocate=FALSE */
2210 	mem_heap_t*	heap,		/*!< in/out: temporary memory heap */
2211 	const rec_t*	rec,		/*!< in: SYS_INDEXES record */
2212 	ibool		allocate,	/*!< in: TRUE=allocate *index,
2213 					FALSE=fill in a pre-allocated
2214 					*index */
2215 	dict_index_t**	index)		/*!< out,own: index, or NULL */
2216 {
2217 	const byte*	field;
2218 	ulint		len;
2219 	ulint		name_len;
2220 	char*		name_buf;
2221 	index_id_t	id;
2222 	ulint		n_fields;
2223 	ulint		type;
2224 	unsigned	merge_threshold;
2225 
2226 	if (allocate) {
2227 		/* If allocate=TRUE, no dict_index_t will
2228 		be supplied. Initialize "*index" to NULL */
2229 		*index = NULL;
2230 	}
2231 
2232 	if (rec_get_deleted_flag(rec, 0)) {
2233 		return(dict_load_index_del);
2234 	}
2235 
2236 	if (rec_get_n_fields_old(rec) == DICT_NUM_FIELDS__SYS_INDEXES) {
2237 		/* MERGE_THRESHOLD exists */
2238 		field = rec_get_nth_field_old(
2239 			rec, DICT_FLD__SYS_INDEXES__MERGE_THRESHOLD, &len);
2240 		switch (len) {
2241 		case 4:
2242 			merge_threshold = mach_read_from_4(field);
2243 			break;
2244 		case UNIV_SQL_NULL:
2245 			merge_threshold = DICT_INDEX_MERGE_THRESHOLD_DEFAULT;
2246 			break;
2247 		default:
2248 			return("incorrect MERGE_THRESHOLD length"
2249 			       " in SYS_INDEXES");
2250 		}
2251 	} else if (rec_get_n_fields_old(rec)
2252 		   == DICT_NUM_FIELDS__SYS_INDEXES - 1) {
2253 		/* MERGE_THRESHOLD doesn't exist */
2254 
2255 		merge_threshold = DICT_INDEX_MERGE_THRESHOLD_DEFAULT;
2256 	} else {
2257 		return("wrong number of columns in SYS_INDEXES record");
2258 	}
2259 
2260 	field = rec_get_nth_field_old(
2261 		rec, DICT_FLD__SYS_INDEXES__TABLE_ID, &len);
2262 	if (len != 8) {
2263 err_len:
2264 		return("incorrect column length in SYS_INDEXES");
2265 	}
2266 
2267 	if (!allocate) {
2268 		/* We are reading a SYS_INDEXES record. Copy the table_id */
2269 		memcpy(table_id, (const char*) field, 8);
2270 	} else if (memcmp(field, table_id, 8)) {
2271 		/* Caller supplied table_id, verify it is the same
2272 		id as on the index record */
2273 		return(dict_load_index_id_err);
2274 	}
2275 
2276 	field = rec_get_nth_field_old(
2277 		rec, DICT_FLD__SYS_INDEXES__ID, &len);
2278 	if (len != 8) {
2279 		goto err_len;
2280 	}
2281 
2282 	id = mach_read_from_8(field);
2283 
2284 	rec_get_nth_field_offs_old(
2285 		rec, DICT_FLD__SYS_INDEXES__DB_TRX_ID, &len);
2286 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
2287 		goto err_len;
2288 	}
2289 	rec_get_nth_field_offs_old(
2290 		rec, DICT_FLD__SYS_INDEXES__DB_ROLL_PTR, &len);
2291 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
2292 		goto err_len;
2293 	}
2294 
2295 	field = rec_get_nth_field_old(
2296 		rec, DICT_FLD__SYS_INDEXES__NAME, &name_len);
2297 	if (name_len == UNIV_SQL_NULL) {
2298 		goto err_len;
2299 	}
2300 
2301 	name_buf = mem_heap_strdupl(heap, (const char*) field,
2302 				    name_len);
2303 
2304 	field = rec_get_nth_field_old(
2305 		rec, DICT_FLD__SYS_INDEXES__N_FIELDS, &len);
2306 	if (len != 4) {
2307 		goto err_len;
2308 	}
2309 	n_fields = mach_read_from_4(field);
2310 
2311 	field = rec_get_nth_field_old(
2312 		rec, DICT_FLD__SYS_INDEXES__TYPE, &len);
2313 	if (len != 4) {
2314 		goto err_len;
2315 	}
2316 	type = mach_read_from_4(field);
2317 	if (type & (~0U << DICT_IT_BITS)) {
2318 		return("unknown SYS_INDEXES.TYPE bits");
2319 	}
2320 
2321 	field = rec_get_nth_field_old(
2322 		rec, DICT_FLD__SYS_INDEXES__PAGE_NO, &len);
2323 	if (len != 4) {
2324 		goto err_len;
2325 	}
2326 
2327 	if (allocate) {
2328 		*index = dict_mem_index_create(NULL, name_buf, type, n_fields);
2329 	} else {
2330 		ut_a(*index);
2331 
2332 		dict_mem_fill_index_struct(*index, NULL, name_buf,
2333 					   type, n_fields);
2334 	}
2335 
2336 	(*index)->id = id;
2337 	(*index)->page = mach_read_from_4(field);
2338 	ut_ad((*index)->page);
2339 	(*index)->merge_threshold = merge_threshold;
2340 
2341 	return(NULL);
2342 }
2343 
2344 /********************************************************************//**
2345 Loads definitions for table indexes. Adds them to the data dictionary
2346 cache.
2347 @return DB_SUCCESS if ok, DB_CORRUPTION if corruption of dictionary
2348 table or DB_UNSUPPORTED if table has unknown index type */
2349 static MY_ATTRIBUTE((nonnull))
2350 dberr_t
2351 dict_load_indexes(
2352 /*==============*/
2353 	dict_table_t*	table,	/*!< in/out: table */
2354 	mem_heap_t*	heap,	/*!< in: memory heap for temporary storage */
2355 	dict_err_ignore_t ignore_err)
2356 				/*!< in: error to be ignored when
2357 				loading the index definition */
2358 {
2359 	dict_table_t*	sys_indexes;
2360 	dict_index_t*	sys_index;
2361 	btr_pcur_t	pcur;
2362 	dtuple_t*	tuple;
2363 	dfield_t*	dfield;
2364 	const rec_t*	rec;
2365 	byte*		buf;
2366 	mtr_t		mtr;
2367 	dberr_t		error = DB_SUCCESS;
2368 
2369 	ut_ad(mutex_own(&dict_sys->mutex));
2370 
2371 	mtr_start(&mtr);
2372 
2373 	sys_indexes = dict_table_get_low("SYS_INDEXES");
2374 	sys_index = UT_LIST_GET_FIRST(sys_indexes->indexes);
2375 	ut_ad(!dict_table_is_comp(sys_indexes));
2376 	ut_ad(name_of_col_is(sys_indexes, sys_index,
2377 			     DICT_FLD__SYS_INDEXES__NAME, "NAME"));
2378 	ut_ad(name_of_col_is(sys_indexes, sys_index,
2379 			     DICT_FLD__SYS_INDEXES__PAGE_NO, "PAGE_NO"));
2380 
2381 	tuple = dtuple_create(heap, 1);
2382 	dfield = dtuple_get_nth_field(tuple, 0);
2383 
2384 	buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
2385 	mach_write_to_8(buf, table->id);
2386 
2387 	dfield_set_data(dfield, buf, 8);
2388 	dict_index_copy_types(tuple, sys_index, 1);
2389 
2390 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
2391 				  BTR_SEARCH_LEAF, &pcur, &mtr);
2392 	for (;;) {
2393 		dict_index_t*	index = NULL;
2394 		const char*	err_msg;
2395 
2396 		if (!btr_pcur_is_on_user_rec(&pcur)) {
2397 
2398 			/* We should allow the table to open even
2399 			without index when DICT_ERR_IGNORE_CORRUPT is set.
2400 			DICT_ERR_IGNORE_CORRUPT is currently only set
2401 			for drop table */
2402 			if (dict_table_get_first_index(table) == NULL
2403 			    && !(ignore_err & DICT_ERR_IGNORE_CORRUPT)) {
2404 				ib::warn() << "Cannot load table "
2405 					<< table->name
2406 					<< " because it has no indexes in"
2407 					" InnoDB internal data dictionary.";
2408 				error = DB_CORRUPTION;
2409 				goto func_exit;
2410 			}
2411 
2412 			break;
2413 		}
2414 
2415 		rec = btr_pcur_get_rec(&pcur);
2416 
2417 		if ((ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK)
2418 		    && (rec_get_n_fields_old(rec)
2419 			== DICT_NUM_FIELDS__SYS_INDEXES
2420 			/* a record for older SYS_INDEXES table
2421 			(missing merge_threshold column) is acceptable. */
2422 			|| rec_get_n_fields_old(rec)
2423 			   == DICT_NUM_FIELDS__SYS_INDEXES - 1)) {
2424 			const byte*	field;
2425 			ulint		len;
2426 			field = rec_get_nth_field_old(
2427 				rec, DICT_FLD__SYS_INDEXES__NAME, &len);
2428 
2429 			if (len != UNIV_SQL_NULL
2430 			    && static_cast<char>(*field)
2431 			    == static_cast<char>(*TEMP_INDEX_PREFIX_STR)) {
2432 				/* Skip indexes whose name starts with
2433 				TEMP_INDEX_PREFIX_STR, because they will
2434 				be dropped by row_merge_drop_temp_indexes()
2435 				during crash recovery. */
2436 				goto next_rec;
2437 			}
2438 		}
2439 
2440 		err_msg = dict_load_index_low(buf, heap, rec, TRUE, &index);
2441 		ut_ad((index == NULL && err_msg != NULL)
2442 		      || (index != NULL && err_msg == NULL));
2443 
2444 		if (err_msg == dict_load_index_id_err) {
2445 			/* TABLE_ID mismatch means that we have
2446 			run out of index definitions for the table. */
2447 
2448 			if (dict_table_get_first_index(table) == NULL
2449 			    && !(ignore_err & DICT_ERR_IGNORE_CORRUPT)) {
2450 
2451 				ib::warn() << "Failed to load the"
2452 					" clustered index for table "
2453 					<< table->name
2454 					<< " because of the following error: "
2455 					<< err_msg << "."
2456 					" Refusing to load the rest of the"
2457 					" indexes (if any) and the whole table"
2458 					" altogether.";
2459 				error = DB_CORRUPTION;
2460 				goto func_exit;
2461 			}
2462 
2463 			break;
2464 		} else if (err_msg == dict_load_index_del) {
2465 			/* Skip delete-marked records. */
2466 			goto next_rec;
2467 		} else if (err_msg) {
2468 			ib::error() << err_msg;
2469 			if (ignore_err & DICT_ERR_IGNORE_CORRUPT) {
2470 				goto next_rec;
2471 			}
2472 			error = DB_CORRUPTION;
2473 			goto func_exit;
2474 		}
2475 
2476 		ut_ad(index);
2477 		ut_ad(!dict_index_is_online_ddl(index));
2478 
2479 		/* Check whether the index is corrupted */
2480 		if (index->is_corrupted()) {
2481 			ib::error() << "Index " << index->name
2482 				<< " of table " << table->name
2483 				<< " is corrupted";
2484 
2485 			if (!srv_load_corrupted
2486 			    && !(ignore_err & DICT_ERR_IGNORE_CORRUPT)
2487 			    && dict_index_is_clust(index)) {
2488 				dict_mem_index_free(index);
2489 
2490 				error = DB_INDEX_CORRUPT;
2491 				goto func_exit;
2492 			} else {
2493 				/* We will load the index if
2494 				1) srv_load_corrupted is TRUE
2495 				2) ignore_err is set with
2496 				DICT_ERR_IGNORE_CORRUPT
2497 				3) if the index corrupted is a secondary
2498 				index */
2499 				ib::info() << "Load corrupted index "
2500 					<< index->name
2501 					<< " of table " << table->name;
2502 			}
2503 		}
2504 
2505 		if (index->type & DICT_FTS
2506 		    && !dict_table_has_fts_index(table)) {
2507 			/* This should have been created by now. */
2508 			ut_a(table->fts != NULL);
2509 			DICT_TF2_FLAG_SET(table, DICT_TF2_FTS);
2510 		}
2511 
2512 		/* We check for unsupported types first, so that the
2513 		subsequent checks are relevant for the supported types. */
2514 		if (index->type & ~(DICT_CLUSTERED | DICT_UNIQUE
2515 				    | DICT_CORRUPT | DICT_FTS
2516 				    | DICT_SPATIAL | DICT_VIRTUAL)) {
2517 
2518 			ib::error() << "Unknown type " << index->type
2519 				<< " of index " << index->name
2520 				<< " of table " << table->name;
2521 
2522 			error = DB_UNSUPPORTED;
2523 			dict_mem_index_free(index);
2524 			goto func_exit;
2525 		} else if (index->page == FIL_NULL
2526 			   && table->is_readable()
2527 			   && (!(index->type & DICT_FTS))) {
2528 
2529 			ib::error() << "Trying to load index " << index->name
2530 				<< " for table " << table->name
2531 				<< ", but the index tree has been freed!";
2532 
2533 			if (ignore_err & DICT_ERR_IGNORE_INDEX_ROOT) {
2534 				/* If caller can tolerate this error,
2535 				we will continue to load the index and
2536 				let caller deal with this error. However
2537 				mark the index and table corrupted. We
2538 				only need to mark such in the index
2539 				dictionary cache for such metadata corruption,
2540 				since we would always be able to set it
2541 				when loading the dictionary cache */
2542 				index->table = table;
2543 				dict_set_corrupted_index_cache_only(index);
2544 
2545 				ib::info() << "Index is corrupt but forcing"
2546 					" load into data dictionary";
2547 			} else {
2548 corrupted:
2549 				dict_mem_index_free(index);
2550 				error = DB_CORRUPTION;
2551 				goto func_exit;
2552 			}
2553 		} else if (!dict_index_is_clust(index)
2554 			   && NULL == dict_table_get_first_index(table)) {
2555 
2556 			ib::error() << "Trying to load index " << index->name
2557 				<< " for table " << table->name
2558 				<< ", but the first index is not clustered!";
2559 
2560 			goto corrupted;
2561 		} else if (dict_is_sys_table(table->id)
2562 			   && (dict_index_is_clust(index)
2563 			       || ((table == dict_sys->sys_tables)
2564 				   && !strcmp("ID_IND", index->name)))) {
2565 
2566 			/* The index was created in memory already at booting
2567 			of the database server */
2568 			dict_mem_index_free(index);
2569 		} else {
2570 			dict_load_fields(index, heap);
2571 			index->table = table;
2572 
2573 			/* The data dictionary tables should never contain
2574 			invalid index definitions.  If we ignored this error
2575 			and simply did not load this index definition, the
2576 			.frm file would disagree with the index definitions
2577 			inside InnoDB. */
2578 			if ((error = dict_index_add_to_cache(index,
2579 							     index->page))
2580 			    != DB_SUCCESS) {
2581 				goto func_exit;
2582 			}
2583 		}
2584 next_rec:
2585 		btr_pcur_move_to_next_user_rec(&pcur, &mtr);
2586 	}
2587 
2588 	ut_ad(table->fts_doc_id_index == NULL);
2589 
2590 	if (table->fts != NULL) {
2591 		table->fts_doc_id_index = dict_table_get_index_on_name(
2592 			table, FTS_DOC_ID_INDEX_NAME);
2593 	}
2594 
2595 	/* If the table contains FTS indexes, populate table->fts->indexes */
2596 	if (dict_table_has_fts_index(table)) {
2597 		ut_ad(table->fts_doc_id_index != NULL);
2598 		/* table->fts->indexes should have been created. */
2599 		ut_a(table->fts->indexes != NULL);
2600 		dict_table_get_all_fts_indexes(table, table->fts->indexes);
2601 	}
2602 
2603 func_exit:
2604 	btr_pcur_close(&pcur);
2605 	mtr_commit(&mtr);
2606 
2607 	return(error);
2608 }
2609 
2610 /** Load a table definition from a SYS_TABLES record to dict_table_t.
2611 Do not load any columns or indexes.
2612 @param[in]	name		Table name
2613 @param[in]	rec		SYS_TABLES record
2614 @param[out,own]	table		table, or NULL
2615 @return	error message
2616 @retval	NULL on success */
2617 static const char* dict_load_table_low(const table_name_t& name,
2618 				       const rec_t* rec, dict_table_t** table)
2619 {
2620 	table_id_t	table_id;
2621 	ulint		space_id;
2622 	ulint		n_cols;
2623 	ulint		t_num;
2624 	ulint		flags;
2625 	ulint		flags2;
2626 	ulint		n_v_col;
2627 
2628 	if (const char* error_text = dict_sys_tables_rec_check(rec)) {
2629 		*table = NULL;
2630 		return(error_text);
2631 	}
2632 
2633 	if (!dict_sys_tables_rec_read(rec, name, &table_id, &space_id,
2634 				      &t_num, &flags, &flags2)) {
2635 		*table = NULL;
2636 		return(dict_load_table_flags);
2637 	}
2638 
2639 	dict_table_decode_n_col(t_num, &n_cols, &n_v_col);
2640 
2641 	*table = dict_mem_table_create(
2642 		name.m_name, NULL, n_cols + n_v_col, n_v_col, flags, flags2);
2643 	(*table)->space_id = space_id;
2644 	(*table)->id = table_id;
2645 	(*table)->file_unreadable = !!(flags2 & DICT_TF2_DISCARDED);
2646 
2647 	return(NULL);
2648 }
2649 
2650 /********************************************************************//**
2651 Using the table->heap, copy the null-terminated filepath into
2652 table->data_dir_path and replace the 'databasename/tablename.ibd'
2653 portion with 'tablename'.
2654 This allows SHOW CREATE TABLE to return the correct DATA DIRECTORY path.
2655 Make this data directory path only if it has not yet been saved. */
2656 static
2657 void
2658 dict_save_data_dir_path(
2659 /*====================*/
2660 	dict_table_t*	table,		/*!< in/out: table */
2661 	const char*	filepath)	/*!< in: filepath of tablespace */
2662 {
2663 	ut_ad(mutex_own(&dict_sys->mutex));
2664 	ut_a(DICT_TF_HAS_DATA_DIR(table->flags));
2665 
2666 	ut_a(!table->data_dir_path);
2667 	ut_a(filepath);
2668 
2669 	/* Be sure this filepath is not the default filepath. */
2670 	char*	default_filepath = fil_make_filepath(
2671 			NULL, table->name.m_name, IBD, false);
2672 	if (default_filepath) {
2673 		if (0 != strcmp(filepath, default_filepath)) {
2674 			ulint pathlen = strlen(filepath);
2675 			ut_a(pathlen < OS_FILE_MAX_PATH);
2676 			ut_a(0 == strcmp(filepath + pathlen - 4, DOT_IBD));
2677 
2678 			table->data_dir_path = mem_heap_strdup(
2679 				table->heap, filepath);
2680 			os_file_make_data_dir_path(table->data_dir_path);
2681 		}
2682 
2683 		ut_free(default_filepath);
2684 	}
2685 }
2686 
2687 /** Make sure the data_dir_path is saved in dict_table_t if DATA DIRECTORY
2688 was used. Try to read it from the fil_system first, then from SYS_DATAFILES.
2689 @param[in]	table		Table object
2690 @param[in]	dict_mutex_own	true if dict_sys->mutex is owned already */
2691 void
2692 dict_get_and_save_data_dir_path(
2693 	dict_table_t*	table,
2694 	bool		dict_mutex_own)
2695 {
2696 	ut_ad(!table->is_temporary());
2697 	ut_ad(!table->space || table->space->id == table->space_id);
2698 
2699 	if (!table->data_dir_path && table->space_id && table->space) {
2700 		if (!dict_mutex_own) {
2701 			dict_mutex_enter_for_mysql();
2702 		}
2703 
2704 		table->flags |= (1 << DICT_TF_POS_DATA_DIR);
2705 		dict_save_data_dir_path(table,
2706 					table->space->chain.start->name);
2707 
2708 		if (table->data_dir_path == NULL) {
2709 			/* Since we did not set the table data_dir_path,
2710 			unset the flag.  This does not change SYS_DATAFILES
2711 			or SYS_TABLES or FSP_FLAGS on the header page of the
2712 			tablespace, but it makes dict_table_t consistent. */
2713 			table->flags &= ~DICT_TF_MASK_DATA_DIR;
2714 		}
2715 
2716 		if (!dict_mutex_own) {
2717 			dict_mutex_exit_for_mysql();
2718 		}
2719 	}
2720 }
2721 
2722 /** Loads a table definition and also all its index definitions, and also
2723 the cluster definition if the table is a member in a cluster. Also loads
2724 all foreign key constraints where the foreign key is in the table or where
2725 a foreign key references columns in this table.
2726 @param[in]	name		Table name in the dbname/tablename format
2727 @param[in]	ignore_err	Error to be ignored when loading
2728 				table and its index definition
2729 @return table, NULL if does not exist; if the table is stored in an
2730 .ibd file, but the file does not exist, then we set the file_unreadable
2731 flag in the table object we return. */
2732 dict_table_t* dict_load_table(const char* name, dict_err_ignore_t ignore_err)
2733 {
2734 	dict_names_t			fk_list;
2735 	dict_table_t*			result;
2736 	dict_names_t::iterator		i;
2737 
2738 	DBUG_ENTER("dict_load_table");
2739 	DBUG_PRINT("dict_load_table", ("loading table: '%s'", name));
2740 
2741 	ut_ad(mutex_own(&dict_sys->mutex));
2742 
2743 	result = dict_table_check_if_in_cache_low(name);
2744 
2745 	if (!result) {
2746 		result = dict_load_table_one(const_cast<char*>(name),
2747 					     ignore_err, fk_list);
2748 		while (!fk_list.empty()) {
2749 			if (!dict_table_check_if_in_cache_low(fk_list.front()))
2750 				dict_load_table_one(
2751 					const_cast<char*>(fk_list.front()),
2752 					ignore_err, fk_list);
2753 			fk_list.pop_front();
2754 		}
2755 	}
2756 
2757 	DBUG_RETURN(result);
2758 }
2759 
2760 /** Opens a tablespace for dict_load_table_one()
2761 @param[in,out]	table		A table that refers to the tablespace to open
2762 @param[in]	ignore_err	Whether to ignore an error. */
2763 UNIV_INLINE
2764 void
2765 dict_load_tablespace(
2766 	dict_table_t*		table,
2767 	dict_err_ignore_t	ignore_err)
2768 {
2769 	ut_ad(!table->is_temporary());
2770 	ut_ad(!table->space);
2771 	ut_ad(table->space_id < SRV_LOG_SPACE_FIRST_ID);
2772 	ut_ad(fil_system.sys_space);
2773 
2774 	if (table->space_id == TRX_SYS_SPACE) {
2775 		table->space = fil_system.sys_space;
2776 		return;
2777 	}
2778 
2779 	if (table->flags2 & DICT_TF2_DISCARDED) {
2780 		ib::warn() << "Tablespace for table " << table->name
2781 			<< " is set as discarded.";
2782 		table->file_unreadable = true;
2783 		return;
2784 	}
2785 
2786 	/* The tablespace may already be open. */
2787 	table->space = fil_space_for_table_exists_in_mem(
2788 		table->space_id, table->name.m_name, table->flags);
2789 	if (table->space) {
2790 		return;
2791 	}
2792 
2793 	if (ignore_err == DICT_ERR_IGNORE_DROP) {
2794 		table->file_unreadable = true;
2795 		return;
2796 	}
2797 
2798 	if (!(ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK)) {
2799 		ib::error() << "Failed to find tablespace for table "
2800 			<< table->name << " in the cache. Attempting"
2801 			" to load the tablespace with space id "
2802 			<< table->space_id;
2803 	}
2804 
2805 	/* Use the remote filepath if needed. This parameter is optional
2806 	in the call to fil_ibd_open(). If not supplied, it will be built
2807 	from the table->name. */
2808 	char* filepath = NULL;
2809 	if (DICT_TF_HAS_DATA_DIR(table->flags)) {
2810 		/* This will set table->data_dir_path from either
2811 		fil_system or SYS_DATAFILES */
2812 		dict_get_and_save_data_dir_path(table, true);
2813 
2814 		if (table->data_dir_path) {
2815 			filepath = fil_make_filepath(
2816 				table->data_dir_path,
2817 				table->name.m_name, IBD, true);
2818 		}
2819 	}
2820 
2821 	/* Try to open the tablespace.  We set the 2nd param (fix_dict) to
2822 	false because we do not have an x-lock on dict_operation_lock */
2823 	table->space = fil_ibd_open(
2824 		true, false, FIL_TYPE_TABLESPACE, table->space_id,
2825 		dict_tf_to_fsp_flags(table->flags),
2826 		table->name, filepath);
2827 
2828 	if (!table->space) {
2829 		/* We failed to find a sensible tablespace file */
2830 		table->file_unreadable = true;
2831 	}
2832 
2833 	ut_free(filepath);
2834 }
2835 
2836 /** Loads a table definition and also all its index definitions.
2837 
2838 Loads those foreign key constraints whose referenced table is already in
2839 dictionary cache.  If a foreign key constraint is not loaded, then the
2840 referenced table is pushed into the output stack (fk_tables), if it is not
2841 NULL.  These tables must be subsequently loaded so that all the foreign
2842 key constraints are loaded into memory.
2843 
2844 @param[in]	name		Table name in the db/tablename format
2845 @param[in]	ignore_err	Error to be ignored when loading table
2846 				and its index definition
2847 @param[out]	fk_tables	Related table names that must also be
2848 				loaded to ensure that all foreign key
2849 				constraints are loaded.
2850 @return table, NULL if does not exist; if the table is stored in an
2851 .ibd file, but the file does not exist, then we set the
2852 file_unreadable flag in the table object we return */
2853 static
2854 dict_table_t*
2855 dict_load_table_one(
2856 	const table_name_t&	name,
2857 	dict_err_ignore_t	ignore_err,
2858 	dict_names_t&		fk_tables)
2859 {
2860 	dberr_t		err;
2861 	dict_table_t*	sys_tables;
2862 	btr_pcur_t	pcur;
2863 	dict_index_t*	sys_index;
2864 	dtuple_t*	tuple;
2865 	mem_heap_t*	heap;
2866 	dfield_t*	dfield;
2867 	const rec_t*	rec;
2868 	const byte*	field;
2869 	ulint		len;
2870 	mtr_t		mtr;
2871 
2872 	DBUG_ENTER("dict_load_table_one");
2873 	DBUG_PRINT("dict_load_table_one", ("table: %s", name.m_name));
2874 
2875 	ut_ad(mutex_own(&dict_sys->mutex));
2876 
2877 	heap = mem_heap_create(32000);
2878 
2879 	mtr_start(&mtr);
2880 
2881 	sys_tables = dict_table_get_low("SYS_TABLES");
2882 	sys_index = UT_LIST_GET_FIRST(sys_tables->indexes);
2883 	ut_ad(!dict_table_is_comp(sys_tables));
2884 	ut_ad(name_of_col_is(sys_tables, sys_index,
2885 			     DICT_FLD__SYS_TABLES__ID, "ID"));
2886 	ut_ad(name_of_col_is(sys_tables, sys_index,
2887 			     DICT_FLD__SYS_TABLES__N_COLS, "N_COLS"));
2888 	ut_ad(name_of_col_is(sys_tables, sys_index,
2889 			     DICT_FLD__SYS_TABLES__TYPE, "TYPE"));
2890 	ut_ad(name_of_col_is(sys_tables, sys_index,
2891 			     DICT_FLD__SYS_TABLES__MIX_LEN, "MIX_LEN"));
2892 	ut_ad(name_of_col_is(sys_tables, sys_index,
2893 			     DICT_FLD__SYS_TABLES__SPACE, "SPACE"));
2894 
2895 	tuple = dtuple_create(heap, 1);
2896 	dfield = dtuple_get_nth_field(tuple, 0);
2897 
2898 	dfield_set_data(dfield, name.m_name, ut_strlen(name.m_name));
2899 	dict_index_copy_types(tuple, sys_index, 1);
2900 
2901 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
2902 				  BTR_SEARCH_LEAF, &pcur, &mtr);
2903 	rec = btr_pcur_get_rec(&pcur);
2904 
2905 	if (!btr_pcur_is_on_user_rec(&pcur)
2906 	    || rec_get_deleted_flag(rec, 0)) {
2907 		/* Not found */
2908 err_exit:
2909 		btr_pcur_close(&pcur);
2910 		mtr_commit(&mtr);
2911 		mem_heap_free(heap);
2912 
2913 		DBUG_RETURN(NULL);
2914 	}
2915 
2916 	field = rec_get_nth_field_old(
2917 		rec, DICT_FLD__SYS_TABLES__NAME, &len);
2918 
2919 	/* Check if the table name in record is the searched one */
2920 	if (len != ut_strlen(name.m_name)
2921 	    || 0 != ut_memcmp(name.m_name, field, len)) {
2922 
2923 		goto err_exit;
2924 	}
2925 
2926 	dict_table_t* table;
2927 	if (const char* err_msg = dict_load_table_low(name, rec, &table)) {
2928 		if (err_msg != dict_load_table_flags) {
2929 			ib::error() << err_msg;
2930 		}
2931 		goto err_exit;
2932 	}
2933 
2934 	btr_pcur_close(&pcur);
2935 	mtr_commit(&mtr);
2936 
2937 	dict_load_tablespace(table, ignore_err);
2938 
2939 	dict_load_columns(table, heap);
2940 
2941 	dict_load_virtual(table, heap);
2942 
2943 	dict_table_add_system_columns(table, heap);
2944 
2945 	table->can_be_evicted = true;
2946 	table->add_to_cache();
2947 
2948 	mem_heap_empty(heap);
2949 
2950 	ut_ad(dict_tf2_is_valid(table->flags, table->flags2));
2951 
2952 	/* If there is no tablespace for the table then we only need to
2953 	load the index definitions. So that we can IMPORT the tablespace
2954 	later. When recovering table locks for resurrected incomplete
2955 	transactions, the tablespace should exist, because DDL operations
2956 	were not allowed while the table is being locked by a transaction. */
2957 	dict_err_ignore_t index_load_err =
2958 		!(ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK)
2959 		&& !table->is_readable()
2960 		? DICT_ERR_IGNORE_ALL
2961 		: ignore_err;
2962 
2963 	err = dict_load_indexes(table, heap, index_load_err);
2964 
2965 	if (err == DB_INDEX_CORRUPT) {
2966 		/* Refuse to load the table if the table has a corrupted
2967 		cluster index */
2968 		if (!srv_load_corrupted) {
2969 
2970 			ib::error() << "Load table " << table->name
2971 				<< " failed, the table has"
2972 				" corrupted clustered indexes. Turn on"
2973 				" 'innodb_force_load_corrupted' to drop it";
2974 			dict_table_remove_from_cache(table);
2975 			table = NULL;
2976 			goto func_exit;
2977 		} else {
2978 			if (table->indexes.start->is_corrupted()) {
2979 				table->corrupted = true;
2980 			}
2981 		}
2982 	}
2983 
2984 	if (err == DB_SUCCESS && table->is_readable()) {
2985 		if (table->space && !fil_space_get_size(table->space_id)) {
2986 corrupted:
2987 			table->corrupted = true;
2988 			table->file_unreadable = true;
2989 			err = DB_CORRUPTION;
2990 		} else {
2991 			const page_id_t page_id(
2992 				table->space->id,
2993 				dict_table_get_first_index(table)->page);
2994 			mtr.start();
2995 			buf_block_t* block = buf_page_get(
2996 				page_id,
2997 				dict_table_page_size(table),
2998 				RW_S_LATCH, &mtr);
2999 			const bool corrupted = !block
3000 				|| page_get_space_id(block->frame)
3001 				!= page_id.space()
3002 				|| page_get_page_no(block->frame)
3003 				!= page_id.page_no()
3004 				|| (mach_read_from_2(FIL_PAGE_TYPE
3005 						    + block->frame)
3006 				    != FIL_PAGE_INDEX
3007 				    && mach_read_from_2(FIL_PAGE_TYPE
3008 							+ block->frame)
3009 				    != FIL_PAGE_TYPE_INSTANT);
3010 			mtr.commit();
3011 			if (corrupted) {
3012 				goto corrupted;
3013 			}
3014 
3015 			if (table->supports_instant()) {
3016 				err = btr_cur_instant_init(table);
3017 			}
3018 		}
3019 	}
3020 
3021 	/* Initialize table foreign_child value. Its value could be
3022 	changed when dict_load_foreigns() is called below */
3023 	table->fk_max_recusive_level = 0;
3024 
3025 	/* If the force recovery flag is set, we open the table irrespective
3026 	of the error condition, since the user may want to dump data from the
3027 	clustered index. However we load the foreign key information only if
3028 	all indexes were loaded. */
3029 	if (!table->is_readable()) {
3030 		/* Don't attempt to load the indexes from disk. */
3031 	} else if (err == DB_SUCCESS) {
3032 		err = dict_load_foreigns(table->name.m_name, NULL,
3033 					 true, true,
3034 					 ignore_err, fk_tables);
3035 
3036 		if (err != DB_SUCCESS) {
3037 			ib::warn() << "Load table " << table->name
3038 				<< " failed, the table has missing"
3039 				" foreign key indexes. Turn off"
3040 				" 'foreign_key_checks' and try again.";
3041 
3042 			dict_table_remove_from_cache(table);
3043 			table = NULL;
3044 		} else {
3045 			dict_mem_table_fill_foreign_vcol_set(table);
3046 			table->fk_max_recusive_level = 0;
3047 		}
3048 	} else {
3049 		dict_index_t*   index;
3050 
3051 		/* Make sure that at least the clustered index was loaded.
3052 		Otherwise refuse to load the table */
3053 		index = dict_table_get_first_index(table);
3054 
3055 		if (!srv_force_recovery
3056 		    || !index
3057 		    || !index->is_primary()) {
3058 			dict_table_remove_from_cache(table);
3059 			table = NULL;
3060 		} else if (index->is_corrupted()
3061 			   && table->is_readable()) {
3062 			/* It is possible we force to load a corrupted
3063 			clustered index if srv_load_corrupted is set.
3064 			Mark the table as corrupted in this case */
3065 			table->corrupted = true;
3066 		}
3067 	}
3068 
3069 func_exit:
3070 	mem_heap_free(heap);
3071 
3072 	ut_ad(!table
3073 	      || (ignore_err & ~DICT_ERR_IGNORE_FK_NOKEY)
3074 	      || !table->is_readable()
3075 	      || !table->corrupted);
3076 
3077 	if (table && table->fts) {
3078 		if (!(dict_table_has_fts_index(table)
3079 		      || DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)
3080 		      || DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_ADD_DOC_ID))) {
3081 			/* the table->fts could be created in dict_load_column
3082 			when a user defined FTS_DOC_ID is present, but no
3083 			FTS */
3084 			fts_optimize_remove_table(table);
3085 			fts_free(table);
3086 		} else if (fts_optimize_wq) {
3087 			fts_optimize_add_table(table);
3088 		} else if (table->can_be_evicted) {
3089 			/* fts_optimize_thread is not started yet.
3090 			So make the table as non-evictable from cache. */
3091 			dict_table_move_from_lru_to_non_lru(table);
3092 		}
3093 	}
3094 
3095 	ut_ad(err != DB_SUCCESS || dict_foreign_set_validate(*table));
3096 
3097 	DBUG_RETURN(table);
3098 }
3099 
3100 /***********************************************************************//**
3101 Loads a table object based on the table id.
3102 @return table; NULL if table does not exist */
3103 dict_table_t*
3104 dict_load_table_on_id(
3105 /*==================*/
3106 	table_id_t		table_id,	/*!< in: table id */
3107 	dict_err_ignore_t	ignore_err)	/*!< in: errors to ignore
3108 						when loading the table */
3109 {
3110 	byte		id_buf[8];
3111 	btr_pcur_t	pcur;
3112 	mem_heap_t*	heap;
3113 	dtuple_t*	tuple;
3114 	dfield_t*	dfield;
3115 	dict_index_t*	sys_table_ids;
3116 	dict_table_t*	sys_tables;
3117 	const rec_t*	rec;
3118 	const byte*	field;
3119 	ulint		len;
3120 	dict_table_t*	table;
3121 	mtr_t		mtr;
3122 
3123 	ut_ad(mutex_own(&dict_sys->mutex));
3124 
3125 	table = NULL;
3126 
3127 	/* NOTE that the operation of this function is protected by
3128 	the dictionary mutex, and therefore no deadlocks can occur
3129 	with other dictionary operations. */
3130 
3131 	mtr_start(&mtr);
3132 	/*---------------------------------------------------*/
3133 	/* Get the secondary index based on ID for table SYS_TABLES */
3134 	sys_tables = dict_sys->sys_tables;
3135 	sys_table_ids = dict_table_get_next_index(
3136 		dict_table_get_first_index(sys_tables));
3137 	ut_ad(!dict_table_is_comp(sys_tables));
3138 	ut_ad(!dict_index_is_clust(sys_table_ids));
3139 	heap = mem_heap_create(256);
3140 
3141 	tuple  = dtuple_create(heap, 1);
3142 	dfield = dtuple_get_nth_field(tuple, 0);
3143 
3144 	/* Write the table id in byte format to id_buf */
3145 	mach_write_to_8(id_buf, table_id);
3146 
3147 	dfield_set_data(dfield, id_buf, 8);
3148 	dict_index_copy_types(tuple, sys_table_ids, 1);
3149 
3150 	btr_pcur_open_on_user_rec(sys_table_ids, tuple, PAGE_CUR_GE,
3151 				  BTR_SEARCH_LEAF, &pcur, &mtr);
3152 
3153 	rec = btr_pcur_get_rec(&pcur);
3154 
3155 	if (page_rec_is_user_rec(rec)) {
3156 		/*---------------------------------------------------*/
3157 		/* Now we have the record in the secondary index
3158 		containing the table ID and NAME */
3159 check_rec:
3160 		field = rec_get_nth_field_old(
3161 			rec, DICT_FLD__SYS_TABLE_IDS__ID, &len);
3162 		ut_ad(len == 8);
3163 
3164 		/* Check if the table id in record is the one searched for */
3165 		if (table_id == mach_read_from_8(field)) {
3166 			if (rec_get_deleted_flag(rec, 0)) {
3167 				/* Until purge has completed, there
3168 				may be delete-marked duplicate records
3169 				for the same SYS_TABLES.ID, but different
3170 				SYS_TABLES.NAME. */
3171 				while (btr_pcur_move_to_next(&pcur, &mtr)) {
3172 					rec = btr_pcur_get_rec(&pcur);
3173 
3174 					if (page_rec_is_user_rec(rec)) {
3175 						goto check_rec;
3176 					}
3177 				}
3178 			} else {
3179 				/* Now we get the table name from the record */
3180 				field = rec_get_nth_field_old(rec,
3181 					DICT_FLD__SYS_TABLE_IDS__NAME, &len);
3182 				/* Load the table definition to memory */
3183 				char*	table_name = mem_heap_strdupl(
3184 					heap, (char*) field, len);
3185 				table = dict_load_table(table_name, ignore_err);
3186 			}
3187 		}
3188 	}
3189 
3190 	btr_pcur_close(&pcur);
3191 	mtr_commit(&mtr);
3192 	mem_heap_free(heap);
3193 
3194 	return(table);
3195 }
3196 
3197 /********************************************************************//**
3198 This function is called when the database is booted. Loads system table
3199 index definitions except for the clustered index which is added to the
3200 dictionary cache at booting before calling this function. */
3201 void
3202 dict_load_sys_table(
3203 /*================*/
3204 	dict_table_t*	table)	/*!< in: system table */
3205 {
3206 	mem_heap_t*	heap;
3207 
3208 	ut_ad(mutex_own(&dict_sys->mutex));
3209 
3210 	heap = mem_heap_create(1000);
3211 
3212 	dict_load_indexes(table, heap, DICT_ERR_IGNORE_NONE);
3213 
3214 	mem_heap_free(heap);
3215 }
3216 
3217 /********************************************************************//**
3218 Loads foreign key constraint col names (also for the referenced table).
3219 Members that must be set (and valid) in foreign:
3220 foreign->heap
3221 foreign->n_fields
3222 foreign->id ('\0'-terminated)
3223 Members that will be created and set by this function:
3224 foreign->foreign_col_names[i]
3225 foreign->referenced_col_names[i]
3226 (for i=0..foreign->n_fields-1) */
3227 static
3228 void
3229 dict_load_foreign_cols(
3230 /*===================*/
3231 	dict_foreign_t*	foreign)/*!< in/out: foreign constraint object */
3232 {
3233 	dict_table_t*	sys_foreign_cols;
3234 	dict_index_t*	sys_index;
3235 	btr_pcur_t	pcur;
3236 	dtuple_t*	tuple;
3237 	dfield_t*	dfield;
3238 	const rec_t*	rec;
3239 	const byte*	field;
3240 	ulint		len;
3241 	ulint		i;
3242 	mtr_t		mtr;
3243 	size_t		id_len;
3244 
3245 	ut_ad(mutex_own(&dict_sys->mutex));
3246 
3247 	id_len = strlen(foreign->id);
3248 
3249 	foreign->foreign_col_names = static_cast<const char**>(
3250 		mem_heap_alloc(foreign->heap,
3251 			       foreign->n_fields * sizeof(void*)));
3252 
3253 	foreign->referenced_col_names = static_cast<const char**>(
3254 		mem_heap_alloc(foreign->heap,
3255 			       foreign->n_fields * sizeof(void*)));
3256 
3257 	mtr_start(&mtr);
3258 
3259 	sys_foreign_cols = dict_table_get_low("SYS_FOREIGN_COLS");
3260 
3261 	sys_index = UT_LIST_GET_FIRST(sys_foreign_cols->indexes);
3262 	ut_ad(!dict_table_is_comp(sys_foreign_cols));
3263 
3264 	tuple = dtuple_create(foreign->heap, 1);
3265 	dfield = dtuple_get_nth_field(tuple, 0);
3266 
3267 	dfield_set_data(dfield, foreign->id, id_len);
3268 	dict_index_copy_types(tuple, sys_index, 1);
3269 
3270 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
3271 				  BTR_SEARCH_LEAF, &pcur, &mtr);
3272 	for (i = 0; i < foreign->n_fields; i++) {
3273 
3274 		rec = btr_pcur_get_rec(&pcur);
3275 
3276 		ut_a(btr_pcur_is_on_user_rec(&pcur));
3277 		ut_a(!rec_get_deleted_flag(rec, 0));
3278 
3279 		field = rec_get_nth_field_old(
3280 			rec, DICT_FLD__SYS_FOREIGN_COLS__ID, &len);
3281 
3282 		if (len != id_len || ut_memcmp(foreign->id, field, len) != 0) {
3283 			const rec_t*	pos;
3284 			ulint		pos_len;
3285 			const rec_t*	for_col_name;
3286 			ulint		for_col_name_len;
3287 			const rec_t*	ref_col_name;
3288 			ulint		ref_col_name_len;
3289 
3290 			pos = rec_get_nth_field_old(
3291 				rec, DICT_FLD__SYS_FOREIGN_COLS__POS,
3292 				&pos_len);
3293 
3294 			for_col_name = rec_get_nth_field_old(
3295 				rec, DICT_FLD__SYS_FOREIGN_COLS__FOR_COL_NAME,
3296 				&for_col_name_len);
3297 
3298 			ref_col_name = rec_get_nth_field_old(
3299 				rec, DICT_FLD__SYS_FOREIGN_COLS__REF_COL_NAME,
3300 				&ref_col_name_len);
3301 
3302 			ib::fatal	sout;
3303 
3304 			sout << "Unable to load column names for foreign"
3305 				" key '" << foreign->id
3306 				<< "' because it was not found in"
3307 				" InnoDB internal table SYS_FOREIGN_COLS. The"
3308 				" closest entry we found is:"
3309 				" (ID='";
3310 			sout.write(field, len);
3311 			sout << "', POS=" << mach_read_from_4(pos)
3312 				<< ", FOR_COL_NAME='";
3313 			sout.write(for_col_name, for_col_name_len);
3314 			sout << "', REF_COL_NAME='";
3315 			sout.write(ref_col_name, ref_col_name_len);
3316 			sout << "')";
3317 		}
3318 
3319 		field = rec_get_nth_field_old(
3320 			rec, DICT_FLD__SYS_FOREIGN_COLS__POS, &len);
3321 		ut_a(len == 4);
3322 		ut_a(i == mach_read_from_4(field));
3323 
3324 		field = rec_get_nth_field_old(
3325 			rec, DICT_FLD__SYS_FOREIGN_COLS__FOR_COL_NAME, &len);
3326 		foreign->foreign_col_names[i] = mem_heap_strdupl(
3327 			foreign->heap, (char*) field, len);
3328 
3329 		field = rec_get_nth_field_old(
3330 			rec, DICT_FLD__SYS_FOREIGN_COLS__REF_COL_NAME, &len);
3331 		foreign->referenced_col_names[i] = mem_heap_strdupl(
3332 			foreign->heap, (char*) field, len);
3333 
3334 		btr_pcur_move_to_next_user_rec(&pcur, &mtr);
3335 	}
3336 
3337 	btr_pcur_close(&pcur);
3338 	mtr_commit(&mtr);
3339 }
3340 
3341 /***********************************************************************//**
3342 Loads a foreign key constraint to the dictionary cache. If the referenced
3343 table is not yet loaded, it is added in the output parameter (fk_tables).
3344 @return DB_SUCCESS or error code */
3345 static MY_ATTRIBUTE((nonnull(1), warn_unused_result))
3346 dberr_t
3347 dict_load_foreign(
3348 /*==============*/
3349 	const char*		id,
3350 				/*!< in: foreign constraint id, must be
3351 				'\0'-terminated */
3352 	const char**		col_names,
3353 				/*!< in: column names, or NULL
3354 				to use foreign->foreign_table->col_names */
3355 	bool			check_recursive,
3356 				/*!< in: whether to record the foreign table
3357 				parent count to avoid unlimited recursive
3358 				load of chained foreign tables */
3359 	bool			check_charsets,
3360 				/*!< in: whether to check charset
3361 				compatibility */
3362 	dict_err_ignore_t	ignore_err,
3363 				/*!< in: error to be ignored */
3364 	dict_names_t&	fk_tables)
3365 				/*!< out: the foreign key constraint is added
3366 				to the dictionary cache only if the referenced
3367 				table is already in cache.  Otherwise, the
3368 				foreign key constraint is not added to cache,
3369 				and the referenced table is added to this
3370 				stack. */
3371 {
3372 	dict_foreign_t*	foreign;
3373 	dict_table_t*	sys_foreign;
3374 	btr_pcur_t	pcur;
3375 	dict_index_t*	sys_index;
3376 	dtuple_t*	tuple;
3377 	mem_heap_t*	heap2;
3378 	dfield_t*	dfield;
3379 	const rec_t*	rec;
3380 	const byte*	field;
3381 	ulint		len;
3382 	ulint		n_fields_and_type;
3383 	mtr_t		mtr;
3384 	dict_table_t*	for_table;
3385 	dict_table_t*	ref_table;
3386 	size_t		id_len;
3387 
3388 	DBUG_ENTER("dict_load_foreign");
3389 	DBUG_PRINT("dict_load_foreign",
3390 		   ("id: '%s', check_recursive: %d", id, check_recursive));
3391 
3392 	ut_ad(mutex_own(&dict_sys->mutex));
3393 
3394 	id_len = strlen(id);
3395 
3396 	heap2 = mem_heap_create(1000);
3397 
3398 	mtr_start(&mtr);
3399 
3400 	sys_foreign = dict_table_get_low("SYS_FOREIGN");
3401 
3402 	sys_index = UT_LIST_GET_FIRST(sys_foreign->indexes);
3403 	ut_ad(!dict_table_is_comp(sys_foreign));
3404 
3405 	tuple = dtuple_create(heap2, 1);
3406 	dfield = dtuple_get_nth_field(tuple, 0);
3407 
3408 	dfield_set_data(dfield, id, id_len);
3409 	dict_index_copy_types(tuple, sys_index, 1);
3410 
3411 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
3412 				  BTR_SEARCH_LEAF, &pcur, &mtr);
3413 	rec = btr_pcur_get_rec(&pcur);
3414 
3415 	if (!btr_pcur_is_on_user_rec(&pcur)
3416 	    || rec_get_deleted_flag(rec, 0)) {
3417 		/* Not found */
3418 
3419 		ib::error() << "Cannot load foreign constraint " << id
3420 			<< ": could not find the relevant record in "
3421 			<< "SYS_FOREIGN";
3422 
3423 		btr_pcur_close(&pcur);
3424 		mtr_commit(&mtr);
3425 		mem_heap_free(heap2);
3426 
3427 		DBUG_RETURN(DB_ERROR);
3428 	}
3429 
3430 	field = rec_get_nth_field_old(rec, DICT_FLD__SYS_FOREIGN__ID, &len);
3431 
3432 	/* Check if the id in record is the searched one */
3433 	if (len != id_len || ut_memcmp(id, field, len) != 0) {
3434 
3435 		{
3436 			ib::error	err;
3437 			err << "Cannot load foreign constraint " << id
3438 				<< ": found ";
3439 			err.write(field, len);
3440 			err << " instead in SYS_FOREIGN";
3441 		}
3442 
3443 		btr_pcur_close(&pcur);
3444 		mtr_commit(&mtr);
3445 		mem_heap_free(heap2);
3446 
3447 		DBUG_RETURN(DB_ERROR);
3448 	}
3449 
3450 	/* Read the table names and the number of columns associated
3451 	with the constraint */
3452 
3453 	mem_heap_free(heap2);
3454 
3455 	foreign = dict_mem_foreign_create();
3456 
3457 	n_fields_and_type = mach_read_from_4(
3458 		rec_get_nth_field_old(
3459 			rec, DICT_FLD__SYS_FOREIGN__N_COLS, &len));
3460 
3461 	ut_a(len == 4);
3462 
3463 	/* We store the type in the bits 24..29 of n_fields_and_type. */
3464 
3465 	foreign->type = (unsigned int) (n_fields_and_type >> 24);
3466 	foreign->n_fields = (unsigned int) (n_fields_and_type & 0x3FFUL);
3467 
3468 	foreign->id = mem_heap_strdupl(foreign->heap, id, id_len);
3469 
3470 	field = rec_get_nth_field_old(
3471 		rec, DICT_FLD__SYS_FOREIGN__FOR_NAME, &len);
3472 
3473 	foreign->foreign_table_name = mem_heap_strdupl(
3474 		foreign->heap, (char*) field, len);
3475 	dict_mem_foreign_table_name_lookup_set(foreign, TRUE);
3476 
3477 	const ulint foreign_table_name_len = len;
3478 
3479 	field = rec_get_nth_field_old(
3480 		rec, DICT_FLD__SYS_FOREIGN__REF_NAME, &len);
3481 	foreign->referenced_table_name = mem_heap_strdupl(
3482 		foreign->heap, (char*) field, len);
3483 	dict_mem_referenced_table_name_lookup_set(foreign, TRUE);
3484 
3485 	btr_pcur_close(&pcur);
3486 	mtr_commit(&mtr);
3487 
3488 	dict_load_foreign_cols(foreign);
3489 
3490 	ref_table = dict_table_check_if_in_cache_low(
3491 		foreign->referenced_table_name_lookup);
3492 	for_table = dict_table_check_if_in_cache_low(
3493 		foreign->foreign_table_name_lookup);
3494 
3495 	if (!for_table) {
3496 		/* To avoid recursively loading the tables related through
3497 		the foreign key constraints, the child table name is saved
3498 		here.  The child table will be loaded later, along with its
3499 		foreign key constraint. */
3500 
3501 		ut_a(ref_table != NULL);
3502 		fk_tables.push_back(
3503 			mem_heap_strdupl(ref_table->heap,
3504 					 foreign->foreign_table_name_lookup,
3505 					 foreign_table_name_len));
3506 
3507 		dict_foreign_remove_from_cache(foreign);
3508 		DBUG_RETURN(DB_SUCCESS);
3509 	}
3510 
3511 	ut_a(for_table || ref_table);
3512 
3513 	/* Note that there may already be a foreign constraint object in
3514 	the dictionary cache for this constraint: then the following
3515 	call only sets the pointers in it to point to the appropriate table
3516 	and index objects and frees the newly created object foreign.
3517 	Adding to the cache should always succeed since we are not creating
3518 	a new foreign key constraint but loading one from the data
3519 	dictionary. */
3520 
3521 	DBUG_RETURN(dict_foreign_add_to_cache(foreign, col_names,
3522 					      check_charsets,
3523 					      ignore_err));
3524 }
3525 
3526 /***********************************************************************//**
3527 Loads foreign key constraints where the table is either the foreign key
3528 holder or where the table is referenced by a foreign key. Adds these
3529 constraints to the data dictionary.
3530 
3531 The foreign key constraint is loaded only if the referenced table is also
3532 in the dictionary cache.  If the referenced table is not in dictionary
3533 cache, then it is added to the output parameter (fk_tables).
3534 
3535 @return DB_SUCCESS or error code */
3536 dberr_t
3537 dict_load_foreigns(
3538 	const char*		table_name,	/*!< in: table name */
3539 	const char**		col_names,	/*!< in: column names, or NULL
3540 						to use table->col_names */
3541 	bool			check_recursive,/*!< in: Whether to check
3542 						recursive load of tables
3543 						chained by FK */
3544 	bool			check_charsets,	/*!< in: whether to check
3545 						charset compatibility */
3546 	dict_err_ignore_t	ignore_err,	/*!< in: error to be ignored */
3547 	dict_names_t&		fk_tables)
3548 						/*!< out: stack of table
3549 						names which must be loaded
3550 						subsequently to load all the
3551 						foreign key constraints. */
3552 {
3553 	ulint		tuple_buf[(DTUPLE_EST_ALLOC(1) + sizeof(ulint) - 1)
3554 				/ sizeof(ulint)];
3555 	btr_pcur_t	pcur;
3556 	dtuple_t*	tuple;
3557 	dfield_t*	dfield;
3558 	dict_index_t*	sec_index;
3559 	dict_table_t*	sys_foreign;
3560 	const rec_t*	rec;
3561 	const byte*	field;
3562 	ulint		len;
3563 	dberr_t		err;
3564 	mtr_t		mtr;
3565 
3566 	DBUG_ENTER("dict_load_foreigns");
3567 
3568 	ut_ad(mutex_own(&dict_sys->mutex));
3569 
3570 	sys_foreign = dict_table_get_low("SYS_FOREIGN");
3571 
3572 	if (sys_foreign == NULL) {
3573 		/* No foreign keys defined yet in this database */
3574 
3575 		ib::info() << "No foreign key system tables in the database";
3576 		DBUG_RETURN(DB_ERROR);
3577 	}
3578 
3579 	ut_ad(!dict_table_is_comp(sys_foreign));
3580 	mtr_start(&mtr);
3581 
3582 	/* Get the secondary index based on FOR_NAME from table
3583 	SYS_FOREIGN */
3584 
3585 	sec_index = dict_table_get_next_index(
3586 		dict_table_get_first_index(sys_foreign));
3587 	ut_ad(!dict_index_is_clust(sec_index));
3588 start_load:
3589 
3590 	tuple = dtuple_create_from_mem(tuple_buf, sizeof(tuple_buf), 1, 0);
3591 	dfield = dtuple_get_nth_field(tuple, 0);
3592 
3593 	dfield_set_data(dfield, table_name, ut_strlen(table_name));
3594 	dict_index_copy_types(tuple, sec_index, 1);
3595 
3596 	btr_pcur_open_on_user_rec(sec_index, tuple, PAGE_CUR_GE,
3597 				  BTR_SEARCH_LEAF, &pcur, &mtr);
3598 loop:
3599 	rec = btr_pcur_get_rec(&pcur);
3600 
3601 	if (!btr_pcur_is_on_user_rec(&pcur)) {
3602 		/* End of index */
3603 
3604 		goto load_next_index;
3605 	}
3606 
3607 	/* Now we have the record in the secondary index containing a table
3608 	name and a foreign constraint ID */
3609 
3610 	field = rec_get_nth_field_old(
3611 		rec, DICT_FLD__SYS_FOREIGN_FOR_NAME__NAME, &len);
3612 
3613 	/* Check if the table name in the record is the one searched for; the
3614 	following call does the comparison in the latin1_swedish_ci
3615 	charset-collation, in a case-insensitive way. */
3616 
3617 	if (0 != cmp_data_data(dfield_get_type(dfield)->mtype,
3618 			       dfield_get_type(dfield)->prtype,
3619 			       static_cast<const byte*>(
3620 				       dfield_get_data(dfield)),
3621 			       dfield_get_len(dfield),
3622 			       field, len)) {
3623 
3624 		goto load_next_index;
3625 	}
3626 
3627 	/* Since table names in SYS_FOREIGN are stored in a case-insensitive
3628 	order, we have to check that the table name matches also in a binary
3629 	string comparison. On Unix, MySQL allows table names that only differ
3630 	in character case.  If lower_case_table_names=2 then what is stored
3631 	may not be the same case, but the previous comparison showed that they
3632 	match with no-case.  */
3633 
3634 	if (rec_get_deleted_flag(rec, 0)) {
3635 		goto next_rec;
3636 	}
3637 
3638 	if ((innobase_get_lower_case_table_names() != 2)
3639 	    && (0 != ut_memcmp(field, table_name, len))) {
3640 		goto next_rec;
3641 	}
3642 
3643 	/* Now we get a foreign key constraint id */
3644 	field = rec_get_nth_field_old(
3645 		rec, DICT_FLD__SYS_FOREIGN_FOR_NAME__ID, &len);
3646 
3647 	/* Copy the string because the page may be modified or evicted
3648 	after mtr_commit() below. */
3649 	char	fk_id[MAX_TABLE_NAME_LEN + 1];
3650 
3651 	ut_a(len <= MAX_TABLE_NAME_LEN);
3652 	memcpy(fk_id, field, len);
3653 	fk_id[len] = '\0';
3654 
3655 	btr_pcur_store_position(&pcur, &mtr);
3656 
3657 	mtr_commit(&mtr);
3658 
3659 	/* Load the foreign constraint definition to the dictionary cache */
3660 
3661 	err = dict_load_foreign(fk_id, col_names,
3662 				check_recursive, check_charsets, ignore_err,
3663 				fk_tables);
3664 
3665 	if (err != DB_SUCCESS) {
3666 		btr_pcur_close(&pcur);
3667 
3668 		DBUG_RETURN(err);
3669 	}
3670 
3671 	mtr_start(&mtr);
3672 
3673 	btr_pcur_restore_position(BTR_SEARCH_LEAF, &pcur, &mtr);
3674 next_rec:
3675 	btr_pcur_move_to_next_user_rec(&pcur, &mtr);
3676 
3677 	goto loop;
3678 
3679 load_next_index:
3680 	btr_pcur_close(&pcur);
3681 	mtr_commit(&mtr);
3682 
3683 	sec_index = dict_table_get_next_index(sec_index);
3684 
3685 	if (sec_index != NULL) {
3686 
3687 		mtr_start(&mtr);
3688 
3689 		/* Switch to scan index on REF_NAME, fk_max_recusive_level
3690 		already been updated when scanning FOR_NAME index, no need to
3691 		update again */
3692 		check_recursive = FALSE;
3693 
3694 		goto start_load;
3695 	}
3696 
3697 	DBUG_RETURN(DB_SUCCESS);
3698 }
3699 
3700 /***********************************************************************//**
3701 Loads a table id based on the index id.
3702 @return	true if found */
3703 static
3704 bool
3705 dict_load_table_id_on_index_id(
3706 /*===========================*/
3707 	index_id_t		index_id,  /*!< in: index id */
3708 	table_id_t*		table_id) /*!< out: table id */
3709 {
3710 	/* check hard coded indexes */
3711 	switch(index_id) {
3712 	case DICT_TABLES_ID:
3713 	case DICT_COLUMNS_ID:
3714 	case DICT_INDEXES_ID:
3715 	case DICT_FIELDS_ID:
3716 		*table_id = index_id;
3717 		return true;
3718 	case DICT_TABLE_IDS_ID:
3719 		/* The following is a secondary index on SYS_TABLES */
3720 		*table_id = DICT_TABLES_ID;
3721 		return true;
3722 	}
3723 
3724 	bool		found = false;
3725 	mtr_t		mtr;
3726 
3727 	ut_ad(mutex_own(&(dict_sys->mutex)));
3728 
3729 	/* NOTE that the operation of this function is protected by
3730 	the dictionary mutex, and therefore no deadlocks can occur
3731 	with other dictionary operations. */
3732 
3733 	mtr_start(&mtr);
3734 
3735 	btr_pcur_t pcur;
3736 	const rec_t* rec = dict_startscan_system(&pcur, &mtr, SYS_INDEXES);
3737 
3738 	while (rec) {
3739 		ulint len;
3740 		const byte* field = rec_get_nth_field_old(
3741 			rec, DICT_FLD__SYS_INDEXES__ID, &len);
3742 		ut_ad(len == 8);
3743 
3744 		/* Check if the index id is the one searched for */
3745 		if (index_id == mach_read_from_8(field)) {
3746 			found = true;
3747 			/* Now we get the table id */
3748 			const byte* field = rec_get_nth_field_old(
3749 				rec,
3750 				DICT_FLD__SYS_INDEXES__TABLE_ID,
3751 				&len);
3752 			*table_id = mach_read_from_8(field);
3753 			break;
3754 		}
3755 		mtr_commit(&mtr);
3756 		mtr_start(&mtr);
3757 		rec = dict_getnext_system(&pcur, &mtr);
3758 	}
3759 
3760 	btr_pcur_close(&pcur);
3761 	mtr_commit(&mtr);
3762 
3763 	return(found);
3764 }
3765 
3766 dict_table_t* dict_table_open_on_index_id(index_id_t index_id)
3767 {
3768 	table_id_t table_id;
3769 	dict_table_t * table = NULL;
3770 	if (dict_load_table_id_on_index_id(index_id, &table_id)) {
3771 		table = dict_table_open_on_id(table_id, true,
3772 					      DICT_TABLE_OP_LOAD_TABLESPACE);
3773 	}
3774 
3775 	return table;
3776 }
3777