1 /*****************************************************************************
2 
3 Copyright (c) 1996, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2016, 2021, MariaDB Corporation.
5 
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9 
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13 
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17 
18 *****************************************************************************/
19 
20 /**************************************************//**
21 @file dict/dict0load.cc
22 Loads to the memory cache database object definitions
23 from dictionary tables
24 
25 Created 4/24/1996 Heikki Tuuri
26 *******************************************************/
27 
28 #include "dict0load.h"
29 
30 #include "mysql_version.h"
31 #include "btr0pcur.h"
32 #include "btr0btr.h"
33 #include "dict0boot.h"
34 #include "dict0crea.h"
35 #include "dict0dict.h"
36 #include "dict0mem.h"
37 #include "dict0priv.h"
38 #include "dict0stats.h"
39 #include "fsp0file.h"
40 #include "fts0priv.h"
41 #include "mach0data.h"
42 #include "page0page.h"
43 #include "rem0cmp.h"
44 #include "srv0start.h"
45 #include "srv0srv.h"
46 #include "fts0opt.h"
47 
/** Following are the InnoDB system tables. The positions in
this array are referenced by enum dict_system_id_t. */
50 static const char* SYSTEM_TABLE_NAME[] = {
51 	"SYS_TABLES",
52 	"SYS_INDEXES",
53 	"SYS_COLUMNS",
54 	"SYS_FIELDS",
55 	"SYS_FOREIGN",
56 	"SYS_FOREIGN_COLS",
57 	"SYS_TABLESPACES",
58 	"SYS_DATAFILES",
59 	"SYS_VIRTUAL"
60 };
61 
62 /** Loads a table definition and also all its index definitions.
63 
64 Loads those foreign key constraints whose referenced table is already in
65 dictionary cache.  If a foreign key constraint is not loaded, then the
66 referenced table is pushed into the output stack (fk_tables), if it is not
67 NULL.  These tables must be subsequently loaded so that all the foreign
68 key constraints are loaded into memory.
69 
70 @param[in]	name		Table name in the db/tablename format
71 @param[in]	ignore_err	Error to be ignored when loading table
72 				and its index definition
73 @param[out]	fk_tables	Related table names that must also be
74 				loaded to ensure that all foreign key
75 				constraints are loaded.
76 @return table, NULL if does not exist; if the table is stored in an
77 .ibd file, but the file does not exist, then we set the
78 file_unreadable flag in the table object we return */
79 static
80 dict_table_t*
81 dict_load_table_one(
82 	const table_name_t&	name,
83 	dict_err_ignore_t	ignore_err,
84 	dict_names_t&		fk_tables);
85 
86 /** Load a table definition from a SYS_TABLES record to dict_table_t.
87 Do not load any columns or indexes.
88 @param[in]	name		Table name
89 @param[in]	rec		SYS_TABLES record
90 @param[out,own]	table		table, or NULL
91 @return	error message
92 @retval	NULL on success */
93 static const char* dict_load_table_low(const table_name_t& name,
94 				       const rec_t* rec, dict_table_t** table)
95 	MY_ATTRIBUTE((nonnull, warn_unused_result));
96 
97 /** Load an index definition from a SYS_INDEXES record to dict_index_t.
If allocate=TRUE, we will create a dict_index_t structure and fill it
accordingly. If allocate=FALSE, the dict_index_t will be supplied by
the caller and filled with information read from the record.
101 @return	error message
102 @retval	NULL on success */
103 static
104 const char*
105 dict_load_index_low(
106 	byte*		table_id,	/*!< in/out: table id (8 bytes),
107 					an "in" value if allocate=TRUE
108 					and "out" when allocate=FALSE */
109 	mem_heap_t*	heap,		/*!< in/out: temporary memory heap */
110 	const rec_t*	rec,		/*!< in: SYS_INDEXES record */
111 	ibool		allocate,	/*!< in: TRUE=allocate *index,
112 					FALSE=fill in a pre-allocated
113 					*index */
114 	dict_index_t**	index);		/*!< out,own: index, or NULL */
115 
116 /** Load a table column definition from a SYS_COLUMNS record to dict_table_t.
117 @return	error message
118 @retval	NULL on success */
119 static
120 const char*
121 dict_load_column_low(
122 	dict_table_t*	table,		/*!< in/out: table, could be NULL
123 					if we just populate a dict_column_t
124 					struct with information from
125 					a SYS_COLUMNS record */
126 	mem_heap_t*	heap,		/*!< in/out: memory heap
127 					for temporary storage */
128 	dict_col_t*	column,		/*!< out: dict_column_t to fill,
129 					or NULL if table != NULL */
130 	table_id_t*	table_id,	/*!< out: table id */
131 	const char**	col_name,	/*!< out: column name */
132 	const rec_t*	rec,		/*!< in: SYS_COLUMNS record */
133 	ulint*		nth_v_col);	/*!< out: if not NULL, this
134 					records the "n" of "nth" virtual
135 					column */
136 
137 /** Load a virtual column "mapping" (to base columns) information
138 from a SYS_VIRTUAL record
139 @param[in,out]	table		table
140 @param[in,out]	column		mapped base column's dict_column_t
141 @param[in,out]	table_id	table id
142 @param[in,out]	pos		virtual column position
143 @param[in,out]	base_pos	base column position
144 @param[in]	rec		SYS_VIRTUAL record
145 @return	error message
146 @retval	NULL on success */
147 static
148 const char*
149 dict_load_virtual_low(
150 	dict_table_t*	table,
151 	dict_col_t**	column,
152 	table_id_t*	table_id,
153 	ulint*		pos,
154 	ulint*		base_pos,
155 	const rec_t*	rec);
156 
157 /** Load an index field definition from a SYS_FIELDS record to dict_index_t.
158 @return	error message
159 @retval	NULL on success */
160 static
161 const char*
162 dict_load_field_low(
163 	byte*		index_id,	/*!< in/out: index id (8 bytes)
164 					an "in" value if index != NULL
165 					and "out" if index == NULL */
166 	dict_index_t*	index,		/*!< in/out: index, could be NULL
167 					if we just populate a dict_field_t
168 					struct with information from
169 					a SYS_FIELDS record */
170 	dict_field_t*	sys_field,	/*!< out: dict_field_t to be
171 					filled */
172 	ulint*		pos,		/*!< out: Field position */
173 	byte*		last_index_id,	/*!< in: last index id */
174 	mem_heap_t*	heap,		/*!< in/out: memory heap
175 					for temporary storage */
176 	const rec_t*	rec);		/*!< in: SYS_FIELDS record */
177 
178 /* If this flag is TRUE, then we will load the cluster index's (and tables')
179 metadata even if it is marked as "corrupted". */
180 my_bool     srv_load_corrupted;
181 
182 #ifdef UNIV_DEBUG
183 /****************************************************************//**
184 Compare the name of an index column.
185 @return TRUE if the i'th column of index is 'name'. */
186 static
187 ibool
name_of_col_is(const dict_table_t * table,const dict_index_t * index,ulint i,const char * name)188 name_of_col_is(
189 /*===========*/
190 	const dict_table_t*	table,	/*!< in: table */
191 	const dict_index_t*	index,	/*!< in: index */
192 	ulint			i,	/*!< in: index field offset */
193 	const char*		name)	/*!< in: name to compare to */
194 {
195 	ulint	tmp = dict_col_get_no(dict_field_get_col(
196 					      dict_index_get_nth_field(
197 						      index, i)));
198 
199 	return(strcmp(name, dict_table_get_col_name(table, tmp)) == 0);
200 }
201 #endif /* UNIV_DEBUG */
202 
203 /********************************************************************//**
204 Finds the first table name in the given database.
205 @return own: table name, NULL if does not exist; the caller must free
206 the memory in the string! */
207 char*
dict_get_first_table_name_in_db(const char * name)208 dict_get_first_table_name_in_db(
209 /*============================*/
210 	const char*	name)	/*!< in: database name which ends in '/' */
211 {
212 	dict_table_t*	sys_tables;
213 	btr_pcur_t	pcur;
214 	dict_index_t*	sys_index;
215 	dtuple_t*	tuple;
216 	mem_heap_t*	heap;
217 	dfield_t*	dfield;
218 	const rec_t*	rec;
219 	const byte*	field;
220 	ulint		len;
221 	mtr_t		mtr;
222 
223 	ut_ad(mutex_own(&dict_sys.mutex));
224 
225 	heap = mem_heap_create(1000);
226 
227 	mtr_start(&mtr);
228 
229 	sys_tables = dict_table_get_low("SYS_TABLES");
230 	sys_index = UT_LIST_GET_FIRST(sys_tables->indexes);
231 	ut_ad(!dict_table_is_comp(sys_tables));
232 
233 	tuple = dtuple_create(heap, 1);
234 	dfield = dtuple_get_nth_field(tuple, 0);
235 
236 	dfield_set_data(dfield, name, strlen(name));
237 	dict_index_copy_types(tuple, sys_index, 1);
238 
239 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
240 				  BTR_SEARCH_LEAF, &pcur, &mtr);
241 loop:
242 	rec = btr_pcur_get_rec(&pcur);
243 
244 	if (!btr_pcur_is_on_user_rec(&pcur)) {
245 		/* Not found */
246 
247 		btr_pcur_close(&pcur);
248 		mtr_commit(&mtr);
249 		mem_heap_free(heap);
250 
251 		return(NULL);
252 	}
253 
254 	field = rec_get_nth_field_old(
255 		rec, DICT_FLD__SYS_TABLES__NAME, &len);
256 
257 	if (len < strlen(name)
258 	    || memcmp(name, field, strlen(name))) {
259 		/* Not found */
260 
261 		btr_pcur_close(&pcur);
262 		mtr_commit(&mtr);
263 		mem_heap_free(heap);
264 
265 		return(NULL);
266 	}
267 
268 	if (!rec_get_deleted_flag(rec, 0)) {
269 
270 		/* We found one */
271 
272 		char*	table_name = mem_strdupl((char*) field, len);
273 
274 		btr_pcur_close(&pcur);
275 		mtr_commit(&mtr);
276 		mem_heap_free(heap);
277 
278 		return(table_name);
279 	}
280 
281 	btr_pcur_move_to_next_user_rec(&pcur, &mtr);
282 
283 	goto loop;
284 }
285 
286 /********************************************************************//**
287 This function gets the next system table record as it scans the table.
288 @return the next record if found, NULL if end of scan */
289 static
290 const rec_t*
dict_getnext_system_low(btr_pcur_t * pcur,mtr_t * mtr)291 dict_getnext_system_low(
292 /*====================*/
293 	btr_pcur_t*	pcur,		/*!< in/out: persistent cursor to the
294 					record*/
295 	mtr_t*		mtr)		/*!< in: the mini-transaction */
296 {
297 	rec_t*	rec = NULL;
298 
299 	while (!rec || rec_get_deleted_flag(rec, 0)) {
300 		btr_pcur_move_to_next_user_rec(pcur, mtr);
301 
302 		rec = btr_pcur_get_rec(pcur);
303 
304 		if (!btr_pcur_is_on_user_rec(pcur)) {
305 			/* end of index */
306 			btr_pcur_close(pcur);
307 
308 			return(NULL);
309 		}
310 	}
311 
312 	/* Get a record, let's save the position */
313 	btr_pcur_store_position(pcur, mtr);
314 
315 	return(rec);
316 }
317 
318 /********************************************************************//**
319 This function opens a system table, and returns the first record.
320 @return first record of the system table */
321 const rec_t*
dict_startscan_system(btr_pcur_t * pcur,mtr_t * mtr,dict_system_id_t system_id)322 dict_startscan_system(
323 /*==================*/
324 	btr_pcur_t*	pcur,		/*!< out: persistent cursor to
325 					the record */
326 	mtr_t*		mtr,		/*!< in: the mini-transaction */
327 	dict_system_id_t system_id)	/*!< in: which system table to open */
328 {
329 	dict_table_t*	system_table;
330 	dict_index_t*	clust_index;
331 	const rec_t*	rec;
332 
333 	ut_a(system_id < SYS_NUM_SYSTEM_TABLES);
334 
335 	system_table = dict_table_get_low(SYSTEM_TABLE_NAME[system_id]);
336 
337 	clust_index = UT_LIST_GET_FIRST(system_table->indexes);
338 
339 	btr_pcur_open_at_index_side(true, clust_index, BTR_SEARCH_LEAF, pcur,
340 				    true, 0, mtr);
341 
342 	rec = dict_getnext_system_low(pcur, mtr);
343 
344 	return(rec);
345 }
346 
347 /********************************************************************//**
348 This function gets the next system table record as it scans the table.
349 @return the next record if found, NULL if end of scan */
350 const rec_t*
dict_getnext_system(btr_pcur_t * pcur,mtr_t * mtr)351 dict_getnext_system(
352 /*================*/
353 	btr_pcur_t*	pcur,		/*!< in/out: persistent cursor
354 					to the record */
355 	mtr_t*		mtr)		/*!< in: the mini-transaction */
356 {
357 	const rec_t*	rec;
358 
359 	/* Restore the position */
360 	btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
361 
362 	/* Get the next record */
363 	rec = dict_getnext_system_low(pcur, mtr);
364 
365 	return(rec);
366 }
367 
368 /********************************************************************//**
369 This function processes one SYS_TABLES record and populate the dict_table_t
370 struct for the table.
371 @return error message, or NULL on success */
372 const char*
dict_process_sys_tables_rec_and_mtr_commit(mem_heap_t * heap,const rec_t * rec,dict_table_t ** table,bool cached,mtr_t * mtr)373 dict_process_sys_tables_rec_and_mtr_commit(
374 /*=======================================*/
375 	mem_heap_t*	heap,		/*!< in/out: temporary memory heap */
376 	const rec_t*	rec,		/*!< in: SYS_TABLES record */
377 	dict_table_t**	table,		/*!< out: dict_table_t to fill */
378 	bool		cached,		/*!< in: whether to load from cache */
379 	mtr_t*		mtr)		/*!< in/out: mini-transaction,
380 					will be committed */
381 {
382 	ulint		len;
383 	const char*	field;
384 
385 	field = (const char*) rec_get_nth_field_old(
386 		rec, DICT_FLD__SYS_TABLES__NAME, &len);
387 
388 	ut_a(!rec_get_deleted_flag(rec, 0));
389 
390 	ut_ad(mtr->memo_contains_page_flagged(rec, MTR_MEMO_PAGE_S_FIX));
391 
392 	/* Get the table name */
393 	table_name_t table_name(mem_heap_strdupl(heap, field, len));
394 
395 	if (cached) {
396 		/* Commit before load the table again */
397 		mtr_commit(mtr);
398 
399 		*table = dict_table_get_low(table_name.m_name);
400 		return *table ? NULL : "Table not found in cache";
401 	} else {
402 		const char* err = dict_load_table_low(table_name, rec, table);
403 		mtr_commit(mtr);
404 		return err;
405 	}
406 }
407 
408 /********************************************************************//**
409 This function parses a SYS_INDEXES record and populate a dict_index_t
410 structure with the information from the record. For detail information
411 about SYS_INDEXES fields, please refer to dict_boot() function.
412 @return error message, or NULL on success */
413 const char*
dict_process_sys_indexes_rec(mem_heap_t * heap,const rec_t * rec,dict_index_t * index,table_id_t * table_id)414 dict_process_sys_indexes_rec(
415 /*=========================*/
416 	mem_heap_t*	heap,		/*!< in/out: heap memory */
417 	const rec_t*	rec,		/*!< in: current SYS_INDEXES rec */
418 	dict_index_t*	index,		/*!< out: index to be filled */
419 	table_id_t*	table_id)	/*!< out: index table id */
420 {
421 	const char*	err_msg;
422 	byte*		buf;
423 
424 	ut_d(index->is_dummy = true);
425 	ut_d(index->in_instant_init = false);
426 	buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
427 
428 	/* Parse the record, and get "dict_index_t" struct filled */
429 	err_msg = dict_load_index_low(buf, heap, rec, FALSE, &index);
430 
431 	*table_id = mach_read_from_8(buf);
432 
433 	return(err_msg);
434 }
435 
436 /********************************************************************//**
437 This function parses a SYS_COLUMNS record and populate a dict_column_t
438 structure with the information from the record.
439 @return error message, or NULL on success */
440 const char*
dict_process_sys_columns_rec(mem_heap_t * heap,const rec_t * rec,dict_col_t * column,table_id_t * table_id,const char ** col_name,ulint * nth_v_col)441 dict_process_sys_columns_rec(
442 /*=========================*/
443 	mem_heap_t*	heap,		/*!< in/out: heap memory */
444 	const rec_t*	rec,		/*!< in: current SYS_COLUMNS rec */
445 	dict_col_t*	column,		/*!< out: dict_col_t to be filled */
446 	table_id_t*	table_id,	/*!< out: table id */
447 	const char**	col_name,	/*!< out: column name */
448 	ulint*		nth_v_col)	/*!< out: if virtual col, this is
449 					record's sequence number */
450 {
451 	const char*	err_msg;
452 
453 	/* Parse the record, and get "dict_col_t" struct filled */
454 	err_msg = dict_load_column_low(NULL, heap, column,
455 				       table_id, col_name, rec, nth_v_col);
456 
457 	return(err_msg);
458 }
459 
460 /** This function parses a SYS_VIRTUAL record and extracts virtual column
461 information
462 @param[in]	rec		current SYS_COLUMNS rec
463 @param[in,out]	table_id	table id
464 @param[in,out]	pos		virtual column position
465 @param[in,out]	base_pos	base column position
466 @return error message, or NULL on success */
467 const char*
dict_process_sys_virtual_rec(const rec_t * rec,table_id_t * table_id,ulint * pos,ulint * base_pos)468 dict_process_sys_virtual_rec(
469 	const rec_t*	rec,
470 	table_id_t*	table_id,
471 	ulint*		pos,
472 	ulint*		base_pos)
473 {
474 	const char*	err_msg;
475 
476 	/* Parse the record, and get "dict_col_t" struct filled */
477 	err_msg = dict_load_virtual_low(NULL, NULL, table_id,
478 					pos, base_pos, rec);
479 
480 	return(err_msg);
481 }
482 
483 /********************************************************************//**
484 This function parses a SYS_FIELDS record and populates a dict_field_t
485 structure with the information from the record.
486 @return error message, or NULL on success */
487 const char*
dict_process_sys_fields_rec(mem_heap_t * heap,const rec_t * rec,dict_field_t * sys_field,ulint * pos,index_id_t * index_id,index_id_t last_id)488 dict_process_sys_fields_rec(
489 /*========================*/
490 	mem_heap_t*	heap,		/*!< in/out: heap memory */
491 	const rec_t*	rec,		/*!< in: current SYS_FIELDS rec */
492 	dict_field_t*	sys_field,	/*!< out: dict_field_t to be
493 					filled */
494 	ulint*		pos,		/*!< out: Field position */
495 	index_id_t*	index_id,	/*!< out: current index id */
496 	index_id_t	last_id)	/*!< in: previous index id */
497 {
498 	byte*		buf;
499 	byte*		last_index_id;
500 	const char*	err_msg;
501 
502 	buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
503 
504 	last_index_id = static_cast<byte*>(mem_heap_alloc(heap, 8));
505 	mach_write_to_8(last_index_id, last_id);
506 
507 	err_msg = dict_load_field_low(buf, NULL, sys_field,
508 				      pos, last_index_id, heap, rec);
509 
510 	*index_id = mach_read_from_8(buf);
511 
512 	return(err_msg);
513 
514 }
515 
516 /********************************************************************//**
517 This function parses a SYS_FOREIGN record and populate a dict_foreign_t
518 structure with the information from the record. For detail information
519 about SYS_FOREIGN fields, please refer to dict_load_foreign() function.
520 @return error message, or NULL on success */
521 const char*
dict_process_sys_foreign_rec(mem_heap_t * heap,const rec_t * rec,dict_foreign_t * foreign)522 dict_process_sys_foreign_rec(
523 /*=========================*/
524 	mem_heap_t*	heap,		/*!< in/out: heap memory */
525 	const rec_t*	rec,		/*!< in: current SYS_FOREIGN rec */
526 	dict_foreign_t*	foreign)	/*!< out: dict_foreign_t struct
527 					to be filled */
528 {
529 	ulint		len;
530 	const byte*	field;
531 
532 	if (rec_get_deleted_flag(rec, 0)) {
533 		return("delete-marked record in SYS_FOREIGN");
534 	}
535 
536 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_FOREIGN) {
537 		return("wrong number of columns in SYS_FOREIGN record");
538 	}
539 
540 	field = rec_get_nth_field_old(
541 		rec, DICT_FLD__SYS_FOREIGN__ID, &len);
542 	if (len == 0 || len == UNIV_SQL_NULL) {
543 err_len:
544 		return("incorrect column length in SYS_FOREIGN");
545 	}
546 
547 	/* This receives a dict_foreign_t* that points to a stack variable.
548 	So dict_foreign_free(foreign) is not used as elsewhere.
549 	Since the heap used here is freed elsewhere, foreign->heap
550 	is not assigned. */
551 	foreign->id = mem_heap_strdupl(heap, (const char*) field, len);
552 
553 	rec_get_nth_field_offs_old(
554 		rec, DICT_FLD__SYS_FOREIGN__DB_TRX_ID, &len);
555 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
556 		goto err_len;
557 	}
558 	rec_get_nth_field_offs_old(
559 		rec, DICT_FLD__SYS_FOREIGN__DB_ROLL_PTR, &len);
560 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
561 		goto err_len;
562 	}
563 
564 	/* The _lookup versions of the referenced and foreign table names
565 	 are not assigned since they are not used in this dict_foreign_t */
566 
567 	field = rec_get_nth_field_old(
568 		rec, DICT_FLD__SYS_FOREIGN__FOR_NAME, &len);
569 	if (len == 0 || len == UNIV_SQL_NULL) {
570 		goto err_len;
571 	}
572 	foreign->foreign_table_name = mem_heap_strdupl(
573 		heap, (const char*) field, len);
574 
575 	field = rec_get_nth_field_old(
576 		rec, DICT_FLD__SYS_FOREIGN__REF_NAME, &len);
577 	if (len == 0 || len == UNIV_SQL_NULL) {
578 		goto err_len;
579 	}
580 	foreign->referenced_table_name = mem_heap_strdupl(
581 		heap, (const char*) field, len);
582 
583 	field = rec_get_nth_field_old(
584 		rec, DICT_FLD__SYS_FOREIGN__N_COLS, &len);
585 	if (len != 4) {
586 		goto err_len;
587 	}
588 	uint32_t n_fields_and_type = mach_read_from_4(field);
589 
590 	foreign->type = n_fields_and_type >> 24 & ((1U << 6) - 1);
591 	foreign->n_fields = n_fields_and_type & dict_index_t::MAX_N_FIELDS;
592 
593 	return(NULL);
594 }
595 
596 /********************************************************************//**
597 This function parses a SYS_FOREIGN_COLS record and extract necessary
598 information from the record and return to caller.
599 @return error message, or NULL on success */
600 const char*
dict_process_sys_foreign_col_rec(mem_heap_t * heap,const rec_t * rec,const char ** name,const char ** for_col_name,const char ** ref_col_name,ulint * pos)601 dict_process_sys_foreign_col_rec(
602 /*=============================*/
603 	mem_heap_t*	heap,		/*!< in/out: heap memory */
604 	const rec_t*	rec,		/*!< in: current SYS_FOREIGN_COLS rec */
605 	const char**	name,		/*!< out: foreign key constraint name */
606 	const char**	for_col_name,	/*!< out: referencing column name */
607 	const char**	ref_col_name,	/*!< out: referenced column name
608 					in referenced table */
609 	ulint*		pos)		/*!< out: column position */
610 {
611 	ulint		len;
612 	const byte*	field;
613 
614 	if (rec_get_deleted_flag(rec, 0)) {
615 		return("delete-marked record in SYS_FOREIGN_COLS");
616 	}
617 
618 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_FOREIGN_COLS) {
619 		return("wrong number of columns in SYS_FOREIGN_COLS record");
620 	}
621 
622 	field = rec_get_nth_field_old(
623 		rec, DICT_FLD__SYS_FOREIGN_COLS__ID, &len);
624 	if (len == 0 || len == UNIV_SQL_NULL) {
625 err_len:
626 		return("incorrect column length in SYS_FOREIGN_COLS");
627 	}
628 	*name = mem_heap_strdupl(heap, (char*) field, len);
629 
630 	field = rec_get_nth_field_old(
631 		rec, DICT_FLD__SYS_FOREIGN_COLS__POS, &len);
632 	if (len != 4) {
633 		goto err_len;
634 	}
635 	*pos = mach_read_from_4(field);
636 
637 	rec_get_nth_field_offs_old(
638 		rec, DICT_FLD__SYS_FOREIGN_COLS__DB_TRX_ID, &len);
639 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
640 		goto err_len;
641 	}
642 	rec_get_nth_field_offs_old(
643 		rec, DICT_FLD__SYS_FOREIGN_COLS__DB_ROLL_PTR, &len);
644 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
645 		goto err_len;
646 	}
647 
648 	field = rec_get_nth_field_old(
649 		rec, DICT_FLD__SYS_FOREIGN_COLS__FOR_COL_NAME, &len);
650 	if (len == 0 || len == UNIV_SQL_NULL) {
651 		goto err_len;
652 	}
653 	*for_col_name = mem_heap_strdupl(heap, (char*) field, len);
654 
655 	field = rec_get_nth_field_old(
656 		rec, DICT_FLD__SYS_FOREIGN_COLS__REF_COL_NAME, &len);
657 	if (len == 0 || len == UNIV_SQL_NULL) {
658 		goto err_len;
659 	}
660 	*ref_col_name = mem_heap_strdupl(heap, (char*) field, len);
661 
662 	return(NULL);
663 }
664 
665 /********************************************************************//**
666 This function parses a SYS_TABLESPACES record, extracts necessary
667 information from the record and returns to caller.
668 @return error message, or NULL on success */
669 const char*
dict_process_sys_tablespaces(mem_heap_t * heap,const rec_t * rec,uint32_t * space,const char ** name,ulint * flags)670 dict_process_sys_tablespaces(
671 /*=========================*/
672 	mem_heap_t*	heap,		/*!< in/out: heap memory */
673 	const rec_t*	rec,		/*!< in: current SYS_TABLESPACES rec */
674 	uint32_t*	space,		/*!< out: tablespace identifier */
675 	const char**	name,		/*!< out: tablespace name */
676 	ulint*		flags)		/*!< out: tablespace flags */
677 {
678 	ulint		len;
679 	const byte*	field;
680 
681 	if (rec_get_deleted_flag(rec, 0)) {
682 		return("delete-marked record in SYS_TABLESPACES");
683 	}
684 
685 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_TABLESPACES) {
686 		return("wrong number of columns in SYS_TABLESPACES record");
687 	}
688 
689 	field = rec_get_nth_field_old(
690 		rec, DICT_FLD__SYS_TABLESPACES__SPACE, &len);
691 	if (len != DICT_FLD_LEN_SPACE) {
692 err_len:
693 		return("incorrect column length in SYS_TABLESPACES");
694 	}
695 	*space = mach_read_from_4(field);
696 
697 	rec_get_nth_field_offs_old(
698 		rec, DICT_FLD__SYS_TABLESPACES__DB_TRX_ID, &len);
699 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
700 		goto err_len;
701 	}
702 
703 	rec_get_nth_field_offs_old(
704 		rec, DICT_FLD__SYS_TABLESPACES__DB_ROLL_PTR, &len);
705 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
706 		goto err_len;
707 	}
708 
709 	field = rec_get_nth_field_old(
710 		rec, DICT_FLD__SYS_TABLESPACES__NAME, &len);
711 	if (len == 0 || len == UNIV_SQL_NULL) {
712 		goto err_len;
713 	}
714 	*name = mem_heap_strdupl(heap, (char*) field, len);
715 
716 	field = rec_get_nth_field_old(
717 		rec, DICT_FLD__SYS_TABLESPACES__FLAGS, &len);
718 	if (len != DICT_FLD_LEN_FLAGS) {
719 		goto err_len;
720 	}
721 	*flags = mach_read_from_4(field);
722 
723 	return(NULL);
724 }
725 
726 /********************************************************************//**
727 This function parses a SYS_DATAFILES record, extracts necessary
728 information from the record and returns it to the caller.
729 @return error message, or NULL on success */
730 const char*
dict_process_sys_datafiles(mem_heap_t * heap,const rec_t * rec,uint32_t * space,const char ** path)731 dict_process_sys_datafiles(
732 /*=======================*/
733 	mem_heap_t*	heap,		/*!< in/out: heap memory */
734 	const rec_t*	rec,		/*!< in: current SYS_DATAFILES rec */
735 	uint32_t*	space,		/*!< out: space id */
736 	const char**	path)		/*!< out: datafile paths */
737 {
738 	ulint		len;
739 	const byte*	field;
740 
741 	if (rec_get_deleted_flag(rec, 0)) {
742 		return("delete-marked record in SYS_DATAFILES");
743 	}
744 
745 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_DATAFILES) {
746 		return("wrong number of columns in SYS_DATAFILES record");
747 	}
748 
749 	field = rec_get_nth_field_old(
750 		rec, DICT_FLD__SYS_DATAFILES__SPACE, &len);
751 	if (len != DICT_FLD_LEN_SPACE) {
752 err_len:
753 		return("incorrect column length in SYS_DATAFILES");
754 	}
755 	*space = mach_read_from_4(field);
756 
757 	rec_get_nth_field_offs_old(
758 		rec, DICT_FLD__SYS_DATAFILES__DB_TRX_ID, &len);
759 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
760 		goto err_len;
761 	}
762 
763 	rec_get_nth_field_offs_old(
764 		rec, DICT_FLD__SYS_DATAFILES__DB_ROLL_PTR, &len);
765 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
766 		goto err_len;
767 	}
768 
769 	field = rec_get_nth_field_old(
770 		rec, DICT_FLD__SYS_DATAFILES__PATH, &len);
771 	if (len == 0 || len == UNIV_SQL_NULL) {
772 		goto err_len;
773 	}
774 	*path = mem_heap_strdupl(heap, (char*) field, len);
775 
776 	return(NULL);
777 }
778 
779 /** Get the first filepath from SYS_DATAFILES for a given space_id.
780 @param[in]	space_id	Tablespace ID
781 @return First filepath (caller must invoke ut_free() on it)
782 @retval NULL if no SYS_DATAFILES entry was found. */
783 static char*
dict_get_first_path(ulint space_id)784 dict_get_first_path(
785 	ulint	space_id)
786 {
787 	mtr_t		mtr;
788 	dict_table_t*	sys_datafiles;
789 	dict_index_t*	sys_index;
790 	dtuple_t*	tuple;
791 	dfield_t*	dfield;
792 	byte*		buf;
793 	btr_pcur_t	pcur;
794 	const rec_t*	rec;
795 	const byte*	field;
796 	ulint		len;
797 	char*		filepath = NULL;
798 	mem_heap_t*	heap = mem_heap_create(1024);
799 
800 	ut_ad(mutex_own(&dict_sys.mutex));
801 
802 	mtr_start(&mtr);
803 
804 	sys_datafiles = dict_table_get_low("SYS_DATAFILES");
805 	sys_index = UT_LIST_GET_FIRST(sys_datafiles->indexes);
806 
807 	ut_ad(!dict_table_is_comp(sys_datafiles));
808 	ut_ad(name_of_col_is(sys_datafiles, sys_index,
809 			     DICT_FLD__SYS_DATAFILES__SPACE, "SPACE"));
810 	ut_ad(name_of_col_is(sys_datafiles, sys_index,
811 			     DICT_FLD__SYS_DATAFILES__PATH, "PATH"));
812 
813 	tuple = dtuple_create(heap, 1);
814 	dfield = dtuple_get_nth_field(tuple, DICT_FLD__SYS_DATAFILES__SPACE);
815 
816 	buf = static_cast<byte*>(mem_heap_alloc(heap, 4));
817 	mach_write_to_4(buf, space_id);
818 
819 	dfield_set_data(dfield, buf, 4);
820 	dict_index_copy_types(tuple, sys_index, 1);
821 
822 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
823 				  BTR_SEARCH_LEAF, &pcur, &mtr);
824 
825 	rec = btr_pcur_get_rec(&pcur);
826 
827 	/* Get the filepath from this SYS_DATAFILES record. */
828 	if (btr_pcur_is_on_user_rec(&pcur)) {
829 		field = rec_get_nth_field_old(
830 			rec, DICT_FLD__SYS_DATAFILES__SPACE, &len);
831 		ut_a(len == 4);
832 
833 		if (space_id == mach_read_from_4(field)) {
834 			/* A record for this space ID was found. */
835 			field = rec_get_nth_field_old(
836 				rec, DICT_FLD__SYS_DATAFILES__PATH, &len);
837 
838 			ut_ad(len > 0);
839 			ut_ad(len < OS_FILE_MAX_PATH);
840 
841 			if (len > 0 && len < UNIV_SQL_NULL) {
842 				filepath = mem_strdupl(
843 					reinterpret_cast<const char*>(field),
844 					len);
845 				ut_ad(filepath != NULL);
846 
847 				/* The dictionary may have been written on
848 				another OS. */
849 				os_normalize_path(filepath);
850 			}
851 		}
852 	}
853 
854 	btr_pcur_close(&pcur);
855 	mtr_commit(&mtr);
856 	mem_heap_free(heap);
857 
858 	return(filepath);
859 }
860 
861 /** Update the record for space_id in SYS_TABLESPACES to this filepath.
862 @param[in]	space_id	Tablespace ID
863 @param[in]	filepath	Tablespace filepath
864 @return DB_SUCCESS if OK, dberr_t if the insert failed */
865 dberr_t
dict_update_filepath(ulint space_id,const char * filepath)866 dict_update_filepath(
867 	ulint		space_id,
868 	const char*	filepath)
869 {
870 	if (!srv_sys_tablespaces_open) {
871 		/* Startup procedure is not yet ready for updates. */
872 		return(DB_SUCCESS);
873 	}
874 
875 	dberr_t		err = DB_SUCCESS;
876 	trx_t*		trx;
877 
878 	ut_d(dict_sys.assert_locked());
879 
880 	trx = trx_create();
881 	trx->op_info = "update filepath";
882 	trx->dict_operation_lock_mode = RW_X_LATCH;
883 	trx_start_for_ddl(trx, TRX_DICT_OP_INDEX);
884 
885 	pars_info_t*	info = pars_info_create();
886 
887 	pars_info_add_int4_literal(info, "space", space_id);
888 	pars_info_add_str_literal(info, "path", filepath);
889 
890 	err = que_eval_sql(info,
891 			   "PROCEDURE UPDATE_FILEPATH () IS\n"
892 			   "BEGIN\n"
893 			   "UPDATE SYS_DATAFILES"
894 			   " SET PATH = :path\n"
895 			   " WHERE SPACE = :space;\n"
896 			   "END;\n", FALSE, trx);
897 
898 	trx_commit_for_mysql(trx);
899 	trx->dict_operation_lock_mode = 0;
900 	trx->free();
901 
902 	if (UNIV_LIKELY(err == DB_SUCCESS)) {
903 		/* We just updated SYS_DATAFILES due to the contents in
904 		a link file.  Make a note that we did this. */
905 		ib::info() << "The InnoDB data dictionary table SYS_DATAFILES"
906 			" for tablespace ID " << space_id
907 			<< " was updated to use file " << filepath << ".";
908 	} else {
909 		ib::warn() << "Error occurred while updating InnoDB data"
910 			" dictionary table SYS_DATAFILES for tablespace ID "
911 			<< space_id << " to file " << filepath << ": "
912 			<< err << ".";
913 	}
914 
915 	return(err);
916 }
917 
918 /** Replace records in SYS_TABLESPACES and SYS_DATAFILES associated with
919 the given space_id using an independent transaction.
920 @param[in]	space_id	Tablespace ID
921 @param[in]	name		Tablespace name
922 @param[in]	filepath	First filepath
923 @param[in]	fsp_flags	Tablespace flags
924 @return DB_SUCCESS if OK, dberr_t if the insert failed */
925 dberr_t
dict_replace_tablespace_and_filepath(ulint space_id,const char * name,const char * filepath,ulint fsp_flags)926 dict_replace_tablespace_and_filepath(
927 	ulint		space_id,
928 	const char*	name,
929 	const char*	filepath,
930 	ulint		fsp_flags)
931 {
932 	if (!srv_sys_tablespaces_open) {
933 		/* Startup procedure is not yet ready for updates.
934 		Return success since this will likely get updated
935 		later. */
936 		return(DB_SUCCESS);
937 	}
938 
939 	dberr_t		err = DB_SUCCESS;
940 	trx_t*		trx;
941 
942 	DBUG_EXECUTE_IF("innodb_fail_to_update_tablespace_dict",
943 			return(DB_INTERRUPTED););
944 
945 	ut_d(dict_sys.assert_locked());
946 	ut_ad(filepath);
947 
948 	trx = trx_create();
949 	trx->op_info = "insert tablespace and filepath";
950 	trx->dict_operation_lock_mode = RW_X_LATCH;
951 	trx_start_for_ddl(trx, TRX_DICT_OP_INDEX);
952 
953 	/* A record for this space ID was not found in
954 	SYS_DATAFILES. Assume the record is also missing in
955 	SYS_TABLESPACES.  Insert records into them both. */
956 	err = dict_replace_tablespace_in_dictionary(
957 		space_id, name, fsp_flags, filepath, trx);
958 
959 	trx_commit_for_mysql(trx);
960 	trx->dict_operation_lock_mode = 0;
961 	trx->free();
962 
963 	return(err);
964 }
965 
966 /** Check the validity of a SYS_TABLES record
967 Make sure the fields are the right length and that they
968 do not contain invalid contents.
969 @param[in]	rec	SYS_TABLES record
970 @return error message, or NULL on success */
971 static
972 const char*
dict_sys_tables_rec_check(const rec_t * rec)973 dict_sys_tables_rec_check(
974 	const rec_t*	rec)
975 {
976 	const byte*	field;
977 	ulint		len;
978 
979 	ut_ad(mutex_own(&dict_sys.mutex));
980 
981 	if (rec_get_deleted_flag(rec, 0)) {
982 		return("delete-marked record in SYS_TABLES");
983 	}
984 
985 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_TABLES) {
986 		return("wrong number of columns in SYS_TABLES record");
987 	}
988 
989 	rec_get_nth_field_offs_old(
990 		rec, DICT_FLD__SYS_TABLES__NAME, &len);
991 	if (len == 0 || len == UNIV_SQL_NULL) {
992 err_len:
993 		return("incorrect column length in SYS_TABLES");
994 	}
995 	rec_get_nth_field_offs_old(
996 		rec, DICT_FLD__SYS_TABLES__DB_TRX_ID, &len);
997 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
998 		goto err_len;
999 	}
1000 	rec_get_nth_field_offs_old(
1001 		rec, DICT_FLD__SYS_TABLES__DB_ROLL_PTR, &len);
1002 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
1003 		goto err_len;
1004 	}
1005 
1006 	rec_get_nth_field_offs_old(rec, DICT_FLD__SYS_TABLES__ID, &len);
1007 	if (len != 8) {
1008 		goto err_len;
1009 	}
1010 
1011 	field = rec_get_nth_field_old(
1012 		rec, DICT_FLD__SYS_TABLES__N_COLS, &len);
1013 	if (field == NULL || len != 4) {
1014 		goto err_len;
1015 	}
1016 
1017 	rec_get_nth_field_offs_old(rec, DICT_FLD__SYS_TABLES__TYPE, &len);
1018 	if (len != 4) {
1019 		goto err_len;
1020 	}
1021 
1022 	rec_get_nth_field_offs_old(
1023 		rec, DICT_FLD__SYS_TABLES__MIX_ID, &len);
1024 	if (len != 8) {
1025 		goto err_len;
1026 	}
1027 
1028 	field = rec_get_nth_field_old(
1029 		rec, DICT_FLD__SYS_TABLES__MIX_LEN, &len);
1030 	if (field == NULL || len != 4) {
1031 		goto err_len;
1032 	}
1033 
1034 	rec_get_nth_field_offs_old(
1035 		rec, DICT_FLD__SYS_TABLES__CLUSTER_ID, &len);
1036 	if (len != UNIV_SQL_NULL) {
1037 		goto err_len;
1038 	}
1039 
1040 	field = rec_get_nth_field_old(
1041 		rec, DICT_FLD__SYS_TABLES__SPACE, &len);
1042 	if (field == NULL || len != 4) {
1043 		goto err_len;
1044 	}
1045 
1046 	return(NULL);
1047 }
1048 
1049 /** Read and return the contents of a SYS_TABLESPACES record.
1050 @param[in]	rec	A record of SYS_TABLESPACES
1051 @param[out]	id	Pointer to the space_id for this table
1052 @param[in,out]	name	Buffer for Tablespace Name of length NAME_LEN
1053 @param[out]	flags	Pointer to tablespace flags
1054 @return true if the record was read correctly, false if not. */
1055 bool
dict_sys_tablespaces_rec_read(const rec_t * rec,ulint * id,char * name,ulint * flags)1056 dict_sys_tablespaces_rec_read(
1057 	const rec_t*	rec,
1058 	ulint*		id,
1059 	char*		name,
1060 	ulint*		flags)
1061 {
1062 	const byte*	field;
1063 	ulint		len;
1064 
1065 	field = rec_get_nth_field_old(
1066 		rec, DICT_FLD__SYS_TABLESPACES__SPACE, &len);
1067 	if (len != DICT_FLD_LEN_SPACE) {
1068 		ib::error() << "Wrong field length in SYS_TABLESPACES.SPACE: "
1069 		<< len;
1070 		return(false);
1071 	}
1072 	*id = mach_read_from_4(field);
1073 
1074 	field = rec_get_nth_field_old(
1075 		rec, DICT_FLD__SYS_TABLESPACES__NAME, &len);
1076 	if (len == 0 || len == UNIV_SQL_NULL) {
1077 		ib::error() << "Wrong field length in SYS_TABLESPACES.NAME: "
1078 			<< len;
1079 		return(false);
1080 	}
1081 	strncpy(name, reinterpret_cast<const char*>(field), NAME_LEN);
1082 
1083 	/* read the 4 byte flags from the TYPE field */
1084 	field = rec_get_nth_field_old(
1085 		rec, DICT_FLD__SYS_TABLESPACES__FLAGS, &len);
1086 	if (len != 4) {
1087 		ib::error() << "Wrong field length in SYS_TABLESPACES.FLAGS: "
1088 			<< len;
1089 		return(false);
1090 	}
1091 	*flags = mach_read_from_4(field);
1092 
1093 	return(true);
1094 }
1095 
1096 /** Check if SYS_TABLES.TYPE is valid
1097 @param[in]	type		SYS_TABLES.TYPE
1098 @param[in]	not_redundant	whether ROW_FORMAT=REDUNDANT is not used
1099 @return	whether the SYS_TABLES.TYPE value is valid */
1100 static
1101 bool
dict_sys_tables_type_valid(ulint type,bool not_redundant)1102 dict_sys_tables_type_valid(ulint type, bool not_redundant)
1103 {
1104 	/* The DATA_DIRECTORY flag can be assigned fully independently
1105 	of all other persistent table flags. */
1106 	type &= ~DICT_TF_MASK_DATA_DIR;
1107 
1108 	if (type == 1) {
1109 		return(true); /* ROW_FORMAT=REDUNDANT or ROW_FORMAT=COMPACT */
1110 	}
1111 
1112 	if (!(type & 1)) {
1113 		/* For ROW_FORMAT=REDUNDANT and ROW_FORMAT=COMPACT,
1114 		SYS_TABLES.TYPE=1. Else, it is the same as
1115 		dict_table_t::flags, and the least significant bit
1116 		would be set. So, the bit never can be 0. */
1117 		return(false);
1118 	}
1119 
1120 	if (!not_redundant) {
1121 		/* SYS_TABLES.TYPE must be 1 or 1|DICT_TF_MASK_NO_ROLLBACK
1122 		for ROW_FORMAT=REDUNDANT. */
1123 		return !(type & ~(1U | DICT_TF_MASK_NO_ROLLBACK));
1124 	}
1125 
1126 	if (type >= 1U << DICT_TF_POS_UNUSED) {
1127 		/* Some unknown bits are set. */
1128 		return(false);
1129 	}
1130 
1131 	return(dict_tf_is_valid_not_redundant(type));
1132 }
1133 
1134 /** Convert SYS_TABLES.TYPE to dict_table_t::flags.
1135 @param[in]	type		SYS_TABLES.TYPE
1136 @param[in]	not_redundant	whether ROW_FORMAT=REDUNDANT is not used
1137 @return	table flags */
1138 static
1139 ulint
dict_sys_tables_type_to_tf(ulint type,bool not_redundant)1140 dict_sys_tables_type_to_tf(ulint type, bool not_redundant)
1141 {
1142 	ut_ad(dict_sys_tables_type_valid(type, not_redundant));
1143 	ulint	flags = not_redundant ? 1 : 0;
1144 
1145 	/* ZIP_SSIZE, ATOMIC_BLOBS, DATA_DIR, PAGE_COMPRESSION,
1146 	PAGE_COMPRESSION_LEVEL are the same. */
1147 	flags |= type & (DICT_TF_MASK_ZIP_SSIZE
1148 			 | DICT_TF_MASK_ATOMIC_BLOBS
1149 			 | DICT_TF_MASK_DATA_DIR
1150 			 | DICT_TF_MASK_PAGE_COMPRESSION
1151 			 | DICT_TF_MASK_PAGE_COMPRESSION_LEVEL
1152 			 | DICT_TF_MASK_NO_ROLLBACK);
1153 
1154 	ut_ad(dict_tf_is_valid(flags));
1155 	return(flags);
1156 }
1157 
/** Read the ID, SPACE, TYPE, N_COLS and MIX_LEN fields of a SYS_TABLES
record, and convert TYPE and MIX_LEN to table flags.
The lengths of ID and SPACE are only checked by debug assertions, and
MIX_LEN is read without a length check; dict_check_sys_tables()
validates the record with dict_sys_tables_rec_check() before calling
this function.
@param[in]	rec		A record of SYS_TABLES
@param[in]	table_name	Table Name, the same as SYS_TABLES.NAME;
				only used in error messages
@param[out]	table_id	Pointer to the table_id for this table
@param[out]	space_id	Pointer to the space_id for this table
@param[out]	n_cols		Pointer to number of columns for this table;
				the DICT_N_COLS_COMPACT bit is cleared
				when flags2 is read from the record
@param[out]	flags		Pointer to table flags; not written if
				SYS_TABLES.TYPE is invalid
@param[out]	flags2		Pointer to table flags2; not written if
				SYS_TABLES.TYPE is invalid
@return true if the record was read correctly, false if not. */
MY_ATTRIBUTE((warn_unused_result))
static
bool
dict_sys_tables_rec_read(
	const rec_t*		rec,
	const table_name_t&	table_name,
	table_id_t*		table_id,
	ulint*			space_id,
	ulint*			n_cols,
	ulint*			flags,
	ulint*			flags2)
{
	const byte*	field;
	ulint		len;
	ulint		type;

	field = rec_get_nth_field_old(
		rec, DICT_FLD__SYS_TABLES__ID, &len);
	ut_ad(len == 8);
	*table_id = static_cast<table_id_t>(mach_read_from_8(field));

	field = rec_get_nth_field_old(
		rec, DICT_FLD__SYS_TABLES__SPACE, &len);
	ut_ad(len == 4);
	*space_id = mach_read_from_4(field);

	/* Read the 4 byte flags from the TYPE field */
	field = rec_get_nth_field_old(
		rec, DICT_FLD__SYS_TABLES__TYPE, &len);
	ut_a(len == 4);
	type = mach_read_from_4(field);

	/* Handle MDEV-12873 InnoDB SYS_TABLES.TYPE incompatibility
	for PAGE_COMPRESSED=YES in MariaDB 10.2.2 to 10.2.6.

	MariaDB 10.2.2 introduced the SHARED_SPACE flag from MySQL 5.7,
	shifting the flags PAGE_COMPRESSION, PAGE_COMPRESSION_LEVEL,
	ATOMIC_WRITES (repurposed to NO_ROLLBACK in 10.3.1) by one bit.
	The SHARED_SPACE flag would always
	be written as 0 by MariaDB, because MariaDB does not support
	CREATE TABLESPACE or CREATE TABLE...TABLESPACE for InnoDB.

	So, instead of the bits AALLLLCxxxxxxx we would have
	AALLLLC0xxxxxxx if the table was created with MariaDB 10.2.2
	to 10.2.6. (AA=ATOMIC_WRITES, LLLL=PAGE_COMPRESSION_LEVEL,
	C=PAGE_COMPRESSED, xxxxxxx=7 bits that were not moved.)

	The case LLLLC=00000 is not a problem. The problem is the case
	AALLLL10DB00001 where D is the (mostly ignored) DATA_DIRECTORY
	flag and B is the ATOMIC_BLOBS flag (1 for ROW_FORMAT=DYNAMIC
	and 0 for ROW_FORMAT=COMPACT in this case). Other low-order
	bits must be so, because PAGE_COMPRESSED=YES is only allowed
	for ROW_FORMAT=DYNAMIC and ROW_FORMAT=COMPACT, not for
	ROW_FORMAT=REDUNDANT or ROW_FORMAT=COMPRESSED.

	Starting with MariaDB 10.2.4, the flags would be
	00LLLL10DB00001, because ATOMIC_WRITES is always written as 0.

	We will concentrate on the PAGE_COMPRESSION_LEVEL and
	PAGE_COMPRESSED=YES. PAGE_COMPRESSED=NO implies
	PAGE_COMPRESSION_LEVEL=0, and in that case all the affected
	bits will be 0. For PAGE_COMPRESSED=YES, the values 1..9 are
	allowed for PAGE_COMPRESSION_LEVEL. That is, we must interpret
	the bits AALLLL10DB00001 as AALLLL1DB00001.

	If someone created a table in MariaDB 10.2.2 or 10.2.3 with
	the attribute ATOMIC_WRITES=OFF (value 2) and without
	PAGE_COMPRESSED=YES or PAGE_COMPRESSION_LEVEL, that should be
	rejected. The value ATOMIC_WRITES=ON (1) would look like
	ATOMIC_WRITES=OFF, but it would be ignored starting with
	MariaDB 10.2.4. */
	compile_time_assert(DICT_TF_POS_PAGE_COMPRESSION == 7);
	compile_time_assert(DICT_TF_POS_UNUSED == 14);

	/* The mask 0x19f selects bits 0..4, 7 and 8. The value 0x101
	means: bit 0 set, bits 1..4 clear, bit 7 (the extraneous bit)
	clear and bit 8 (the shifted PAGE_COMPRESSED bit) set. */
	if ((type & 0x19f) != 0x101) {
		/* The table cannot have been created with MariaDB
		10.2.2 to 10.2.6, because they would write the
		low-order bits of SYS_TABLES.TYPE as 0b10xx00001 for
		PAGE_COMPRESSED=YES. No adjustment is applicable. */
	} else if (type >= 3 << 13) {
		/* 10.2.2 and 10.2.3 write ATOMIC_WRITES less than 3,
		and no other flags above that can be set for the
		SYS_TABLES.TYPE to be in the 10.2.2..10.2.6 format.
		This would in any case be invalid format for 10.2 and
		earlier releases. */
		ut_ad(!dict_sys_tables_type_valid(type, true));
	} else {
		/* SYS_TABLES.TYPE is of the form AALLLL10DB00001.  We
		must still validate that the LLLL bits are between 1
		and 9 before we can discard the extraneous 0 bit. */
		ut_ad(!DICT_TF_GET_PAGE_COMPRESSION(type));

		/* LLLL is in bits 9..12 of the shifted format.
		Subtracting 1 makes LLLL=0 wrap around to a large
		value (ulint arithmetic is unsigned), so this single
		comparison accepts exactly the levels 1..9. */
		if ((((type >> 9) & 0xf) - 1) < 9) {
			ut_ad(DICT_TF_GET_PAGE_COMPRESSION_LEVEL(type) & 1);

			/* Keep the 7 low-order bits in place and move
			everything above the extraneous bit 7 down by
			one position. */
			type = (type & 0x7fU) | (type >> 1 & ~0x7fU);

			ut_ad(DICT_TF_GET_PAGE_COMPRESSION(type));
			ut_ad(DICT_TF_GET_PAGE_COMPRESSION_LEVEL(type) >= 1);
			ut_ad(DICT_TF_GET_PAGE_COMPRESSION_LEVEL(type) <= 9);
		} else {
			ut_ad(!dict_sys_tables_type_valid(type, true));
		}
	}

	/* The low order bit of SYS_TABLES.TYPE is always set to 1. But in
	dict_table_t::flags the low order bit is used to determine if the
	ROW_FORMAT=REDUNDANT (0) or anything else (1).
	Read the 4 byte N_COLS field and look at the high order bit.  It
	should be set for COMPACT and later.  It should not be set for
	REDUNDANT. */
	field = rec_get_nth_field_old(
		rec, DICT_FLD__SYS_TABLES__N_COLS, &len);
	ut_a(len == 4);
	*n_cols = mach_read_from_4(field);

	const bool not_redundant = 0 != (*n_cols & DICT_N_COLS_COMPACT);

	if (!dict_sys_tables_type_valid(type, not_redundant)) {
		ib::error() << "Table " << table_name << " in InnoDB"
			" data dictionary contains invalid flags."
			" SYS_TABLES.TYPE=" << type <<
			" SYS_TABLES.N_COLS=" << *n_cols;
		return(false);
	}

	*flags = dict_sys_tables_type_to_tf(type, not_redundant);

	/* For tables created before MySQL 4.1, there may be
	garbage in SYS_TABLES.MIX_LEN where flags2 are found. Such tables
	would always be in ROW_FORMAT=REDUNDANT which do not have the
	high bit set in n_cols, and flags would be zero.
	MySQL 4.1 was the first version to support innodb_file_per_table,
	that is, *space_id != 0.
	Note: the last operand of the condition tests the same bit as
	not_redundant, so it cannot change the outcome. */
	if (not_redundant || *space_id != 0 || *n_cols & DICT_N_COLS_COMPACT) {

		/* Get flags2 from SYS_TABLES.MIX_LEN */
		field = rec_get_nth_field_old(
			rec, DICT_FLD__SYS_TABLES__MIX_LEN, &len);
		*flags2 = mach_read_from_4(field);

		if (!dict_tf2_is_valid(*flags, *flags2)) {
			ib::error() << "Table " << table_name << " in InnoDB"
				" data dictionary contains invalid flags."
				" SYS_TABLES.TYPE=" << type
				<< " SYS_TABLES.MIX_LEN=" << *flags2;
			return(false);
		}

		/* DICT_TF2_FTS will be set when indexes are being loaded */
		*flags2 &= ~DICT_TF2_FTS;

		/* Now that we have used this bit, unset it. */
		*n_cols &= ~DICT_N_COLS_COMPACT;
	} else {
		/* MIX_LEN is not trusted for such tables (see above). */
		*flags2 = 0;
	}

	return(true);
}
1327 
/** Load and check each non-predefined tablespace mentioned in SYS_TABLES.
Scan SYS_TABLES and, for each usable record whose tablespace is not the
system tablespace, either reconcile SYS_DATAFILES with a tablespace that
is already present in memory, or try to open the .ibd file with
fil_ibd_open(). Records for tables whose name contains
"/" TEMP_FILE_PREFIX "-" and for discarded tablespaces are skipped.
The mini-transaction is committed and restarted between records.
@return the highest space ID among the tablespaces that this function
attempted to open with fil_ibd_open(), or 0 if there were none */
static ulint dict_check_sys_tables()
{
	ulint		max_space_id = 0;
	btr_pcur_t	pcur;
	const rec_t*	rec;
	mtr_t		mtr;

	DBUG_ENTER("dict_check_sys_tables");

	ut_d(dict_sys.assert_locked());

	mtr_start(&mtr);

	/* Before traversing SYS_TABLES, let's make sure we have
	SYS_TABLESPACES and SYS_DATAFILES loaded. */
	dict_table_t*	sys_tablespaces;
	dict_table_t*	sys_datafiles;
	sys_tablespaces = dict_table_get_low("SYS_TABLESPACES");
	ut_a(sys_tablespaces != NULL);
	sys_datafiles = dict_table_get_low("SYS_DATAFILES");
	ut_a(sys_datafiles != NULL);

	/* Every way of reaching the next iteration, including each
	"continue" below, commits and restarts the mini-transaction
	before the next record is fetched. */
	for (rec = dict_startscan_system(&pcur, &mtr, SYS_TABLES);
	     rec != NULL;
	     mtr.commit(), mtr.start(),
	     rec = dict_getnext_system(&pcur, &mtr)) {
		const byte*	field;
		ulint		len;
		table_id_t	table_id;
		ulint		space_id;
		ulint		n_cols;
		ulint		flags;
		ulint		flags2;

		/* If a table record is not useable, ignore it and continue
		on to the next record. The message returned by the check
		is discarded here; nothing is logged for such a record. */
		if (dict_sys_tables_rec_check(rec) != NULL) {
			continue;
		}

		/* Copy the table name from rec */
		field = rec_get_nth_field_old(
			rec, DICT_FLD__SYS_TABLES__NAME, &len);

		/* The copy is owned by this iteration: every path below
		must release table_name.m_name with ut_free(). */
		table_name_t table_name(mem_strdupl((char*) field, len));
		DBUG_PRINT("dict_check_sys_tables",
			   ("name: %p, '%s'", table_name.m_name,
			    table_name.m_name));

		if (!dict_sys_tables_rec_read(rec, table_name,
					      &table_id, &space_id,
					      &n_cols, &flags, &flags2)
		    || space_id == TRX_SYS_SPACE) {
			/* Shared exit for skipped records: free the name
			and move on. The "goto next" statements further
			down jump into this block. */
next:
			ut_free(table_name.m_name);
			continue;
		}

		if (strstr(table_name.m_name, "/" TEMP_FILE_PREFIX "-")) {
			/* This table will be dropped by
			row_mysql_drop_garbage_tables().
			We do not care if the file exists. */
			goto next;
		}

		if (flags2 & DICT_TF2_DISCARDED) {
			ib::info() << "Ignoring tablespace for " << table_name
				<< " because the DISCARD flag is set .";
			goto next;
		}

		/* For tables or partitions using .ibd files, the flag
		DICT_TF2_USE_FILE_PER_TABLE was not set in MIX_LEN
		before MySQL 5.6.5. The flag should not have been
		introduced in persistent storage. MariaDB will keep
		setting the flag when writing SYS_TABLES entries for
		newly created or rebuilt tables or partitions, but
		will otherwise ignore the flag. */

		/* Now that we have the proper name for this tablespace,
		look to see if it is already in the tablespace cache. */
		if (const fil_space_t* space
		    = fil_space_for_table_exists_in_mem(
			    space_id, table_name.m_name, flags)) {
			/* Recovery can open a datafile that does not
			match SYS_DATAFILES.  If they don't match, update
			SYS_DATAFILES. The result of the update is not
			checked here; dict_update_filepath() writes its
			outcome to the log. */
			char *dict_path = dict_get_first_path(space_id);
			const char *fil_path = space->chain.start->name;
			if (dict_path
			    && strcmp(dict_path, fil_path)) {
				dict_update_filepath(space_id, fil_path);
			}
			ut_free(dict_path);
			ut_free(table_name.m_name);
			/* Note that max_space_id is not updated for a
			tablespace that was already in memory. */
			continue;
		}

		/* Set the expected filepath from the data dictionary.
		If the file is found elsewhere (from an ISL or the default
		location) or this path is the same file but looks different,
		fil_ibd_open() will update the dictionary with what is
		opened. */
		char*	filepath = dict_get_first_path(space_id);

		/* Check that the .ibd file exists. A failure to open it
		is only reported as a warning; the scan goes on. */
		if (!fil_ibd_open(
			    false,
			    !srv_read_only_mode && srv_log_file_size != 0,
			    FIL_TYPE_TABLESPACE,
			    space_id, dict_tf_to_fsp_flags(flags),
			    table_name, filepath)) {
			ib::warn() << "Ignoring tablespace for "
				<< table_name
				<< " because it could not be opened.";
		}

		/* The space ID counts towards the maximum whether or not
		the file could be opened. */
		max_space_id = ut_max(max_space_id, space_id);

		ut_free(table_name.m_name);
		ut_free(filepath);
	}

	mtr_commit(&mtr);

	DBUG_RETURN(max_space_id);
}
1460 
1461 /** Check each tablespace found in the data dictionary.
1462 Then look at each table defined in SYS_TABLES that has a space_id > 0
1463 to find all the file-per-table tablespaces.
1464 
1465 In a crash recovery we already have some tablespace objects created from
1466 processing the REDO log.  Any other tablespace in SYS_TABLESPACES not
1467 previously used in recovery will be opened here.  We will compare the
1468 space_id information in the data dictionary to what we find in the
1469 tablespace file. In addition, more validation will be done if recovery
1470 was needed and force_recovery is not set.
1471 
1472 We also scan the biggest space id, and store it to fil_system. */
dict_check_tablespaces_and_store_max_id()1473 void dict_check_tablespaces_and_store_max_id()
1474 {
1475 	mtr_t	mtr;
1476 
1477 	DBUG_ENTER("dict_check_tablespaces_and_store_max_id");
1478 
1479 	dict_sys_lock();
1480 
1481 	/* Initialize the max space_id from sys header */
1482 	mtr.start();
1483 	ulint max_space_id = mach_read_from_4(DICT_HDR_MAX_SPACE_ID
1484 					      + DICT_HDR
1485 					      + dict_hdr_get(&mtr)->frame);
1486 	mtr.commit();
1487 
1488 	fil_set_max_space_id_if_bigger(max_space_id);
1489 
1490 	/* Open all tablespaces referenced in SYS_TABLES.
1491 	This will update SYS_TABLESPACES and SYS_DATAFILES if it
1492 	finds any file-per-table tablespaces not already there. */
1493 	max_space_id = dict_check_sys_tables();
1494 	fil_set_max_space_id_if_bigger(max_space_id);
1495 
1496 	dict_sys_unlock();
1497 
1498 	DBUG_VOID_RETURN;
1499 }
1500 
1501 /** Error message for a delete-marked record in dict_load_column_low() */
1502 static const char* dict_load_column_del = "delete-marked record in SYS_COLUMN";
1503 
1504 /** Load a table column definition from a SYS_COLUMNS record to dict_table_t.
1505 @return	error message
1506 @retval	NULL on success */
1507 static
1508 const char*
dict_load_column_low(dict_table_t * table,mem_heap_t * heap,dict_col_t * column,table_id_t * table_id,const char ** col_name,const rec_t * rec,ulint * nth_v_col)1509 dict_load_column_low(
1510 	dict_table_t*	table,		/*!< in/out: table, could be NULL
1511 					if we just populate a dict_column_t
1512 					struct with information from
1513 					a SYS_COLUMNS record */
1514 	mem_heap_t*	heap,		/*!< in/out: memory heap
1515 					for temporary storage */
1516 	dict_col_t*	column,		/*!< out: dict_column_t to fill,
1517 					or NULL if table != NULL */
1518 	table_id_t*	table_id,	/*!< out: table id */
1519 	const char**	col_name,	/*!< out: column name */
1520 	const rec_t*	rec,		/*!< in: SYS_COLUMNS record */
1521 	ulint*		nth_v_col)	/*!< out: if not NULL, this
1522 					records the "n" of "nth" virtual
1523 					column */
1524 {
1525 	char*		name;
1526 	const byte*	field;
1527 	ulint		len;
1528 	ulint		mtype;
1529 	ulint		prtype;
1530 	ulint		col_len;
1531 	ulint		pos;
1532 	ulint		num_base;
1533 
1534 	ut_ad(!table == !!column);
1535 
1536 	if (rec_get_deleted_flag(rec, 0)) {
1537 		return(dict_load_column_del);
1538 	}
1539 
1540 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_COLUMNS) {
1541 		return("wrong number of columns in SYS_COLUMNS record");
1542 	}
1543 
1544 	field = rec_get_nth_field_old(
1545 		rec, DICT_FLD__SYS_COLUMNS__TABLE_ID, &len);
1546 	if (len != 8) {
1547 err_len:
1548 		return("incorrect column length in SYS_COLUMNS");
1549 	}
1550 
1551 	if (table_id) {
1552 		*table_id = mach_read_from_8(field);
1553 	} else if (table->id != mach_read_from_8(field)) {
1554 		return("SYS_COLUMNS.TABLE_ID mismatch");
1555 	}
1556 
1557 	field = rec_get_nth_field_old(
1558 		rec, DICT_FLD__SYS_COLUMNS__POS, &len);
1559 	if (len != 4) {
1560 		goto err_len;
1561 	}
1562 
1563 	pos = mach_read_from_4(field);
1564 
1565 	rec_get_nth_field_offs_old(
1566 		rec, DICT_FLD__SYS_COLUMNS__DB_TRX_ID, &len);
1567 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
1568 		goto err_len;
1569 	}
1570 	rec_get_nth_field_offs_old(
1571 		rec, DICT_FLD__SYS_COLUMNS__DB_ROLL_PTR, &len);
1572 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
1573 		goto err_len;
1574 	}
1575 
1576 	field = rec_get_nth_field_old(
1577 		rec, DICT_FLD__SYS_COLUMNS__NAME, &len);
1578 	if (len == 0 || len == UNIV_SQL_NULL) {
1579 		goto err_len;
1580 	}
1581 
1582 	name = mem_heap_strdupl(heap, (const char*) field, len);
1583 
1584 	if (col_name) {
1585 		*col_name = name;
1586 	}
1587 
1588 	field = rec_get_nth_field_old(
1589 		rec, DICT_FLD__SYS_COLUMNS__MTYPE, &len);
1590 	if (len != 4) {
1591 		goto err_len;
1592 	}
1593 
1594 	mtype = mach_read_from_4(field);
1595 
1596 	field = rec_get_nth_field_old(
1597 		rec, DICT_FLD__SYS_COLUMNS__PRTYPE, &len);
1598 	if (len != 4) {
1599 		goto err_len;
1600 	}
1601 	prtype = mach_read_from_4(field);
1602 
1603 	if (dtype_get_charset_coll(prtype) == 0
1604 	    && dtype_is_string_type(mtype)) {
1605 		/* The table was created with < 4.1.2. */
1606 
1607 		if (dtype_is_binary_string_type(mtype, prtype)) {
1608 			/* Use the binary collation for
1609 			string columns of binary type. */
1610 
1611 			prtype = dtype_form_prtype(
1612 				prtype,
1613 				DATA_MYSQL_BINARY_CHARSET_COLL);
1614 		} else {
1615 			/* Use the default charset for
1616 			other than binary columns. */
1617 
1618 			prtype = dtype_form_prtype(
1619 				prtype,
1620 				data_mysql_default_charset_coll);
1621 		}
1622 	}
1623 
1624 	if (table && table->n_def != pos && !(prtype & DATA_VIRTUAL)) {
1625 		return("SYS_COLUMNS.POS mismatch");
1626 	}
1627 
1628 	field = rec_get_nth_field_old(
1629 		rec, DICT_FLD__SYS_COLUMNS__LEN, &len);
1630 	if (len != 4) {
1631 		goto err_len;
1632 	}
1633 	col_len = mach_read_from_4(field);
1634 	field = rec_get_nth_field_old(
1635 		rec, DICT_FLD__SYS_COLUMNS__PREC, &len);
1636 	if (len != 4) {
1637 		goto err_len;
1638 	}
1639 	num_base = mach_read_from_4(field);
1640 
1641 	if (table) {
1642 		if (prtype & DATA_VIRTUAL) {
1643 #ifdef UNIV_DEBUG
1644 			dict_v_col_t*	vcol =
1645 #endif
1646 			dict_mem_table_add_v_col(
1647 				table, heap, name, mtype,
1648 				prtype, col_len,
1649 				dict_get_v_col_mysql_pos(pos), num_base);
1650 			ut_ad(vcol->v_pos == dict_get_v_col_pos(pos));
1651 		} else {
1652 			ut_ad(num_base == 0);
1653 			dict_mem_table_add_col(table, heap, name, mtype,
1654 					       prtype, col_len);
1655 		}
1656 	} else {
1657 		dict_mem_fill_column_struct(column, pos, mtype,
1658 					    prtype, col_len);
1659 	}
1660 
1661 	/* Report the virtual column number */
1662 	if ((prtype & DATA_VIRTUAL) && nth_v_col != NULL) {
1663 		*nth_v_col = dict_get_v_col_pos(pos);
1664 	}
1665 
1666 	return(NULL);
1667 }
1668 
1669 /** Error message for a delete-marked record in dict_load_virtual_low() */
1670 static const char* dict_load_virtual_del = "delete-marked record in SYS_VIRTUAL";
1671 
1672 /** Load a virtual column "mapping" (to base columns) information
1673 from a SYS_VIRTUAL record
1674 @param[in,out]	table		table
1675 @param[in,out]	column		mapped base column's dict_column_t
1676 @param[in,out]	table_id	table id
1677 @param[in,out]	pos		virtual column position
1678 @param[in,out]	base_pos	base column position
1679 @param[in]	rec		SYS_VIRTUAL record
1680 @return	error message
1681 @retval	NULL on success */
1682 static
1683 const char*
dict_load_virtual_low(dict_table_t * table,dict_col_t ** column,table_id_t * table_id,ulint * pos,ulint * base_pos,const rec_t * rec)1684 dict_load_virtual_low(
1685 	dict_table_t*	table,
1686 	dict_col_t**	column,
1687 	table_id_t*	table_id,
1688 	ulint*		pos,
1689 	ulint*		base_pos,
1690 	const rec_t*	rec)
1691 {
1692 	const byte*	field;
1693 	ulint		len;
1694 	ulint		base;
1695 
1696 	if (rec_get_deleted_flag(rec, 0)) {
1697 		return(dict_load_virtual_del);
1698 	}
1699 
1700 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_VIRTUAL) {
1701 		return("wrong number of columns in SYS_VIRTUAL record");
1702 	}
1703 
1704 	field = rec_get_nth_field_old(
1705 		rec, DICT_FLD__SYS_VIRTUAL__TABLE_ID, &len);
1706 	if (len != 8) {
1707 err_len:
1708 		return("incorrect column length in SYS_VIRTUAL");
1709 	}
1710 
1711 	if (table_id != NULL) {
1712 		*table_id = mach_read_from_8(field);
1713 	} else if (table->id != mach_read_from_8(field)) {
1714 		return("SYS_VIRTUAL.TABLE_ID mismatch");
1715 	}
1716 
1717 	field = rec_get_nth_field_old(
1718 		rec, DICT_FLD__SYS_VIRTUAL__POS, &len);
1719 	if (len != 4) {
1720 		goto err_len;
1721 	}
1722 
1723 	if (pos != NULL) {
1724 		*pos = mach_read_from_4(field);
1725 	}
1726 
1727 	field = rec_get_nth_field_old(
1728 		rec, DICT_FLD__SYS_VIRTUAL__BASE_POS, &len);
1729 	if (len != 4) {
1730 		goto err_len;
1731 	}
1732 
1733 	base = mach_read_from_4(field);
1734 
1735 	if (base_pos != NULL) {
1736 		*base_pos = base;
1737 	}
1738 
1739 	rec_get_nth_field_offs_old(
1740 		rec, DICT_FLD__SYS_VIRTUAL__DB_TRX_ID, &len);
1741 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
1742 		goto err_len;
1743 	}
1744 
1745 	rec_get_nth_field_offs_old(
1746 		rec, DICT_FLD__SYS_VIRTUAL__DB_ROLL_PTR, &len);
1747 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
1748 		goto err_len;
1749 	}
1750 
1751 	if (column != NULL) {
1752 		*column = dict_table_get_nth_col(table, base);
1753 	}
1754 
1755 	return(NULL);
1756 }
1757 
/********************************************************************//**
Loads definitions for table columns.
Positions a cursor in the first index of SYS_COLUMNS on the first record
whose TABLE_ID is not less than table->id, and loads one column per record
with dict_load_column_low() until table->n_t_cols - DATA_N_SYS_COLS columns
have been loaded. Delete-marked records are skipped without being counted
as columns; any other error reported for a record is passed to ib::fatal().
While loading, a non-virtual column named FTS_DOC_ID_COL_NAME is registered
in table->fts. */
static
void
dict_load_columns(
/*==============*/
	dict_table_t*	table,	/*!< in/out: table */
	mem_heap_t*	heap)	/*!< in/out: memory heap
				for temporary storage */
{
	dict_table_t*	sys_columns;
	dict_index_t*	sys_index;
	btr_pcur_t	pcur;
	dtuple_t*	tuple;
	dfield_t*	dfield;
	const rec_t*	rec;
	byte*		buf;
	ulint		i;
	mtr_t		mtr;
	ulint		n_skipped = 0;	/* delete-marked records seen */

	ut_ad(mutex_own(&dict_sys.mutex));

	mtr_start(&mtr);

	sys_columns = dict_table_get_low("SYS_COLUMNS");
	sys_index = UT_LIST_GET_FIRST(sys_columns->indexes);
	ut_ad(!dict_table_is_comp(sys_columns));

	ut_ad(name_of_col_is(sys_columns, sys_index,
			     DICT_FLD__SYS_COLUMNS__NAME, "NAME"));
	ut_ad(name_of_col_is(sys_columns, sys_index,
			     DICT_FLD__SYS_COLUMNS__PREC, "PREC"));

	/* Build a one-field search tuple that holds the table ID
	in its 8-byte stored format. */
	tuple = dtuple_create(heap, 1);
	dfield = dtuple_get_nth_field(tuple, 0);

	buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
	mach_write_to_8(buf, table->id);

	dfield_set_data(dfield, buf, 8);
	dict_index_copy_types(tuple, sys_index, 1);

	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
				  BTR_SEARCH_LEAF, &pcur, &mtr);

	ut_ad(table->n_t_cols == static_cast<ulint>(
	      table->n_cols) + static_cast<ulint>(table->n_v_cols));

	/* i counts the visited records, including the skipped ones.
	The bound grows with n_skipped, so that the loop ends after
	table->n_t_cols - DATA_N_SYS_COLS records have been loaded. */
	for (i = 0;
	     i + DATA_N_SYS_COLS < table->n_t_cols + n_skipped;
	     i++) {
		const char*	err_msg;
		const char*	name = NULL;
		ulint		nth_v_col = ULINT_UNDEFINED;

		rec = btr_pcur_get_rec(&pcur);

		ut_a(btr_pcur_is_on_user_rec(&pcur));

		/* With table_id == NULL, the record must belong to
		this table, or an error message is returned. */
		err_msg = dict_load_column_low(table, heap, NULL, NULL,
					       &name, rec, &nth_v_col);

		if (err_msg == dict_load_column_del) {
			/* The delete-marked message is recognized by
			comparing the pointer, not the text. */
			n_skipped++;
			goto next_rec;
		} else if (err_msg) {
			/* NOTE(review): ib::fatal() presumably does not
			return, since name would be NULL in the code
			below - confirm. */
			ib::fatal() << err_msg;
		}

		/* Note: Currently we have one DOC_ID column that is
		shared by all FTS indexes on a table. And only non-virtual
		column can be used for FULLTEXT index */
		if (innobase_strcasecmp(name,
					FTS_DOC_ID_COL_NAME) == 0
		    && nth_v_col == ULINT_UNDEFINED) {
			dict_col_t*	col;
			/* As part of normal loading of tables the
			table->flag is not set for tables with FTS
			till after the FTS indexes are loaded. So we
			create the fts_t instance here if there isn't
			one already created.

			This case does not arise for table create as
			the flag is set before the table is created. */
			if (table->fts == NULL) {
				table->fts = fts_create(table);
			}

			ut_a(table->fts->doc_col == ULINT_UNDEFINED);

			/* i - n_skipped is the number of records that
			were loaded before this one; it is used as the
			position of this column in the table. */
			col = dict_table_get_nth_col(table, i - n_skipped);

			ut_ad(col->len == sizeof(doc_id_t));

			if (col->prtype & DATA_FTS_DOC_ID) {
				DICT_TF2_FLAG_SET(
					table, DICT_TF2_FTS_HAS_DOC_ID);
				DICT_TF2_FLAG_UNSET(
					table, DICT_TF2_FTS_ADD_DOC_ID);
			}

			table->fts->doc_col = i - n_skipped;
		}
next_rec:
		btr_pcur_move_to_next_user_rec(&pcur, &mtr);
	}

	btr_pcur_close(&pcur);
	mtr_commit(&mtr);
}
1869 
1870 /** Loads SYS_VIRTUAL info for one virtual column
1871 @param[in,out]	table		table
1872 @param[in]	nth_v_col	virtual column sequence num
1873 @param[in,out]	v_col		virtual column
1874 @param[in,out]	heap		memory heap
1875 */
1876 static
1877 void
dict_load_virtual_one_col(dict_table_t * table,ulint nth_v_col,dict_v_col_t * v_col,mem_heap_t * heap)1878 dict_load_virtual_one_col(
1879 	dict_table_t*	table,
1880 	ulint		nth_v_col,
1881 	dict_v_col_t*	v_col,
1882 	mem_heap_t*	heap)
1883 {
1884 	dict_table_t*	sys_virtual;
1885 	dict_index_t*	sys_virtual_index;
1886 	btr_pcur_t	pcur;
1887 	dtuple_t*	tuple;
1888 	dfield_t*	dfield;
1889 	const rec_t*	rec;
1890 	byte*		buf;
1891 	ulint		i = 0;
1892 	mtr_t		mtr;
1893 	ulint		skipped = 0;
1894 
1895 	ut_ad(mutex_own(&dict_sys.mutex));
1896 
1897 	if (v_col->num_base == 0) {
1898 		return;
1899 	}
1900 
1901 	mtr_start(&mtr);
1902 
1903 	sys_virtual = dict_table_get_low("SYS_VIRTUAL");
1904 	sys_virtual_index = UT_LIST_GET_FIRST(sys_virtual->indexes);
1905 	ut_ad(!dict_table_is_comp(sys_virtual));
1906 
1907 	ut_ad(name_of_col_is(sys_virtual, sys_virtual_index,
1908 			     DICT_FLD__SYS_VIRTUAL__POS, "POS"));
1909 
1910 	tuple = dtuple_create(heap, 2);
1911 
1912 	/* table ID field */
1913 	dfield = dtuple_get_nth_field(tuple, 0);
1914 
1915 	buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
1916 	mach_write_to_8(buf, table->id);
1917 
1918 	dfield_set_data(dfield, buf, 8);
1919 
1920 	/* virtual column pos field */
1921 	dfield = dtuple_get_nth_field(tuple, 1);
1922 
1923 	buf = static_cast<byte*>(mem_heap_alloc(heap, 4));
1924 	ulint	vcol_pos = dict_create_v_col_pos(nth_v_col, v_col->m_col.ind);
1925 	mach_write_to_4(buf, vcol_pos);
1926 
1927 	dfield_set_data(dfield, buf, 4);
1928 
1929 	dict_index_copy_types(tuple, sys_virtual_index, 2);
1930 
1931 	btr_pcur_open_on_user_rec(sys_virtual_index, tuple, PAGE_CUR_GE,
1932 				  BTR_SEARCH_LEAF, &pcur, &mtr);
1933 
1934 	for (i = 0; i < unsigned{v_col->num_base} + skipped; i++) {
1935 		const char*	err_msg;
1936 		ulint		pos;
1937 
1938 		ut_ad(btr_pcur_is_on_user_rec(&pcur));
1939 
1940 		rec = btr_pcur_get_rec(&pcur);
1941 
1942 		ut_a(btr_pcur_is_on_user_rec(&pcur));
1943 
1944 		err_msg = dict_load_virtual_low(table,
1945 						&v_col->base_col[i - skipped],
1946 						NULL,
1947 					        &pos, NULL, rec);
1948 
1949 		if (err_msg) {
1950 			if (err_msg != dict_load_virtual_del) {
1951 				ib::fatal() << err_msg;
1952 			} else {
1953 				skipped++;
1954 			}
1955 		} else {
1956 			ut_ad(pos == vcol_pos);
1957 		}
1958 
1959 		btr_pcur_move_to_next_user_rec(&pcur, &mtr);
1960 	}
1961 
1962 	btr_pcur_close(&pcur);
1963 	mtr_commit(&mtr);
1964 }
1965 
1966 /** Loads info from SYS_VIRTUAL for virtual columns.
1967 @param[in,out]	table	table
1968 @param[in]	heap	memory heap
1969 */
1970 static
1971 void
dict_load_virtual(dict_table_t * table,mem_heap_t * heap)1972 dict_load_virtual(
1973 	dict_table_t*	table,
1974 	mem_heap_t*	heap)
1975 {
1976 	for (ulint i = 0; i < table->n_v_cols; i++) {
1977 		dict_v_col_t*	v_col = dict_table_get_nth_v_col(table, i);
1978 
1979 		dict_load_virtual_one_col(table, i, v_col, heap);
1980 	}
1981 }
1982 
1983 /** Error message for a delete-marked record in dict_load_field_low() */
1984 static const char* dict_load_field_del = "delete-marked record in SYS_FIELDS";
1985 
1986 /** Load an index field definition from a SYS_FIELDS record to dict_index_t.
1987 @return	error message
1988 @retval	NULL on success */
1989 static
1990 const char*
dict_load_field_low(byte * index_id,dict_index_t * index,dict_field_t * sys_field,ulint * pos,byte * last_index_id,mem_heap_t * heap,const rec_t * rec)1991 dict_load_field_low(
1992 	byte*		index_id,	/*!< in/out: index id (8 bytes)
1993 					an "in" value if index != NULL
1994 					and "out" if index == NULL */
1995 	dict_index_t*	index,		/*!< in/out: index, could be NULL
1996 					if we just populate a dict_field_t
1997 					struct with information from
1998 					a SYS_FIELDS record */
1999 	dict_field_t*	sys_field,	/*!< out: dict_field_t to be
2000 					filled */
2001 	ulint*		pos,		/*!< out: Field position */
2002 	byte*		last_index_id,	/*!< in: last index id */
2003 	mem_heap_t*	heap,		/*!< in/out: memory heap
2004 					for temporary storage */
2005 	const rec_t*	rec)		/*!< in: SYS_FIELDS record */
2006 {
2007 	const byte*	field;
2008 	ulint		len;
2009 	unsigned	pos_and_prefix_len;
2010 	unsigned	prefix_len;
2011 	bool		first_field;
2012 	ulint		position;
2013 
2014 	/* Either index or sys_field is supplied, not both */
2015 	ut_a((!index) || (!sys_field));
2016 
2017 	if (rec_get_deleted_flag(rec, 0)) {
2018 		return(dict_load_field_del);
2019 	}
2020 
2021 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_FIELDS) {
2022 		return("wrong number of columns in SYS_FIELDS record");
2023 	}
2024 
2025 	field = rec_get_nth_field_old(
2026 		rec, DICT_FLD__SYS_FIELDS__INDEX_ID, &len);
2027 	if (len != 8) {
2028 err_len:
2029 		return("incorrect column length in SYS_FIELDS");
2030 	}
2031 
2032 	if (!index) {
2033 		ut_a(last_index_id);
2034 		memcpy(index_id, (const char*) field, 8);
2035 		first_field = memcmp(index_id, last_index_id, 8);
2036 	} else {
2037 		first_field = (index->n_def == 0);
2038 		if (memcmp(field, index_id, 8)) {
2039 			return("SYS_FIELDS.INDEX_ID mismatch");
2040 		}
2041 	}
2042 
2043 	/* The next field stores the field position in the index and a
2044 	possible column prefix length if the index field does not
2045 	contain the whole column. The storage format is like this: if
2046 	there is at least one prefix field in the index, then the HIGH
2047 	2 bytes contain the field number (index->n_def) and the low 2
2048 	bytes the prefix length for the field. Otherwise the field
2049 	number (index->n_def) is contained in the 2 LOW bytes. */
2050 
2051 	field = rec_get_nth_field_old(
2052 		rec, DICT_FLD__SYS_FIELDS__POS, &len);
2053 	if (len != 4) {
2054 		goto err_len;
2055 	}
2056 
2057 	pos_and_prefix_len = mach_read_from_4(field);
2058 
2059 	if (index && UNIV_UNLIKELY
2060 	    ((pos_and_prefix_len & 0xFFFFUL) != index->n_def
2061 	     && (pos_and_prefix_len >> 16 & 0xFFFF) != index->n_def)) {
2062 		return("SYS_FIELDS.POS mismatch");
2063 	}
2064 
2065 	if (first_field || pos_and_prefix_len > 0xFFFFUL) {
2066 		prefix_len = pos_and_prefix_len & 0xFFFFUL;
2067 		position = (pos_and_prefix_len & 0xFFFF0000UL)  >> 16;
2068 	} else {
2069 		prefix_len = 0;
2070 		position = pos_and_prefix_len & 0xFFFFUL;
2071 	}
2072 
2073 	rec_get_nth_field_offs_old(
2074 		rec, DICT_FLD__SYS_FIELDS__DB_TRX_ID, &len);
2075 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
2076 		goto err_len;
2077 	}
2078 	rec_get_nth_field_offs_old(
2079 		rec, DICT_FLD__SYS_FIELDS__DB_ROLL_PTR, &len);
2080 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
2081 		goto err_len;
2082 	}
2083 
2084 	field = rec_get_nth_field_old(
2085 		rec, DICT_FLD__SYS_FIELDS__COL_NAME, &len);
2086 	if (len == 0 || len == UNIV_SQL_NULL) {
2087 		goto err_len;
2088 	}
2089 
2090 	if (index) {
2091 		dict_mem_index_add_field(
2092 			index, mem_heap_strdupl(heap, (const char*) field, len),
2093 			prefix_len);
2094 	} else {
2095 		ut_a(sys_field);
2096 		ut_a(pos);
2097 
2098 		sys_field->name = mem_heap_strdupl(
2099 			heap, (const char*) field, len);
2100 		sys_field->prefix_len = prefix_len & ((1U << 12) - 1);
2101 		*pos = position;
2102 	}
2103 
2104 	return(NULL);
2105 }
2106 
2107 /********************************************************************//**
2108 Loads definitions for index fields.
2109 @return DB_SUCCESS if ok, DB_CORRUPTION if corruption */
2110 static
2111 ulint
dict_load_fields(dict_index_t * index,mem_heap_t * heap)2112 dict_load_fields(
2113 /*=============*/
2114 	dict_index_t*	index,	/*!< in/out: index whose fields to load */
2115 	mem_heap_t*	heap)	/*!< in: memory heap for temporary storage */
2116 {
2117 	dict_table_t*	sys_fields;
2118 	dict_index_t*	sys_index;
2119 	btr_pcur_t	pcur;
2120 	dtuple_t*	tuple;
2121 	dfield_t*	dfield;
2122 	const rec_t*	rec;
2123 	byte*		buf;
2124 	ulint		i;
2125 	mtr_t		mtr;
2126 	dberr_t		error;
2127 
2128 	ut_ad(mutex_own(&dict_sys.mutex));
2129 
2130 	mtr_start(&mtr);
2131 
2132 	sys_fields = dict_table_get_low("SYS_FIELDS");
2133 	sys_index = UT_LIST_GET_FIRST(sys_fields->indexes);
2134 	ut_ad(!dict_table_is_comp(sys_fields));
2135 	ut_ad(name_of_col_is(sys_fields, sys_index,
2136 			     DICT_FLD__SYS_FIELDS__COL_NAME, "COL_NAME"));
2137 
2138 	tuple = dtuple_create(heap, 1);
2139 	dfield = dtuple_get_nth_field(tuple, 0);
2140 
2141 	buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
2142 	mach_write_to_8(buf, index->id);
2143 
2144 	dfield_set_data(dfield, buf, 8);
2145 	dict_index_copy_types(tuple, sys_index, 1);
2146 
2147 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
2148 				  BTR_SEARCH_LEAF, &pcur, &mtr);
2149 	for (i = 0; i < index->n_fields; i++) {
2150 		const char* err_msg;
2151 
2152 		rec = btr_pcur_get_rec(&pcur);
2153 
2154 		ut_a(btr_pcur_is_on_user_rec(&pcur));
2155 
2156 		err_msg = dict_load_field_low(buf, index, NULL, NULL, NULL,
2157 					      heap, rec);
2158 
2159 		if (err_msg == dict_load_field_del) {
2160 			/* There could be delete marked records in
2161 			SYS_FIELDS because SYS_FIELDS.INDEX_ID can be
2162 			updated by ALTER TABLE ADD INDEX. */
2163 
2164 			goto next_rec;
2165 		} else if (err_msg) {
2166 			ib::error() << err_msg;
2167 			error = DB_CORRUPTION;
2168 			goto func_exit;
2169 		}
2170 next_rec:
2171 		btr_pcur_move_to_next_user_rec(&pcur, &mtr);
2172 	}
2173 
2174 	error = DB_SUCCESS;
2175 func_exit:
2176 	btr_pcur_close(&pcur);
2177 	mtr_commit(&mtr);
2178 	return(error);
2179 }
2180 
2181 /** Error message for a delete-marked record in dict_load_index_low() */
2182 static const char* dict_load_index_del = "delete-marked record in SYS_INDEXES";
2183 /** Error message for table->id mismatch in dict_load_index_low() */
2184 static const char* dict_load_index_id_err = "SYS_INDEXES.TABLE_ID mismatch";
2185 /** Error message for SYS_TABLES flags mismatch in dict_load_table_low() */
2186 static const char* dict_load_table_flags = "incorrect flags in SYS_TABLES";
2187 
2188 /** Load an index definition from a SYS_INDEXES record to dict_index_t.
2189 If allocate=TRUE, we will create a dict_index_t structure and fill it
2190 accordingly. If allocated=FALSE, the dict_index_t will be supplied by
2191 the caller and filled with information read from the record.
2192 @return	error message
2193 @retval	NULL on success */
2194 static
2195 const char*
dict_load_index_low(byte * table_id,mem_heap_t * heap,const rec_t * rec,ibool allocate,dict_index_t ** index)2196 dict_load_index_low(
2197 	byte*		table_id,	/*!< in/out: table id (8 bytes),
2198 					an "in" value if allocate=TRUE
2199 					and "out" when allocate=FALSE */
2200 	mem_heap_t*	heap,		/*!< in/out: temporary memory heap */
2201 	const rec_t*	rec,		/*!< in: SYS_INDEXES record */
2202 	ibool		allocate,	/*!< in: TRUE=allocate *index,
2203 					FALSE=fill in a pre-allocated
2204 					*index */
2205 	dict_index_t**	index)		/*!< out,own: index, or NULL */
2206 {
2207 	const byte*	field;
2208 	ulint		len;
2209 	ulint		name_len;
2210 	char*		name_buf;
2211 	index_id_t	id;
2212 	ulint		n_fields;
2213 	ulint		type;
2214 	unsigned	merge_threshold;
2215 
2216 	if (allocate) {
2217 		/* If allocate=TRUE, no dict_index_t will
2218 		be supplied. Initialize "*index" to NULL */
2219 		*index = NULL;
2220 	}
2221 
2222 	if (rec_get_deleted_flag(rec, 0)) {
2223 		return(dict_load_index_del);
2224 	}
2225 
2226 	if (rec_get_n_fields_old(rec) == DICT_NUM_FIELDS__SYS_INDEXES) {
2227 		/* MERGE_THRESHOLD exists */
2228 		field = rec_get_nth_field_old(
2229 			rec, DICT_FLD__SYS_INDEXES__MERGE_THRESHOLD, &len);
2230 		switch (len) {
2231 		case 4:
2232 			merge_threshold = mach_read_from_4(field);
2233 			break;
2234 		case UNIV_SQL_NULL:
2235 			merge_threshold = DICT_INDEX_MERGE_THRESHOLD_DEFAULT;
2236 			break;
2237 		default:
2238 			return("incorrect MERGE_THRESHOLD length"
2239 			       " in SYS_INDEXES");
2240 		}
2241 	} else if (rec_get_n_fields_old(rec)
2242 		   == DICT_NUM_FIELDS__SYS_INDEXES - 1) {
2243 		/* MERGE_THRESHOLD doesn't exist */
2244 
2245 		merge_threshold = DICT_INDEX_MERGE_THRESHOLD_DEFAULT;
2246 	} else {
2247 		return("wrong number of columns in SYS_INDEXES record");
2248 	}
2249 
2250 	field = rec_get_nth_field_old(
2251 		rec, DICT_FLD__SYS_INDEXES__TABLE_ID, &len);
2252 	if (len != 8) {
2253 err_len:
2254 		return("incorrect column length in SYS_INDEXES");
2255 	}
2256 
2257 	if (!allocate) {
2258 		/* We are reading a SYS_INDEXES record. Copy the table_id */
2259 		memcpy(table_id, (const char*) field, 8);
2260 	} else if (memcmp(field, table_id, 8)) {
2261 		/* Caller supplied table_id, verify it is the same
2262 		id as on the index record */
2263 		return(dict_load_index_id_err);
2264 	}
2265 
2266 	field = rec_get_nth_field_old(
2267 		rec, DICT_FLD__SYS_INDEXES__ID, &len);
2268 	if (len != 8) {
2269 		goto err_len;
2270 	}
2271 
2272 	id = mach_read_from_8(field);
2273 
2274 	rec_get_nth_field_offs_old(
2275 		rec, DICT_FLD__SYS_INDEXES__DB_TRX_ID, &len);
2276 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
2277 		goto err_len;
2278 	}
2279 	rec_get_nth_field_offs_old(
2280 		rec, DICT_FLD__SYS_INDEXES__DB_ROLL_PTR, &len);
2281 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
2282 		goto err_len;
2283 	}
2284 
2285 	field = rec_get_nth_field_old(
2286 		rec, DICT_FLD__SYS_INDEXES__NAME, &name_len);
2287 	if (name_len == UNIV_SQL_NULL) {
2288 		goto err_len;
2289 	}
2290 
2291 	name_buf = mem_heap_strdupl(heap, (const char*) field,
2292 				    name_len);
2293 
2294 	field = rec_get_nth_field_old(
2295 		rec, DICT_FLD__SYS_INDEXES__N_FIELDS, &len);
2296 	if (len != 4) {
2297 		goto err_len;
2298 	}
2299 	n_fields = mach_read_from_4(field);
2300 
2301 	field = rec_get_nth_field_old(
2302 		rec, DICT_FLD__SYS_INDEXES__TYPE, &len);
2303 	if (len != 4) {
2304 		goto err_len;
2305 	}
2306 	type = mach_read_from_4(field);
2307 	if (type & (~0U << DICT_IT_BITS)) {
2308 		return("unknown SYS_INDEXES.TYPE bits");
2309 	}
2310 
2311 	field = rec_get_nth_field_old(
2312 		rec, DICT_FLD__SYS_INDEXES__PAGE_NO, &len);
2313 	if (len != 4) {
2314 		goto err_len;
2315 	}
2316 
2317 	if (allocate) {
2318 		*index = dict_mem_index_create(NULL, name_buf, type, n_fields);
2319 	} else {
2320 		ut_a(*index);
2321 
2322 		dict_mem_fill_index_struct(*index, NULL, name_buf,
2323 					   type, n_fields);
2324 	}
2325 
2326 	(*index)->id = id;
2327 	(*index)->page = mach_read_from_4(field);
2328 	ut_ad((*index)->page);
2329 	(*index)->merge_threshold = merge_threshold & ((1U << 6) - 1);
2330 
2331 	return(NULL);
2332 }
2333 
2334 /********************************************************************//**
2335 Loads definitions for table indexes. Adds them to the data dictionary
2336 cache.
2337 @return DB_SUCCESS if ok, DB_CORRUPTION if corruption of dictionary
2338 table or DB_UNSUPPORTED if table has unknown index type */
2339 static MY_ATTRIBUTE((nonnull))
2340 dberr_t
dict_load_indexes(dict_table_t * table,mem_heap_t * heap,dict_err_ignore_t ignore_err)2341 dict_load_indexes(
2342 /*==============*/
2343 	dict_table_t*	table,	/*!< in/out: table */
2344 	mem_heap_t*	heap,	/*!< in: memory heap for temporary storage */
2345 	dict_err_ignore_t ignore_err)
2346 				/*!< in: error to be ignored when
2347 				loading the index definition */
2348 {
2349 	dict_table_t*	sys_indexes;
2350 	dict_index_t*	sys_index;
2351 	btr_pcur_t	pcur;
2352 	dtuple_t*	tuple;
2353 	dfield_t*	dfield;
2354 	const rec_t*	rec;
2355 	byte*		buf;
2356 	mtr_t		mtr;
2357 	dberr_t		error = DB_SUCCESS;
2358 
2359 	ut_ad(mutex_own(&dict_sys.mutex));
2360 
2361 	mtr_start(&mtr);
2362 
2363 	sys_indexes = dict_table_get_low("SYS_INDEXES");
2364 	sys_index = UT_LIST_GET_FIRST(sys_indexes->indexes);
2365 	ut_ad(!dict_table_is_comp(sys_indexes));
2366 	ut_ad(name_of_col_is(sys_indexes, sys_index,
2367 			     DICT_FLD__SYS_INDEXES__NAME, "NAME"));
2368 	ut_ad(name_of_col_is(sys_indexes, sys_index,
2369 			     DICT_FLD__SYS_INDEXES__PAGE_NO, "PAGE_NO"));
2370 
2371 	tuple = dtuple_create(heap, 1);
2372 	dfield = dtuple_get_nth_field(tuple, 0);
2373 
2374 	buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
2375 	mach_write_to_8(buf, table->id);
2376 
2377 	dfield_set_data(dfield, buf, 8);
2378 	dict_index_copy_types(tuple, sys_index, 1);
2379 
2380 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
2381 				  BTR_SEARCH_LEAF, &pcur, &mtr);
2382 	for (;;) {
2383 		dict_index_t*	index = NULL;
2384 		const char*	err_msg;
2385 
2386 		if (!btr_pcur_is_on_user_rec(&pcur)) {
2387 
2388 			/* We should allow the table to open even
2389 			without index when DICT_ERR_IGNORE_CORRUPT is set.
2390 			DICT_ERR_IGNORE_CORRUPT is currently only set
2391 			for drop table */
2392 			if (dict_table_get_first_index(table) == NULL
2393 			    && !(ignore_err & DICT_ERR_IGNORE_CORRUPT)) {
2394 				ib::warn() << "Cannot load table "
2395 					<< table->name
2396 					<< " because it has no indexes in"
2397 					" InnoDB internal data dictionary.";
2398 				error = DB_CORRUPTION;
2399 				goto func_exit;
2400 			}
2401 
2402 			break;
2403 		}
2404 
2405 		rec = btr_pcur_get_rec(&pcur);
2406 
2407 		if ((ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK)
2408 		    && (rec_get_n_fields_old(rec)
2409 			== DICT_NUM_FIELDS__SYS_INDEXES
2410 			/* a record for older SYS_INDEXES table
2411 			(missing merge_threshold column) is acceptable. */
2412 			|| rec_get_n_fields_old(rec)
2413 			   == DICT_NUM_FIELDS__SYS_INDEXES - 1)) {
2414 			const byte*	field;
2415 			ulint		len;
2416 			field = rec_get_nth_field_old(
2417 				rec, DICT_FLD__SYS_INDEXES__NAME, &len);
2418 
2419 			if (len != UNIV_SQL_NULL
2420 			    && static_cast<char>(*field)
2421 			    == static_cast<char>(*TEMP_INDEX_PREFIX_STR)) {
2422 				/* Skip indexes whose name starts with
2423 				TEMP_INDEX_PREFIX_STR, because they will
2424 				be dropped by row_merge_drop_temp_indexes()
2425 				during crash recovery. */
2426 				goto next_rec;
2427 			}
2428 		}
2429 
2430 		err_msg = dict_load_index_low(buf, heap, rec, TRUE, &index);
2431 		ut_ad((index == NULL && err_msg != NULL)
2432 		      || (index != NULL && err_msg == NULL));
2433 
2434 		if (err_msg == dict_load_index_id_err) {
2435 			/* TABLE_ID mismatch means that we have
2436 			run out of index definitions for the table. */
2437 
2438 			if (dict_table_get_first_index(table) == NULL
2439 			    && !(ignore_err & DICT_ERR_IGNORE_CORRUPT)) {
2440 
2441 				ib::warn() << "Failed to load the"
2442 					" clustered index for table "
2443 					<< table->name
2444 					<< " because of the following error: "
2445 					<< err_msg << "."
2446 					" Refusing to load the rest of the"
2447 					" indexes (if any) and the whole table"
2448 					" altogether.";
2449 				error = DB_CORRUPTION;
2450 				goto func_exit;
2451 			}
2452 
2453 			break;
2454 		} else if (err_msg == dict_load_index_del) {
2455 			/* Skip delete-marked records. */
2456 			goto next_rec;
2457 		} else if (err_msg) {
2458 			ib::error() << err_msg;
2459 			if (ignore_err & DICT_ERR_IGNORE_CORRUPT) {
2460 				goto next_rec;
2461 			}
2462 			error = DB_CORRUPTION;
2463 			goto func_exit;
2464 		}
2465 
2466 		ut_ad(index);
2467 		ut_ad(!dict_index_is_online_ddl(index));
2468 
2469 		/* Check whether the index is corrupted */
2470 		if (index->is_corrupted()) {
2471 			ib::error() << "Index " << index->name
2472 				<< " of table " << table->name
2473 				<< " is corrupted";
2474 
2475 			if (!srv_load_corrupted
2476 			    && !(ignore_err & DICT_ERR_IGNORE_CORRUPT)
2477 			    && dict_index_is_clust(index)) {
2478 				dict_mem_index_free(index);
2479 
2480 				error = DB_INDEX_CORRUPT;
2481 				goto func_exit;
2482 			} else {
2483 				/* We will load the index if
2484 				1) srv_load_corrupted is TRUE
2485 				2) ignore_err is set with
2486 				DICT_ERR_IGNORE_CORRUPT
2487 				3) if the index corrupted is a secondary
2488 				index */
2489 				ib::info() << "Load corrupted index "
2490 					<< index->name
2491 					<< " of table " << table->name;
2492 			}
2493 		}
2494 
2495 		if (index->type & DICT_FTS
2496 		    && !dict_table_has_fts_index(table)) {
2497 			/* This should have been created by now. */
2498 			ut_a(table->fts != NULL);
2499 			DICT_TF2_FLAG_SET(table, DICT_TF2_FTS);
2500 		}
2501 
2502 		/* We check for unsupported types first, so that the
2503 		subsequent checks are relevant for the supported types. */
2504 		if (index->type & ~(DICT_CLUSTERED | DICT_UNIQUE
2505 				    | DICT_CORRUPT | DICT_FTS
2506 				    | DICT_SPATIAL | DICT_VIRTUAL)) {
2507 
2508 			ib::error() << "Unknown type " << index->type
2509 				<< " of index " << index->name
2510 				<< " of table " << table->name;
2511 
2512 			error = DB_UNSUPPORTED;
2513 			dict_mem_index_free(index);
2514 			goto func_exit;
2515 		} else if (index->page == FIL_NULL
2516 			   && table->is_readable()
2517 			   && (!(index->type & DICT_FTS))) {
2518 
2519 			ib::error() << "Trying to load index " << index->name
2520 				<< " for table " << table->name
2521 				<< ", but the index tree has been freed!";
2522 
2523 			if (ignore_err & DICT_ERR_IGNORE_INDEX_ROOT) {
2524 				/* If caller can tolerate this error,
2525 				we will continue to load the index and
2526 				let caller deal with this error. However
2527 				mark the index and table corrupted. We
2528 				only need to mark such in the index
2529 				dictionary cache for such metadata corruption,
2530 				since we would always be able to set it
2531 				when loading the dictionary cache */
2532 				index->table = table;
2533 				dict_set_corrupted_index_cache_only(index);
2534 
2535 				ib::info() << "Index is corrupt but forcing"
2536 					" load into data dictionary";
2537 			} else {
2538 corrupted:
2539 				dict_mem_index_free(index);
2540 				error = DB_CORRUPTION;
2541 				goto func_exit;
2542 			}
2543 		} else if (!dict_index_is_clust(index)
2544 			   && NULL == dict_table_get_first_index(table)) {
2545 
2546 			ib::error() << "Trying to load index " << index->name
2547 				<< " for table " << table->name
2548 				<< ", but the first index is not clustered!";
2549 
2550 			goto corrupted;
2551 		} else if (dict_is_sys_table(table->id)
2552 			   && (dict_index_is_clust(index)
2553 			       || ((table == dict_sys.sys_tables)
2554 				   && !strcmp("ID_IND", index->name)))) {
2555 
2556 			/* The index was created in memory already at booting
2557 			of the database server */
2558 			dict_mem_index_free(index);
2559 		} else {
2560 			dict_load_fields(index, heap);
2561 			index->table = table;
2562 
2563 			/* The data dictionary tables should never contain
2564 			invalid index definitions.  If we ignored this error
2565 			and simply did not load this index definition, the
2566 			.frm file would disagree with the index definitions
2567 			inside InnoDB. */
2568 			if ((error = dict_index_add_to_cache(index,
2569 							     index->page))
2570 			    != DB_SUCCESS) {
2571 				goto func_exit;
2572 			}
2573 
2574 #ifdef UNIV_DEBUG
2575 			// The following assertion doesn't hold for FTS indexes
2576 			// as it may have prefix_len=1 with any charset
2577 			if (index->type != DICT_FTS) {
2578 				for (uint i = 0; i < index->n_fields; i++) {
2579 					dict_field_t &f = index->fields[i];
2580 					ut_ad(f.col->mbmaxlen == 0
2581 					      || f.prefix_len
2582 					      % f.col->mbmaxlen == 0);
2583 				}
2584 			}
2585 #endif /* UNIV_DEBUG */
2586 		}
2587 next_rec:
2588 		btr_pcur_move_to_next_user_rec(&pcur, &mtr);
2589 	}
2590 
2591 	ut_ad(table->fts_doc_id_index == NULL);
2592 
2593 	if (table->fts != NULL) {
2594 		table->fts_doc_id_index = dict_table_get_index_on_name(
2595 			table, FTS_DOC_ID_INDEX_NAME);
2596 	}
2597 
2598 	/* If the table contains FTS indexes, populate table->fts->indexes */
2599 	if (dict_table_has_fts_index(table)) {
2600 		ut_ad(table->fts_doc_id_index != NULL);
2601 		/* table->fts->indexes should have been created. */
2602 		ut_a(table->fts->indexes != NULL);
2603 		dict_table_get_all_fts_indexes(table, table->fts->indexes);
2604 	}
2605 
2606 func_exit:
2607 	btr_pcur_close(&pcur);
2608 	mtr_commit(&mtr);
2609 
2610 	return(error);
2611 }
2612 
2613 /** Load a table definition from a SYS_TABLES record to dict_table_t.
2614 Do not load any columns or indexes.
2615 @param[in]	name		Table name
2616 @param[in]	rec		SYS_TABLES record
2617 @param[out,own]	table		table, or NULL
2618 @return	error message
2619 @retval	NULL on success */
dict_load_table_low(const table_name_t & name,const rec_t * rec,dict_table_t ** table)2620 static const char* dict_load_table_low(const table_name_t& name,
2621 				       const rec_t* rec, dict_table_t** table)
2622 {
2623 	table_id_t	table_id;
2624 	ulint		space_id;
2625 	ulint		n_cols;
2626 	ulint		t_num;
2627 	ulint		flags;
2628 	ulint		flags2;
2629 	ulint		n_v_col;
2630 
2631 	if (const char* error_text = dict_sys_tables_rec_check(rec)) {
2632 		*table = NULL;
2633 		return(error_text);
2634 	}
2635 
2636 	if (!dict_sys_tables_rec_read(rec, name, &table_id, &space_id,
2637 				      &t_num, &flags, &flags2)) {
2638 		*table = NULL;
2639 		return(dict_load_table_flags);
2640 	}
2641 
2642 	dict_table_decode_n_col(t_num, &n_cols, &n_v_col);
2643 
2644 	*table = dict_mem_table_create(
2645 		name.m_name, NULL, n_cols + n_v_col, n_v_col, flags, flags2);
2646 	(*table)->space_id = space_id;
2647 	(*table)->id = table_id;
2648 	(*table)->file_unreadable = !!(flags2 & DICT_TF2_DISCARDED);
2649 
2650 	return(NULL);
2651 }
2652 
2653 /********************************************************************//**
2654 Using the table->heap, copy the null-terminated filepath into
2655 table->data_dir_path and replace the 'databasename/tablename.ibd'
2656 portion with 'tablename'.
2657 This allows SHOW CREATE TABLE to return the correct DATA DIRECTORY path.
2658 Make this data directory path only if it has not yet been saved. */
2659 static
2660 void
dict_save_data_dir_path(dict_table_t * table,const char * filepath)2661 dict_save_data_dir_path(
2662 /*====================*/
2663 	dict_table_t*	table,		/*!< in/out: table */
2664 	const char*	filepath)	/*!< in: filepath of tablespace */
2665 {
2666 	ut_ad(mutex_own(&dict_sys.mutex));
2667 	ut_a(DICT_TF_HAS_DATA_DIR(table->flags));
2668 
2669 	ut_a(!table->data_dir_path);
2670 	ut_a(filepath);
2671 
2672 	/* Be sure this filepath is not the default filepath. */
2673 	char*	default_filepath = fil_make_filepath(
2674 			NULL, table->name.m_name, IBD, false);
2675 	if (default_filepath) {
2676 		if (0 != strcmp(filepath, default_filepath)) {
2677 			ulint pathlen = strlen(filepath);
2678 			ut_a(pathlen < OS_FILE_MAX_PATH);
2679 			ut_a(0 == strcmp(filepath + pathlen - 4, DOT_IBD));
2680 
2681 			table->data_dir_path = mem_heap_strdup(
2682 				table->heap, filepath);
2683 			os_file_make_data_dir_path(table->data_dir_path);
2684 		}
2685 
2686 		ut_free(default_filepath);
2687 	}
2688 }
2689 
2690 /** Make sure the data_dir_path is saved in dict_table_t if DATA DIRECTORY
2691 was used. Try to read it from the fil_system first, then from SYS_DATAFILES.
2692 @param[in]	table		Table object
2693 @param[in]	dict_mutex_own	true if dict_sys.mutex is owned already */
2694 void
dict_get_and_save_data_dir_path(dict_table_t * table,bool dict_mutex_own)2695 dict_get_and_save_data_dir_path(
2696 	dict_table_t*	table,
2697 	bool		dict_mutex_own)
2698 {
2699 	ut_ad(!table->is_temporary());
2700 	ut_ad(!table->space || table->space->id == table->space_id);
2701 
2702 	if (!table->data_dir_path && table->space_id && table->space) {
2703 		if (!dict_mutex_own) {
2704 			dict_mutex_enter_for_mysql();
2705 		}
2706 
2707 		table->flags |= 1 << DICT_TF_POS_DATA_DIR
2708 			& ((1U << DICT_TF_BITS) - 1);
2709 		dict_save_data_dir_path(table,
2710 					table->space->chain.start->name);
2711 
2712 		if (table->data_dir_path == NULL) {
2713 			/* Since we did not set the table data_dir_path,
2714 			unset the flag.  This does not change SYS_DATAFILES
2715 			or SYS_TABLES or FSP_SPACE_FLAGS on the header page
2716 			of the tablespace, but it makes dict_table_t
2717 			consistent. */
2718 			table->flags &= ~DICT_TF_MASK_DATA_DIR
2719 				& ((1U << DICT_TF_BITS) - 1);
2720 		}
2721 
2722 		if (!dict_mutex_own) {
2723 			dict_mutex_exit_for_mysql();
2724 		}
2725 	}
2726 }
2727 
2728 /** Loads a table definition and also all its index definitions, and also
2729 the cluster definition if the table is a member in a cluster. Also loads
2730 all foreign key constraints where the foreign key is in the table or where
2731 a foreign key references columns in this table.
2732 @param[in]	name		Table name in the dbname/tablename format
2733 @param[in]	ignore_err	Error to be ignored when loading
2734 				table and its index definition
2735 @return table, NULL if does not exist; if the table is stored in an
2736 .ibd file, but the file does not exist, then we set the file_unreadable
2737 flag in the table object we return. */
dict_load_table(const char * name,dict_err_ignore_t ignore_err)2738 dict_table_t* dict_load_table(const char* name, dict_err_ignore_t ignore_err)
2739 {
2740 	dict_names_t			fk_list;
2741 	dict_table_t*			result;
2742 	dict_names_t::iterator		i;
2743 
2744 	DBUG_ENTER("dict_load_table");
2745 	DBUG_PRINT("dict_load_table", ("loading table: '%s'", name));
2746 
2747 	ut_ad(mutex_own(&dict_sys.mutex));
2748 
2749 	result = dict_table_check_if_in_cache_low(name);
2750 
2751 	if (!result) {
2752 		result = dict_load_table_one(const_cast<char*>(name),
2753 					     ignore_err, fk_list);
2754 		while (!fk_list.empty()) {
2755 			if (!dict_table_check_if_in_cache_low(fk_list.front()))
2756 				dict_load_table_one(
2757 					const_cast<char*>(fk_list.front()),
2758 					ignore_err, fk_list);
2759 			fk_list.pop_front();
2760 		}
2761 	}
2762 
2763 	DBUG_RETURN(result);
2764 }
2765 
2766 /** Opens a tablespace for dict_load_table_one()
2767 @param[in,out]	table		A table that refers to the tablespace to open
2768 @param[in]	ignore_err	Whether to ignore an error. */
2769 UNIV_INLINE
2770 void
dict_load_tablespace(dict_table_t * table,dict_err_ignore_t ignore_err)2771 dict_load_tablespace(
2772 	dict_table_t*		table,
2773 	dict_err_ignore_t	ignore_err)
2774 {
2775 	ut_ad(!table->is_temporary());
2776 	ut_ad(!table->space);
2777 	ut_ad(table->space_id < SRV_SPACE_ID_UPPER_BOUND);
2778 	ut_ad(fil_system.sys_space);
2779 
2780 	if (table->space_id == TRX_SYS_SPACE) {
2781 		table->space = fil_system.sys_space;
2782 		return;
2783 	}
2784 
2785 	if (table->flags2 & DICT_TF2_DISCARDED) {
2786 		ib::warn() << "Tablespace for table " << table->name
2787 			<< " is set as discarded.";
2788 		table->file_unreadable = true;
2789 		return;
2790 	}
2791 
2792 	/* The tablespace may already be open. */
2793 	table->space = fil_space_for_table_exists_in_mem(
2794 		table->space_id, table->name.m_name, table->flags);
2795 	if (table->space) {
2796 		return;
2797 	}
2798 
2799 	if (ignore_err == DICT_ERR_IGNORE_DROP) {
2800 		table->file_unreadable = true;
2801 		return;
2802 	}
2803 
2804 	if (!(ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK)) {
2805 		ib::error() << "Failed to find tablespace for table "
2806 			<< table->name << " in the cache. Attempting"
2807 			" to load the tablespace with space id "
2808 			<< table->space_id;
2809 	}
2810 
2811 	/* Use the remote filepath if needed. This parameter is optional
2812 	in the call to fil_ibd_open(). If not supplied, it will be built
2813 	from the table->name. */
2814 	char* filepath = NULL;
2815 	if (DICT_TF_HAS_DATA_DIR(table->flags)) {
2816 		/* This will set table->data_dir_path from either
2817 		fil_system or SYS_DATAFILES */
2818 		dict_get_and_save_data_dir_path(table, true);
2819 
2820 		if (table->data_dir_path) {
2821 			filepath = fil_make_filepath(
2822 				table->data_dir_path,
2823 				table->name.m_name, IBD, true);
2824 		}
2825 	}
2826 
2827 	/* Try to open the tablespace.  We set the 2nd param (fix_dict) to
2828 	false because we do not have an x-lock on dict_sys.latch */
2829 	table->space = fil_ibd_open(
2830 		true, false, FIL_TYPE_TABLESPACE, table->space_id,
2831 		dict_tf_to_fsp_flags(table->flags),
2832 		table->name, filepath);
2833 
2834 	if (!table->space) {
2835 		/* We failed to find a sensible tablespace file */
2836 		table->file_unreadable = true;
2837 	}
2838 
2839 	ut_free(filepath);
2840 }
2841 
2842 /** Loads a table definition and also all its index definitions.
2843 
2844 Loads those foreign key constraints whose referenced table is already in
2845 dictionary cache.  If a foreign key constraint is not loaded, then the
2846 referenced table is pushed into the output stack (fk_tables), if it is not
2847 NULL.  These tables must be subsequently loaded so that all the foreign
2848 key constraints are loaded into memory.
2849 
2850 @param[in]	name		Table name in the db/tablename format
2851 @param[in]	ignore_err	Error to be ignored when loading table
2852 				and its index definition
2853 @param[out]	fk_tables	Related table names that must also be
2854 				loaded to ensure that all foreign key
2855 				constraints are loaded.
2856 @return table, NULL if does not exist; if the table is stored in an
2857 .ibd file, but the file does not exist, then we set the
2858 file_unreadable flag in the table object we return */
2859 static
2860 dict_table_t*
dict_load_table_one(const table_name_t & name,dict_err_ignore_t ignore_err,dict_names_t & fk_tables)2861 dict_load_table_one(
2862 	const table_name_t&	name,
2863 	dict_err_ignore_t	ignore_err,
2864 	dict_names_t&		fk_tables)
2865 {
2866 	dberr_t		err;
2867 	dict_table_t*	sys_tables;
2868 	btr_pcur_t	pcur;
2869 	dict_index_t*	sys_index;
2870 	dtuple_t*	tuple;
2871 	mem_heap_t*	heap;
2872 	dfield_t*	dfield;
2873 	const rec_t*	rec;
2874 	const byte*	field;
2875 	ulint		len;
2876 	mtr_t		mtr;
2877 
2878 	DBUG_ENTER("dict_load_table_one");
2879 	DBUG_PRINT("dict_load_table_one", ("table: %s", name.m_name));
2880 
2881 	ut_ad(mutex_own(&dict_sys.mutex));
2882 
2883 	heap = mem_heap_create(32000);
2884 
2885 	mtr_start(&mtr);
2886 
2887 	sys_tables = dict_table_get_low("SYS_TABLES");
2888 	sys_index = UT_LIST_GET_FIRST(sys_tables->indexes);
2889 	ut_ad(!dict_table_is_comp(sys_tables));
2890 	ut_ad(name_of_col_is(sys_tables, sys_index,
2891 			     DICT_FLD__SYS_TABLES__ID, "ID"));
2892 	ut_ad(name_of_col_is(sys_tables, sys_index,
2893 			     DICT_FLD__SYS_TABLES__N_COLS, "N_COLS"));
2894 	ut_ad(name_of_col_is(sys_tables, sys_index,
2895 			     DICT_FLD__SYS_TABLES__TYPE, "TYPE"));
2896 	ut_ad(name_of_col_is(sys_tables, sys_index,
2897 			     DICT_FLD__SYS_TABLES__MIX_LEN, "MIX_LEN"));
2898 	ut_ad(name_of_col_is(sys_tables, sys_index,
2899 			     DICT_FLD__SYS_TABLES__SPACE, "SPACE"));
2900 
2901 	tuple = dtuple_create(heap, 1);
2902 	dfield = dtuple_get_nth_field(tuple, 0);
2903 
2904 	dfield_set_data(dfield, name.m_name, strlen(name.m_name));
2905 	dict_index_copy_types(tuple, sys_index, 1);
2906 
2907 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
2908 				  BTR_SEARCH_LEAF, &pcur, &mtr);
2909 	rec = btr_pcur_get_rec(&pcur);
2910 
2911 	if (!btr_pcur_is_on_user_rec(&pcur)
2912 	    || rec_get_deleted_flag(rec, 0)) {
2913 		/* Not found */
2914 err_exit:
2915 		btr_pcur_close(&pcur);
2916 		mtr_commit(&mtr);
2917 		mem_heap_free(heap);
2918 
2919 		DBUG_RETURN(NULL);
2920 	}
2921 
2922 	field = rec_get_nth_field_old(
2923 		rec, DICT_FLD__SYS_TABLES__NAME, &len);
2924 
2925 	/* Check if the table name in record is the searched one */
2926 	if (len != strlen(name.m_name)
2927 	    || memcmp(name.m_name, field, len)) {
2928 
2929 		goto err_exit;
2930 	}
2931 
2932 	dict_table_t* table;
2933 	if (const char* err_msg = dict_load_table_low(name, rec, &table)) {
2934 		if (err_msg != dict_load_table_flags) {
2935 			ib::error() << err_msg;
2936 		}
2937 		goto err_exit;
2938 	}
2939 
2940 	btr_pcur_close(&pcur);
2941 	mtr_commit(&mtr);
2942 
2943 	dict_load_tablespace(table, ignore_err);
2944 
2945 	dict_load_columns(table, heap);
2946 
2947 	dict_load_virtual(table, heap);
2948 
2949 	dict_table_add_system_columns(table, heap);
2950 
2951 	table->can_be_evicted = true;
2952 	table->add_to_cache();
2953 
2954 	mem_heap_empty(heap);
2955 
2956 	ut_ad(dict_tf2_is_valid(table->flags, table->flags2));
2957 
2958 	/* If there is no tablespace for the table then we only need to
2959 	load the index definitions. So that we can IMPORT the tablespace
2960 	later. When recovering table locks for resurrected incomplete
2961 	transactions, the tablespace should exist, because DDL operations
2962 	were not allowed while the table is being locked by a transaction. */
2963 	dict_err_ignore_t index_load_err =
2964 		!(ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK)
2965 		&& !table->is_readable()
2966 		? DICT_ERR_IGNORE_ALL
2967 		: ignore_err;
2968 
2969 	err = dict_load_indexes(table, heap, index_load_err);
2970 
2971 	if (err == DB_INDEX_CORRUPT) {
2972 		/* Refuse to load the table if the table has a corrupted
2973 		cluster index */
2974 		if (!srv_load_corrupted) {
2975 
2976 			ib::error() << "Load table " << table->name
2977 				<< " failed, the table has"
2978 				" corrupted clustered indexes. Turn on"
2979 				" 'innodb_force_load_corrupted' to drop it";
2980 			dict_sys.remove(table);
2981 			table = NULL;
2982 			goto func_exit;
2983 		} else {
2984 			if (table->indexes.start->is_corrupted()) {
2985 				table->corrupted = true;
2986 			}
2987 		}
2988 	}
2989 
2990 	if (err == DB_SUCCESS && table->is_readable()) {
2991 		const auto root = dict_table_get_first_index(table)->page;
2992 
2993 		if (root >= table->space->get_size()) {
2994 corrupted:
2995 			table->corrupted = true;
2996 			table->file_unreadable = true;
2997 			err = DB_CORRUPTION;
2998 		} else {
2999 			const page_id_t page_id(table->space->id, root);
3000 			mtr.start();
3001 			buf_block_t* block = buf_page_get(
3002 				page_id, table->space->zip_size(),
3003 				RW_S_LATCH, &mtr);
3004 			const bool corrupted = !block
3005 				|| page_get_space_id(block->frame)
3006 				!= page_id.space()
3007 				|| page_get_page_no(block->frame)
3008 				!= page_id.page_no()
3009 				|| (mach_read_from_2(FIL_PAGE_TYPE
3010 						    + block->frame)
3011 				    != FIL_PAGE_INDEX
3012 				    && mach_read_from_2(FIL_PAGE_TYPE
3013 							+ block->frame)
3014 				    != FIL_PAGE_TYPE_INSTANT);
3015 			mtr.commit();
3016 			if (corrupted) {
3017 				goto corrupted;
3018 			}
3019 
3020 			if (table->supports_instant()) {
3021 				err = btr_cur_instant_init(table);
3022 			}
3023 		}
3024 	}
3025 
3026 	/* Initialize table foreign_child value. Its value could be
3027 	changed when dict_load_foreigns() is called below */
3028 	table->fk_max_recusive_level = 0;
3029 
3030 	/* If the force recovery flag is set, we open the table irrespective
3031 	of the error condition, since the user may want to dump data from the
3032 	clustered index. However we load the foreign key information only if
3033 	all indexes were loaded. */
3034 	if (!table->is_readable()) {
3035 		/* Don't attempt to load the indexes from disk. */
3036 	} else if (err == DB_SUCCESS) {
3037 		err = dict_load_foreigns(table->name.m_name, NULL,
3038 					 true, true,
3039 					 ignore_err, fk_tables);
3040 
3041 		if (err != DB_SUCCESS) {
3042 			ib::warn() << "Load table " << table->name
3043 				<< " failed, the table has missing"
3044 				" foreign key indexes. Turn off"
3045 				" 'foreign_key_checks' and try again.";
3046 
3047 			dict_sys.remove(table);
3048 			table = NULL;
3049 		} else {
3050 			dict_mem_table_fill_foreign_vcol_set(table);
3051 			table->fk_max_recusive_level = 0;
3052 		}
3053 	} else {
3054 		dict_index_t*   index;
3055 
3056 		/* Make sure that at least the clustered index was loaded.
3057 		Otherwise refuse to load the table */
3058 		index = dict_table_get_first_index(table);
3059 
3060 		if (!srv_force_recovery
3061 		    || !index
3062 		    || !index->is_primary()) {
3063 			dict_sys.remove(table);
3064 			table = NULL;
3065 		} else if (index->is_corrupted()
3066 			   && table->is_readable()) {
3067 			/* It is possible we force to load a corrupted
3068 			clustered index if srv_load_corrupted is set.
3069 			Mark the table as corrupted in this case */
3070 			table->corrupted = true;
3071 		}
3072 	}
3073 
3074 func_exit:
3075 	mem_heap_free(heap);
3076 
3077 	ut_ad(!table
3078 	      || (ignore_err & ~DICT_ERR_IGNORE_FK_NOKEY)
3079 	      || !table->is_readable()
3080 	      || !table->corrupted);
3081 
3082 	if (table && table->fts) {
3083 		if (!(dict_table_has_fts_index(table)
3084 		      || DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)
3085 		      || DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_ADD_DOC_ID))) {
3086 			/* the table->fts could be created in dict_load_column
3087 			when a user defined FTS_DOC_ID is present, but no
3088 			FTS */
3089 			fts_free(table);
3090 		} else if (fts_optimize_wq) {
3091 			fts_optimize_add_table(table);
3092 		} else if (table->can_be_evicted) {
3093 			/* fts_optimize_thread is not started yet.
3094 			So make the table as non-evictable from cache. */
3095 			dict_sys.prevent_eviction(table);
3096 		}
3097 	}
3098 
3099 	ut_ad(err != DB_SUCCESS || dict_foreign_set_validate(*table));
3100 
3101 	DBUG_RETURN(table);
3102 }
3103 
3104 /***********************************************************************//**
3105 Loads a table object based on the table id.
3106 @return table; NULL if table does not exist */
3107 dict_table_t*
dict_load_table_on_id(table_id_t table_id,dict_err_ignore_t ignore_err)3108 dict_load_table_on_id(
3109 /*==================*/
3110 	table_id_t		table_id,	/*!< in: table id */
3111 	dict_err_ignore_t	ignore_err)	/*!< in: errors to ignore
3112 						when loading the table */
3113 {
3114 	byte		id_buf[8];
3115 	btr_pcur_t	pcur;
3116 	mem_heap_t*	heap;
3117 	dtuple_t*	tuple;
3118 	dfield_t*	dfield;
3119 	dict_index_t*	sys_table_ids;
3120 	dict_table_t*	sys_tables;
3121 	const rec_t*	rec;
3122 	const byte*	field;
3123 	ulint		len;
3124 	dict_table_t*	table;
3125 	mtr_t		mtr;
3126 
3127 	ut_ad(mutex_own(&dict_sys.mutex));
3128 
3129 	table = NULL;
3130 
3131 	/* NOTE that the operation of this function is protected by
3132 	the dictionary mutex, and therefore no deadlocks can occur
3133 	with other dictionary operations. */
3134 
3135 	mtr_start(&mtr);
3136 	/*---------------------------------------------------*/
3137 	/* Get the secondary index based on ID for table SYS_TABLES */
3138 	sys_tables = dict_sys.sys_tables;
3139 	sys_table_ids = dict_table_get_next_index(
3140 		dict_table_get_first_index(sys_tables));
3141 	ut_ad(!dict_table_is_comp(sys_tables));
3142 	ut_ad(!dict_index_is_clust(sys_table_ids));
3143 	heap = mem_heap_create(256);
3144 
3145 	tuple  = dtuple_create(heap, 1);
3146 	dfield = dtuple_get_nth_field(tuple, 0);
3147 
3148 	/* Write the table id in byte format to id_buf */
3149 	mach_write_to_8(id_buf, table_id);
3150 
3151 	dfield_set_data(dfield, id_buf, 8);
3152 	dict_index_copy_types(tuple, sys_table_ids, 1);
3153 
3154 	btr_pcur_open_on_user_rec(sys_table_ids, tuple, PAGE_CUR_GE,
3155 				  BTR_SEARCH_LEAF, &pcur, &mtr);
3156 
3157 	rec = btr_pcur_get_rec(&pcur);
3158 
3159 	if (page_rec_is_user_rec(rec)) {
3160 		/*---------------------------------------------------*/
3161 		/* Now we have the record in the secondary index
3162 		containing the table ID and NAME */
3163 check_rec:
3164 		field = rec_get_nth_field_old(
3165 			rec, DICT_FLD__SYS_TABLE_IDS__ID, &len);
3166 		ut_ad(len == 8);
3167 
3168 		/* Check if the table id in record is the one searched for */
3169 		if (table_id == mach_read_from_8(field)) {
3170 			if (rec_get_deleted_flag(rec, 0)) {
3171 				/* Until purge has completed, there
3172 				may be delete-marked duplicate records
3173 				for the same SYS_TABLES.ID, but different
3174 				SYS_TABLES.NAME. */
3175 				while (btr_pcur_move_to_next(&pcur, &mtr)) {
3176 					rec = btr_pcur_get_rec(&pcur);
3177 
3178 					if (page_rec_is_user_rec(rec)) {
3179 						goto check_rec;
3180 					}
3181 				}
3182 			} else {
3183 				/* Now we get the table name from the record */
3184 				field = rec_get_nth_field_old(rec,
3185 					DICT_FLD__SYS_TABLE_IDS__NAME, &len);
3186 				/* Load the table definition to memory */
3187 				char*	table_name = mem_heap_strdupl(
3188 					heap, (char*) field, len);
3189 				table = dict_load_table(table_name, ignore_err);
3190 			}
3191 		}
3192 	}
3193 
3194 	btr_pcur_close(&pcur);
3195 	mtr_commit(&mtr);
3196 	mem_heap_free(heap);
3197 
3198 	return(table);
3199 }
3200 
3201 /********************************************************************//**
3202 This function is called when the database is booted. Loads system table
3203 index definitions except for the clustered index which is added to the
3204 dictionary cache at booting before calling this function. */
3205 void
dict_load_sys_table(dict_table_t * table)3206 dict_load_sys_table(
3207 /*================*/
3208 	dict_table_t*	table)	/*!< in: system table */
3209 {
3210 	mem_heap_t*	heap;
3211 
3212 	ut_ad(mutex_own(&dict_sys.mutex));
3213 
3214 	heap = mem_heap_create(1000);
3215 
3216 	dict_load_indexes(table, heap, DICT_ERR_IGNORE_NONE);
3217 
3218 	mem_heap_free(heap);
3219 }
3220 
3221 /********************************************************************//**
3222 Loads foreign key constraint col names (also for the referenced table).
3223 Members that must be set (and valid) in foreign:
3224 foreign->heap
3225 foreign->n_fields
3226 foreign->id ('\0'-terminated)
3227 Members that will be created and set by this function:
3228 foreign->foreign_col_names[i]
3229 foreign->referenced_col_names[i]
3230 (for i=0..foreign->n_fields-1) */
3231 static
3232 void
dict_load_foreign_cols(dict_foreign_t * foreign)3233 dict_load_foreign_cols(
3234 /*===================*/
3235 	dict_foreign_t*	foreign)/*!< in/out: foreign constraint object */
3236 {
3237 	dict_table_t*	sys_foreign_cols;
3238 	dict_index_t*	sys_index;
3239 	btr_pcur_t	pcur;
3240 	dtuple_t*	tuple;
3241 	dfield_t*	dfield;
3242 	const rec_t*	rec;
3243 	const byte*	field;
3244 	ulint		len;
3245 	ulint		i;
3246 	mtr_t		mtr;
3247 	size_t		id_len;
3248 
3249 	ut_ad(mutex_own(&dict_sys.mutex));
3250 
3251 	id_len = strlen(foreign->id);
3252 
3253 	foreign->foreign_col_names = static_cast<const char**>(
3254 		mem_heap_alloc(foreign->heap,
3255 			       foreign->n_fields * sizeof(void*)));
3256 
3257 	foreign->referenced_col_names = static_cast<const char**>(
3258 		mem_heap_alloc(foreign->heap,
3259 			       foreign->n_fields * sizeof(void*)));
3260 
3261 	mtr_start(&mtr);
3262 
3263 	sys_foreign_cols = dict_table_get_low("SYS_FOREIGN_COLS");
3264 
3265 	sys_index = UT_LIST_GET_FIRST(sys_foreign_cols->indexes);
3266 	ut_ad(!dict_table_is_comp(sys_foreign_cols));
3267 
3268 	tuple = dtuple_create(foreign->heap, 1);
3269 	dfield = dtuple_get_nth_field(tuple, 0);
3270 
3271 	dfield_set_data(dfield, foreign->id, id_len);
3272 	dict_index_copy_types(tuple, sys_index, 1);
3273 
3274 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
3275 				  BTR_SEARCH_LEAF, &pcur, &mtr);
3276 	for (i = 0; i < foreign->n_fields; i++) {
3277 
3278 		rec = btr_pcur_get_rec(&pcur);
3279 
3280 		ut_a(btr_pcur_is_on_user_rec(&pcur));
3281 		ut_a(!rec_get_deleted_flag(rec, 0));
3282 
3283 		field = rec_get_nth_field_old(
3284 			rec, DICT_FLD__SYS_FOREIGN_COLS__ID, &len);
3285 
3286 		if (len != id_len || memcmp(foreign->id, field, len)) {
3287 			const rec_t*	pos;
3288 			ulint		pos_len;
3289 			const rec_t*	for_col_name;
3290 			ulint		for_col_name_len;
3291 			const rec_t*	ref_col_name;
3292 			ulint		ref_col_name_len;
3293 
3294 			pos = rec_get_nth_field_old(
3295 				rec, DICT_FLD__SYS_FOREIGN_COLS__POS,
3296 				&pos_len);
3297 
3298 			for_col_name = rec_get_nth_field_old(
3299 				rec, DICT_FLD__SYS_FOREIGN_COLS__FOR_COL_NAME,
3300 				&for_col_name_len);
3301 
3302 			ref_col_name = rec_get_nth_field_old(
3303 				rec, DICT_FLD__SYS_FOREIGN_COLS__REF_COL_NAME,
3304 				&ref_col_name_len);
3305 
3306 			ib::fatal	sout;
3307 
3308 			sout << "Unable to load column names for foreign"
3309 				" key '" << foreign->id
3310 				<< "' because it was not found in"
3311 				" InnoDB internal table SYS_FOREIGN_COLS. The"
3312 				" closest entry we found is:"
3313 				" (ID='";
3314 			sout.write(field, len);
3315 			sout << "', POS=" << mach_read_from_4(pos)
3316 				<< ", FOR_COL_NAME='";
3317 			sout.write(for_col_name, for_col_name_len);
3318 			sout << "', REF_COL_NAME='";
3319 			sout.write(ref_col_name, ref_col_name_len);
3320 			sout << "')";
3321 		}
3322 
3323 		field = rec_get_nth_field_old(
3324 			rec, DICT_FLD__SYS_FOREIGN_COLS__POS, &len);
3325 		ut_a(len == 4);
3326 		ut_a(i == mach_read_from_4(field));
3327 
3328 		field = rec_get_nth_field_old(
3329 			rec, DICT_FLD__SYS_FOREIGN_COLS__FOR_COL_NAME, &len);
3330 		foreign->foreign_col_names[i] = mem_heap_strdupl(
3331 			foreign->heap, (char*) field, len);
3332 
3333 		field = rec_get_nth_field_old(
3334 			rec, DICT_FLD__SYS_FOREIGN_COLS__REF_COL_NAME, &len);
3335 		foreign->referenced_col_names[i] = mem_heap_strdupl(
3336 			foreign->heap, (char*) field, len);
3337 
3338 		btr_pcur_move_to_next_user_rec(&pcur, &mtr);
3339 	}
3340 
3341 	btr_pcur_close(&pcur);
3342 	mtr_commit(&mtr);
3343 }
3344 
3345 /***********************************************************************//**
3346 Loads a foreign key constraint to the dictionary cache. If the referenced
3347 table is not yet loaded, it is added in the output parameter (fk_tables).
3348 @return DB_SUCCESS or error code */
3349 static MY_ATTRIBUTE((nonnull(1), warn_unused_result))
3350 dberr_t
dict_load_foreign(const char * id,const char ** col_names,bool check_recursive,bool check_charsets,dict_err_ignore_t ignore_err,dict_names_t & fk_tables)3351 dict_load_foreign(
3352 /*==============*/
3353 	const char*		id,
3354 				/*!< in: foreign constraint id, must be
3355 				'\0'-terminated */
3356 	const char**		col_names,
3357 				/*!< in: column names, or NULL
3358 				to use foreign->foreign_table->col_names */
3359 	bool			check_recursive,
3360 				/*!< in: whether to record the foreign table
3361 				parent count to avoid unlimited recursive
3362 				load of chained foreign tables */
3363 	bool			check_charsets,
3364 				/*!< in: whether to check charset
3365 				compatibility */
3366 	dict_err_ignore_t	ignore_err,
3367 				/*!< in: error to be ignored */
3368 	dict_names_t&	fk_tables)
3369 				/*!< out: the foreign key constraint is added
3370 				to the dictionary cache only if the referenced
3371 				table is already in cache.  Otherwise, the
3372 				foreign key constraint is not added to cache,
3373 				and the referenced table is added to this
3374 				stack. */
3375 {
3376 	dict_foreign_t*	foreign;
3377 	dict_table_t*	sys_foreign;
3378 	btr_pcur_t	pcur;
3379 	dict_index_t*	sys_index;
3380 	dtuple_t*	tuple;
3381 	mem_heap_t*	heap2;
3382 	dfield_t*	dfield;
3383 	const rec_t*	rec;
3384 	const byte*	field;
3385 	ulint		len;
3386 	mtr_t		mtr;
3387 	dict_table_t*	for_table;
3388 	dict_table_t*	ref_table;
3389 	size_t		id_len;
3390 
3391 	DBUG_ENTER("dict_load_foreign");
3392 	DBUG_PRINT("dict_load_foreign",
3393 		   ("id: '%s', check_recursive: %d", id, check_recursive));
3394 
3395 	ut_ad(mutex_own(&dict_sys.mutex));
3396 
3397 	id_len = strlen(id);
3398 
3399 	heap2 = mem_heap_create(1000);
3400 
3401 	mtr_start(&mtr);
3402 
3403 	sys_foreign = dict_table_get_low("SYS_FOREIGN");
3404 
3405 	sys_index = UT_LIST_GET_FIRST(sys_foreign->indexes);
3406 	ut_ad(!dict_table_is_comp(sys_foreign));
3407 
3408 	tuple = dtuple_create(heap2, 1);
3409 	dfield = dtuple_get_nth_field(tuple, 0);
3410 
3411 	dfield_set_data(dfield, id, id_len);
3412 	dict_index_copy_types(tuple, sys_index, 1);
3413 
3414 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
3415 				  BTR_SEARCH_LEAF, &pcur, &mtr);
3416 	rec = btr_pcur_get_rec(&pcur);
3417 
3418 	if (!btr_pcur_is_on_user_rec(&pcur)
3419 	    || rec_get_deleted_flag(rec, 0)) {
3420 		/* Not found */
3421 
3422 		ib::error() << "Cannot load foreign constraint " << id
3423 			<< ": could not find the relevant record in "
3424 			<< "SYS_FOREIGN";
3425 
3426 		btr_pcur_close(&pcur);
3427 		mtr_commit(&mtr);
3428 		mem_heap_free(heap2);
3429 
3430 		DBUG_RETURN(DB_ERROR);
3431 	}
3432 
3433 	field = rec_get_nth_field_old(rec, DICT_FLD__SYS_FOREIGN__ID, &len);
3434 
3435 	/* Check if the id in record is the searched one */
3436 	if (len != id_len || memcmp(id, field, len)) {
3437 		{
3438 			ib::error	err;
3439 			err << "Cannot load foreign constraint " << id
3440 				<< ": found ";
3441 			err.write(field, len);
3442 			err << " instead in SYS_FOREIGN";
3443 		}
3444 
3445 		btr_pcur_close(&pcur);
3446 		mtr_commit(&mtr);
3447 		mem_heap_free(heap2);
3448 
3449 		DBUG_RETURN(DB_ERROR);
3450 	}
3451 
3452 	/* Read the table names and the number of columns associated
3453 	with the constraint */
3454 
3455 	mem_heap_free(heap2);
3456 
3457 	foreign = dict_mem_foreign_create();
3458 
3459 	uint32_t n_fields_and_type = mach_read_from_4(
3460 		rec_get_nth_field_old(
3461 			rec, DICT_FLD__SYS_FOREIGN__N_COLS, &len));
3462 
3463 	ut_a(len == 4);
3464 
3465 	/* We store the type in the bits 24..29 of n_fields_and_type. */
3466 
3467 	foreign->type = (n_fields_and_type >> 24) & ((1U << 6) - 1);
3468 	foreign->n_fields = n_fields_and_type & dict_index_t::MAX_N_FIELDS;
3469 
3470 	foreign->id = mem_heap_strdupl(foreign->heap, id, id_len);
3471 
3472 	field = rec_get_nth_field_old(
3473 		rec, DICT_FLD__SYS_FOREIGN__FOR_NAME, &len);
3474 
3475 	foreign->foreign_table_name = mem_heap_strdupl(
3476 		foreign->heap, (char*) field, len);
3477 	dict_mem_foreign_table_name_lookup_set(foreign, TRUE);
3478 
3479 	const ulint foreign_table_name_len = len;
3480 
3481 	field = rec_get_nth_field_old(
3482 		rec, DICT_FLD__SYS_FOREIGN__REF_NAME, &len);
3483 	foreign->referenced_table_name = mem_heap_strdupl(
3484 		foreign->heap, (char*) field, len);
3485 	dict_mem_referenced_table_name_lookup_set(foreign, TRUE);
3486 
3487 	btr_pcur_close(&pcur);
3488 	mtr_commit(&mtr);
3489 
3490 	dict_load_foreign_cols(foreign);
3491 
3492 	ref_table = dict_table_check_if_in_cache_low(
3493 		foreign->referenced_table_name_lookup);
3494 	for_table = dict_table_check_if_in_cache_low(
3495 		foreign->foreign_table_name_lookup);
3496 
3497 	if (!for_table) {
3498 		/* To avoid recursively loading the tables related through
3499 		the foreign key constraints, the child table name is saved
3500 		here.  The child table will be loaded later, along with its
3501 		foreign key constraint. */
3502 
3503 		ut_a(ref_table != NULL);
3504 		fk_tables.push_back(
3505 			mem_heap_strdupl(ref_table->heap,
3506 					 foreign->foreign_table_name_lookup,
3507 					 foreign_table_name_len));
3508 
3509 		dict_foreign_remove_from_cache(foreign);
3510 		DBUG_RETURN(DB_SUCCESS);
3511 	}
3512 
3513 	ut_a(for_table || ref_table);
3514 
3515 	/* Note that there may already be a foreign constraint object in
3516 	the dictionary cache for this constraint: then the following
3517 	call only sets the pointers in it to point to the appropriate table
3518 	and index objects and frees the newly created object foreign.
3519 	Adding to the cache should always succeed since we are not creating
3520 	a new foreign key constraint but loading one from the data
3521 	dictionary. */
3522 
3523 	DBUG_RETURN(dict_foreign_add_to_cache(foreign, col_names,
3524 					      check_charsets,
3525 					      ignore_err));
3526 }
3527 
3528 /***********************************************************************//**
3529 Loads foreign key constraints where the table is either the foreign key
3530 holder or where the table is referenced by a foreign key. Adds these
3531 constraints to the data dictionary.
3532 
3533 The foreign key constraint is loaded only if the referenced table is also
3534 in the dictionary cache.  If the referenced table is not in dictionary
3535 cache, then it is added to the output parameter (fk_tables).
3536 
3537 @return DB_SUCCESS or error code */
3538 dberr_t
dict_load_foreigns(const char * table_name,const char ** col_names,bool check_recursive,bool check_charsets,dict_err_ignore_t ignore_err,dict_names_t & fk_tables)3539 dict_load_foreigns(
3540 	const char*		table_name,	/*!< in: table name */
3541 	const char**		col_names,	/*!< in: column names, or NULL
3542 						to use table->col_names */
3543 	bool			check_recursive,/*!< in: Whether to check
3544 						recursive load of tables
3545 						chained by FK */
3546 	bool			check_charsets,	/*!< in: whether to check
3547 						charset compatibility */
3548 	dict_err_ignore_t	ignore_err,	/*!< in: error to be ignored */
3549 	dict_names_t&		fk_tables)
3550 						/*!< out: stack of table
3551 						names which must be loaded
3552 						subsequently to load all the
3553 						foreign key constraints. */
3554 {
3555 	ulint		tuple_buf[(DTUPLE_EST_ALLOC(1) + sizeof(ulint) - 1)
3556 				/ sizeof(ulint)];
3557 	btr_pcur_t	pcur;
3558 	dtuple_t*	tuple;
3559 	dfield_t*	dfield;
3560 	dict_index_t*	sec_index;
3561 	dict_table_t*	sys_foreign;
3562 	const rec_t*	rec;
3563 	const byte*	field;
3564 	ulint		len;
3565 	dberr_t		err;
3566 	mtr_t		mtr;
3567 
3568 	DBUG_ENTER("dict_load_foreigns");
3569 
3570 	ut_ad(mutex_own(&dict_sys.mutex));
3571 
3572 	sys_foreign = dict_table_get_low("SYS_FOREIGN");
3573 
3574 	if (sys_foreign == NULL) {
3575 		/* No foreign keys defined yet in this database */
3576 
3577 		ib::info() << "No foreign key system tables in the database";
3578 		DBUG_RETURN(DB_ERROR);
3579 	}
3580 
3581 	ut_ad(!dict_table_is_comp(sys_foreign));
3582 	mtr_start(&mtr);
3583 
3584 	/* Get the secondary index based on FOR_NAME from table
3585 	SYS_FOREIGN */
3586 
3587 	sec_index = dict_table_get_next_index(
3588 		dict_table_get_first_index(sys_foreign));
3589 	ut_ad(!dict_index_is_clust(sec_index));
3590 start_load:
3591 
3592 	tuple = dtuple_create_from_mem(tuple_buf, sizeof(tuple_buf), 1, 0);
3593 	dfield = dtuple_get_nth_field(tuple, 0);
3594 
3595 	dfield_set_data(dfield, table_name, strlen(table_name));
3596 	dict_index_copy_types(tuple, sec_index, 1);
3597 
3598 	btr_pcur_open_on_user_rec(sec_index, tuple, PAGE_CUR_GE,
3599 				  BTR_SEARCH_LEAF, &pcur, &mtr);
3600 loop:
3601 	rec = btr_pcur_get_rec(&pcur);
3602 
3603 	if (!btr_pcur_is_on_user_rec(&pcur)) {
3604 		/* End of index */
3605 
3606 		goto load_next_index;
3607 	}
3608 
3609 	/* Now we have the record in the secondary index containing a table
3610 	name and a foreign constraint ID */
3611 
3612 	field = rec_get_nth_field_old(
3613 		rec, DICT_FLD__SYS_FOREIGN_FOR_NAME__NAME, &len);
3614 
3615 	/* Check if the table name in the record is the one searched for; the
3616 	following call does the comparison in the latin1_swedish_ci
3617 	charset-collation, in a case-insensitive way. */
3618 
3619 	if (0 != cmp_data_data(dfield_get_type(dfield)->mtype,
3620 			       dfield_get_type(dfield)->prtype,
3621 			       static_cast<const byte*>(
3622 				       dfield_get_data(dfield)),
3623 			       dfield_get_len(dfield),
3624 			       field, len)) {
3625 
3626 		goto load_next_index;
3627 	}
3628 
3629 	/* Since table names in SYS_FOREIGN are stored in a case-insensitive
3630 	order, we have to check that the table name matches also in a binary
3631 	string comparison. On Unix, MySQL allows table names that only differ
3632 	in character case.  If lower_case_table_names=2 then what is stored
3633 	may not be the same case, but the previous comparison showed that they
3634 	match with no-case.  */
3635 
3636 	if (rec_get_deleted_flag(rec, 0)) {
3637 		goto next_rec;
3638 	}
3639 
3640 	if (innobase_get_lower_case_table_names() != 2
3641 	    && memcmp(field, table_name, len)) {
3642 		goto next_rec;
3643 	}
3644 
3645 	/* Now we get a foreign key constraint id */
3646 	field = rec_get_nth_field_old(
3647 		rec, DICT_FLD__SYS_FOREIGN_FOR_NAME__ID, &len);
3648 
3649 	/* Copy the string because the page may be modified or evicted
3650 	after mtr_commit() below. */
3651 	char	fk_id[MAX_TABLE_NAME_LEN + 1];
3652 
3653 	ut_a(len <= MAX_TABLE_NAME_LEN);
3654 	memcpy(fk_id, field, len);
3655 	fk_id[len] = '\0';
3656 
3657 	btr_pcur_store_position(&pcur, &mtr);
3658 
3659 	mtr_commit(&mtr);
3660 
3661 	/* Load the foreign constraint definition to the dictionary cache */
3662 
3663 	err = dict_load_foreign(fk_id, col_names,
3664 				check_recursive, check_charsets, ignore_err,
3665 				fk_tables);
3666 
3667 	if (err != DB_SUCCESS) {
3668 		btr_pcur_close(&pcur);
3669 
3670 		DBUG_RETURN(err);
3671 	}
3672 
3673 	mtr_start(&mtr);
3674 
3675 	btr_pcur_restore_position(BTR_SEARCH_LEAF, &pcur, &mtr);
3676 next_rec:
3677 	btr_pcur_move_to_next_user_rec(&pcur, &mtr);
3678 
3679 	goto loop;
3680 
3681 load_next_index:
3682 	btr_pcur_close(&pcur);
3683 	mtr_commit(&mtr);
3684 
3685 	sec_index = dict_table_get_next_index(sec_index);
3686 
3687 	if (sec_index != NULL) {
3688 
3689 		mtr_start(&mtr);
3690 
3691 		/* Switch to scan index on REF_NAME, fk_max_recusive_level
3692 		already been updated when scanning FOR_NAME index, no need to
3693 		update again */
3694 		check_recursive = FALSE;
3695 
3696 		goto start_load;
3697 	}
3698 
3699 	DBUG_RETURN(DB_SUCCESS);
3700 }
3701