1 /*****************************************************************************
2 
3 Copyright (c) 1996, 2021, Oracle and/or its affiliates.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /**************************************************//**
28 @file dict/dict0load.cc
29 Loads to the memory cache database object definitions
30 from dictionary tables
31 
32 Created 4/24/1996 Heikki Tuuri
33 *******************************************************/
34 
35 #include "ha_prototypes.h"
36 
37 #include "dict0load.h"
38 #ifdef UNIV_NONINL
39 #include "dict0load.ic"
40 #endif
41 
42 #include "mysql_version.h"
43 #include "btr0pcur.h"
44 #include "btr0btr.h"
45 #include "dict0boot.h"
46 #include "dict0crea.h"
47 #include "dict0dict.h"
48 #include "dict0mem.h"
49 #include "dict0priv.h"
50 #include "dict0stats.h"
51 #include "fsp0file.h"
52 #include "fsp0sysspace.h"
53 #include "fts0priv.h"
54 #include "mach0data.h"
55 #include "page0page.h"
56 #include "rem0cmp.h"
57 #include "srv0start.h"
58 #include "srv0srv.h"
59 #include <stack>
60 #include <set>
61 
62 /** Following are the InnoDB system tables. The positions in
63 this array are referenced by enum dict_system_table_id. */
64 static const char* SYSTEM_TABLE_NAME[] = {
65 	"SYS_TABLES",
66 	"SYS_INDEXES",
67 	"SYS_COLUMNS",
68 	"SYS_FIELDS",
69 	"SYS_FOREIGN",
70 	"SYS_FOREIGN_COLS",
71 	"SYS_TABLESPACES",
72 	"SYS_DATAFILES",
73 	"SYS_VIRTUAL",
74 	"SYS_ZIP_DICT",
75 	"SYS_ZIP_DICT_COLS"
76 };
77 
78 /** Loads a table definition and also all its index definitions.
79 
80 Loads those foreign key constraints whose referenced table is already in
81 dictionary cache.  If a foreign key constraint is not loaded, then the
82 referenced table is pushed into the output stack (fk_tables), if it is not
83 NULL.  These tables must be subsequently loaded so that all the foreign
84 key constraints are loaded into memory.
85 
86 @param[in]	name		Table name in the db/tablename format
87 @param[in]	cached		true=add to cache, false=do not
88 @param[in]	ignore_err	Error to be ignored when loading table
89 				and its index definition
90 @param[out]	fk_tables	Related table names that must also be
91 				loaded to ensure that all foreign key
92 				constraints are loaded.
93 @return table, NULL if does not exist; if the table is stored in an
94 .ibd file, but the file does not exist, then we set the
95 file_unreadable flag TRUE in the table object we return */
96 static
97 dict_table_t*
98 dict_load_table_one(
99 	table_name_t&		name,
100 	bool			cached,
101 	dict_err_ignore_t	ignore_err,
102 	dict_names_t&		fk_tables);
103 
104 /** Loads a table definition from a SYS_TABLES record to dict_table_t.
105 Does not load any columns or indexes.
106 @param[in]	name	Table name
107 @param[in]	rec	SYS_TABLES record
108 @param[out,own]	table	Table, or NULL
109 @return error message, or NULL on success */
110 static
111 const char*
112 dict_load_table_low(
113 	table_name_t&	name,
114 	const rec_t*	rec,
115 	dict_table_t**	table);
116 
117 /* If this flag is TRUE, then we will load the cluster index's (and tables')
118 metadata even if it is marked as "corrupted". */
119 my_bool     srv_load_corrupted = FALSE;
120 
121 #ifdef UNIV_DEBUG
122 /****************************************************************//**
123 Compare the name of an index column.
124 @return TRUE if the i'th column of index is 'name'. */
125 static
126 ibool
name_of_col_is(const dict_table_t * table,const dict_index_t * index,ulint i,const char * name)127 name_of_col_is(
128 /*===========*/
129 	const dict_table_t*	table,	/*!< in: table */
130 	const dict_index_t*	index,	/*!< in: index */
131 	ulint			i,	/*!< in: index field offset */
132 	const char*		name)	/*!< in: name to compare to */
133 {
134 	ulint	tmp = dict_col_get_no(dict_field_get_col(
135 					      dict_index_get_nth_field(
136 						      index, i)));
137 
138 	return(strcmp(name, dict_table_get_col_name(table, tmp)) == 0);
139 }
140 #endif /* UNIV_DEBUG */
141 
142 /********************************************************************//**
143 Finds the first table name in the given database.
144 @return own: table name, NULL if does not exist; the caller must free
145 the memory in the string! */
146 char*
dict_get_first_table_name_in_db(const char * name)147 dict_get_first_table_name_in_db(
148 /*============================*/
149 	const char*	name)	/*!< in: database name which ends in '/' */
150 {
151 	dict_table_t*	sys_tables;
152 	btr_pcur_t	pcur;
153 	dict_index_t*	sys_index;
154 	dtuple_t*	tuple;
155 	mem_heap_t*	heap;
156 	dfield_t*	dfield;
157 	const rec_t*	rec;
158 	const byte*	field;
159 	ulint		len;
160 	mtr_t		mtr;
161 
162 	ut_ad(mutex_own(&dict_sys->mutex));
163 
164 	heap = mem_heap_create(1000);
165 
166 	mtr_start(&mtr);
167 
168 	sys_tables = dict_table_get_low("SYS_TABLES");
169 	sys_index = UT_LIST_GET_FIRST(sys_tables->indexes);
170 	ut_ad(!dict_table_is_comp(sys_tables));
171 
172 	tuple = dtuple_create(heap, 1);
173 	dfield = dtuple_get_nth_field(tuple, 0);
174 
175 	dfield_set_data(dfield, name, ut_strlen(name));
176 	dict_index_copy_types(tuple, sys_index, 1);
177 
178 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
179 				  BTR_SEARCH_LEAF, &pcur, &mtr);
180 loop:
181 	rec = btr_pcur_get_rec(&pcur);
182 
183 	if (!btr_pcur_is_on_user_rec(&pcur)) {
184 		/* Not found */
185 
186 		btr_pcur_close(&pcur);
187 		mtr_commit(&mtr);
188 		mem_heap_free(heap);
189 
190 		return(NULL);
191 	}
192 
193 	field = rec_get_nth_field_old(
194 		rec, DICT_FLD__SYS_TABLES__NAME, &len);
195 
196 	if (len < strlen(name)
197 	    || ut_memcmp(name, field, strlen(name)) != 0) {
198 		/* Not found */
199 
200 		btr_pcur_close(&pcur);
201 		mtr_commit(&mtr);
202 		mem_heap_free(heap);
203 
204 		return(NULL);
205 	}
206 
207 	if (!rec_get_deleted_flag(rec, 0)) {
208 
209 		/* We found one */
210 
211 		char*	table_name = mem_strdupl((char*) field, len);
212 
213 		btr_pcur_close(&pcur);
214 		mtr_commit(&mtr);
215 		mem_heap_free(heap);
216 
217 		return(table_name);
218 	}
219 
220 	btr_pcur_move_to_next_user_rec(&pcur, &mtr);
221 
222 	goto loop;
223 }
224 
225 /********************************************************************//**
226 This function gets the next system table record as it scans the table.
227 @return the next record if found, NULL if end of scan */
228 static
229 const rec_t*
dict_getnext_system_low(btr_pcur_t * pcur,mtr_t * mtr)230 dict_getnext_system_low(
231 /*====================*/
232 	btr_pcur_t*	pcur,		/*!< in/out: persistent cursor to the
233 					record*/
234 	mtr_t*		mtr)		/*!< in: the mini-transaction */
235 {
236 	rec_t*	rec = NULL;
237 
238 	while (!rec || rec_get_deleted_flag(rec, 0)) {
239 		btr_pcur_move_to_next_user_rec(pcur, mtr);
240 
241 		rec = btr_pcur_get_rec(pcur);
242 
243 		if (!btr_pcur_is_on_user_rec(pcur)) {
244 			/* end of index */
245 			btr_pcur_close(pcur);
246 
247 			return(NULL);
248 		}
249 	}
250 
251 	/* Get a record, let's save the position */
252 	btr_pcur_store_position(pcur, mtr);
253 
254 	return(rec);
255 }
256 
257 /********************************************************************//**
258 This function opens a system table, and returns the first record.
259 @return first record of the system table */
260 const rec_t*
dict_startscan_system(btr_pcur_t * pcur,mtr_t * mtr,dict_system_id_t system_id)261 dict_startscan_system(
262 /*==================*/
263 	btr_pcur_t*	pcur,		/*!< out: persistent cursor to
264 					the record */
265 	mtr_t*		mtr,		/*!< in: the mini-transaction */
266 	dict_system_id_t system_id)	/*!< in: which system table to open */
267 {
268 	dict_table_t*	system_table;
269 	dict_index_t*	clust_index;
270 	const rec_t*	rec;
271 
272 	ut_a(system_id < SYS_NUM_SYSTEM_TABLES);
273 
274 	system_table = dict_table_get_low(SYSTEM_TABLE_NAME[system_id]);
275 
276 	clust_index = UT_LIST_GET_FIRST(system_table->indexes);
277 
278 	btr_pcur_open_at_index_side(true, clust_index, BTR_SEARCH_LEAF, pcur,
279 				    true, 0, mtr);
280 
281 	rec = dict_getnext_system_low(pcur, mtr);
282 
283 	return(rec);
284 }
285 
286 /********************************************************************//**
287 This function gets the next system table record as it scans the table.
288 @return the next record if found, NULL if end of scan */
289 const rec_t*
dict_getnext_system(btr_pcur_t * pcur,mtr_t * mtr)290 dict_getnext_system(
291 /*================*/
292 	btr_pcur_t*	pcur,		/*!< in/out: persistent cursor
293 					to the record */
294 	mtr_t*		mtr)		/*!< in: the mini-transaction */
295 {
296 	const rec_t*	rec;
297 
298 	/* Restore the position */
299 	btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
300 
301 	/* Get the next record */
302 	rec = dict_getnext_system_low(pcur, mtr);
303 
304 	return(rec);
305 }
306 
307 /********************************************************************//**
308 This function processes one SYS_TABLES record and populate the dict_table_t
309 struct for the table. Extracted out of dict_print() to be used by
310 both monitor table output and information schema innodb_sys_tables output.
311 @return error message, or NULL on success */
312 const char*
dict_process_sys_tables_rec_and_mtr_commit(mem_heap_t * heap,const rec_t * rec,dict_table_t ** table,dict_table_info_t status,mtr_t * mtr)313 dict_process_sys_tables_rec_and_mtr_commit(
314 /*=======================================*/
315 	mem_heap_t*	heap,		/*!< in/out: temporary memory heap */
316 	const rec_t*	rec,		/*!< in: SYS_TABLES record */
317 	dict_table_t**	table,		/*!< out: dict_table_t to fill */
318 	dict_table_info_t status,	/*!< in: status bit controls
319 					options such as whether we shall
320 					look for dict_table_t from cache
321 					first */
322 	mtr_t*		mtr)		/*!< in/out: mini-transaction,
323 					will be committed */
324 {
325 	ulint		len;
326 	const char*	field;
327 	const char*	err_msg = NULL;
328 	table_name_t	table_name;
329 
330 	field = (const char*) rec_get_nth_field_old(
331 		rec, DICT_FLD__SYS_TABLES__NAME, &len);
332 
333 	ut_a(!rec_get_deleted_flag(rec, 0));
334 
335 	ut_ad(mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_S_FIX));
336 
337 	/* Get the table name */
338 	table_name.m_name = mem_heap_strdupl(heap, field, len);
339 
340 	/* If DICT_TABLE_LOAD_FROM_CACHE is set, first check
341 	whether there is cached dict_table_t struct */
342 	if (status & DICT_TABLE_LOAD_FROM_CACHE) {
343 
344 		/* Commit before load the table again */
345 		mtr_commit(mtr);
346 
347 		*table = dict_table_get_low(table_name.m_name);
348 
349 		if (!(*table)) {
350 			err_msg = "Table not found in cache";
351 		}
352 	} else {
353 		err_msg = dict_load_table_low(table_name, rec, table);
354 		mtr_commit(mtr);
355 	}
356 
357 	if (err_msg) {
358 		return(err_msg);
359 	}
360 
361 	return(NULL);
362 }
363 
364 /********************************************************************//**
365 This function parses a SYS_INDEXES record and populate a dict_index_t
366 structure with the information from the record. For detail information
367 about SYS_INDEXES fields, please refer to dict_boot() function.
368 @return error message, or NULL on success */
369 const char*
dict_process_sys_indexes_rec(mem_heap_t * heap,const rec_t * rec,dict_index_t * index,table_id_t * table_id)370 dict_process_sys_indexes_rec(
371 /*=========================*/
372 	mem_heap_t*	heap,		/*!< in/out: heap memory */
373 	const rec_t*	rec,		/*!< in: current SYS_INDEXES rec */
374 	dict_index_t*	index,		/*!< out: index to be filled */
375 	table_id_t*	table_id)	/*!< out: index table id */
376 {
377 	const char*	err_msg;
378 	byte*		buf;
379 
380 	buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
381 
382 	/* Parse the record, and get "dict_index_t" struct filled */
383 	err_msg = dict_load_index_low(buf, NULL,
384 				      heap, rec, FALSE, &index);
385 
386 	*table_id = mach_read_from_8(buf);
387 
388 	return(err_msg);
389 }
390 
391 /********************************************************************//**
392 This function parses a SYS_COLUMNS record and populate a dict_column_t
393 structure with the information from the record.
394 @return error message, or NULL on success */
395 const char*
dict_process_sys_columns_rec(mem_heap_t * heap,const rec_t * rec,dict_col_t * column,table_id_t * table_id,const char ** col_name,ulint * nth_v_col)396 dict_process_sys_columns_rec(
397 /*=========================*/
398 	mem_heap_t*	heap,		/*!< in/out: heap memory */
399 	const rec_t*	rec,		/*!< in: current SYS_COLUMNS rec */
400 	dict_col_t*	column,		/*!< out: dict_col_t to be filled */
401 	table_id_t*	table_id,	/*!< out: table id */
402 	const char**	col_name,	/*!< out: column name */
403 	ulint*		nth_v_col)	/*!< out: if virtual col, this is
404 					record's sequence number */
405 {
406 	const char*	err_msg;
407 
408 	/* Parse the record, and get "dict_col_t" struct filled */
409 	err_msg = dict_load_column_low(NULL, heap, column,
410 				       table_id, col_name, rec, nth_v_col);
411 
412 	return(err_msg);
413 }
414 
415 /** This function parses a SYS_VIRTUAL record and extracts virtual column
416 information
417 @param[in,out]	heap		heap memory
418 @param[in]	rec		current SYS_COLUMNS rec
419 @param[in,out]	table_id	table id
420 @param[in,out]	pos		virtual column position
421 @param[in,out]	base_pos	base column position
422 @return error message, or NULL on success */
423 const char*
dict_process_sys_virtual_rec(mem_heap_t * heap,const rec_t * rec,table_id_t * table_id,ulint * pos,ulint * base_pos)424 dict_process_sys_virtual_rec(
425 	mem_heap_t*	heap,
426 	const rec_t*	rec,
427 	table_id_t*	table_id,
428 	ulint*		pos,
429 	ulint*		base_pos)
430 {
431 	const char*	err_msg;
432 
433 	/* Parse the record, and get "dict_col_t" struct filled */
434 	err_msg = dict_load_virtual_low(NULL, heap, NULL, table_id,
435 					pos, base_pos, rec);
436 
437 	return(err_msg);
438 }
439 /********************************************************************//**
440 This function parses a SYS_FIELDS record and populates a dict_field_t
441 structure with the information from the record.
442 @return error message, or NULL on success */
443 const char*
dict_process_sys_fields_rec(mem_heap_t * heap,const rec_t * rec,dict_field_t * sys_field,ulint * pos,index_id_t * index_id,index_id_t last_id)444 dict_process_sys_fields_rec(
445 /*========================*/
446 	mem_heap_t*	heap,		/*!< in/out: heap memory */
447 	const rec_t*	rec,		/*!< in: current SYS_FIELDS rec */
448 	dict_field_t*	sys_field,	/*!< out: dict_field_t to be
449 					filled */
450 	ulint*		pos,		/*!< out: Field position */
451 	index_id_t*	index_id,	/*!< out: current index id */
452 	index_id_t	last_id)	/*!< in: previous index id */
453 {
454 	byte*		buf;
455 	byte*		last_index_id;
456 	const char*	err_msg;
457 
458 	buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
459 
460 	last_index_id = static_cast<byte*>(mem_heap_alloc(heap, 8));
461 	mach_write_to_8(last_index_id, last_id);
462 
463 	err_msg = dict_load_field_low(buf, NULL, sys_field,
464 				      pos, last_index_id, heap, rec);
465 
466 	*index_id = mach_read_from_8(buf);
467 
468 	return(err_msg);
469 
470 }
471 
472 /********************************************************************//**
473 This function parses a SYS_FOREIGN record and populate a dict_foreign_t
474 structure with the information from the record. For detail information
475 about SYS_FOREIGN fields, please refer to dict_load_foreign() function.
476 @return error message, or NULL on success */
477 const char*
dict_process_sys_foreign_rec(mem_heap_t * heap,const rec_t * rec,dict_foreign_t * foreign)478 dict_process_sys_foreign_rec(
479 /*=========================*/
480 	mem_heap_t*	heap,		/*!< in/out: heap memory */
481 	const rec_t*	rec,		/*!< in: current SYS_FOREIGN rec */
482 	dict_foreign_t*	foreign)	/*!< out: dict_foreign_t struct
483 					to be filled */
484 {
485 	ulint		len;
486 	const byte*	field;
487 	ulint		n_fields_and_type;
488 
489 	if (rec_get_deleted_flag(rec, 0)) {
490 		return("delete-marked record in SYS_FOREIGN");
491 	}
492 
493 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_FOREIGN) {
494 		return("wrong number of columns in SYS_FOREIGN record");
495 	}
496 
497 	field = rec_get_nth_field_old(
498 		rec, DICT_FLD__SYS_FOREIGN__ID, &len);
499 	if (len == 0 || len == UNIV_SQL_NULL) {
500 err_len:
501 		return("incorrect column length in SYS_FOREIGN");
502 	}
503 
504 	/* This recieves a dict_foreign_t* that points to a stack variable.
505 	So mem_heap_free(foreign->heap) is not used as elsewhere.
506 	Since the heap used here is freed elsewhere, foreign->heap
507 	is not assigned. */
508 	foreign->id = mem_heap_strdupl(heap, (const char*) field, len);
509 
510 	rec_get_nth_field_offs_old(
511 		rec, DICT_FLD__SYS_FOREIGN__DB_TRX_ID, &len);
512 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
513 		goto err_len;
514 	}
515 	rec_get_nth_field_offs_old(
516 		rec, DICT_FLD__SYS_FOREIGN__DB_ROLL_PTR, &len);
517 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
518 		goto err_len;
519 	}
520 
521 	/* The _lookup versions of the referenced and foreign table names
522 	 are not assigned since they are not used in this dict_foreign_t */
523 
524 	field = rec_get_nth_field_old(
525 		rec, DICT_FLD__SYS_FOREIGN__FOR_NAME, &len);
526 	if (len == 0 || len == UNIV_SQL_NULL) {
527 		goto err_len;
528 	}
529 	foreign->foreign_table_name = mem_heap_strdupl(
530 		heap, (const char*) field, len);
531 
532 	field = rec_get_nth_field_old(
533 		rec, DICT_FLD__SYS_FOREIGN__REF_NAME, &len);
534 	if (len == 0 || len == UNIV_SQL_NULL) {
535 		goto err_len;
536 	}
537 	foreign->referenced_table_name = mem_heap_strdupl(
538 		heap, (const char*) field, len);
539 
540 	field = rec_get_nth_field_old(
541 		rec, DICT_FLD__SYS_FOREIGN__N_COLS, &len);
542 	if (len != 4) {
543 		goto err_len;
544 	}
545 	n_fields_and_type = mach_read_from_4(field);
546 
547 	foreign->type = (unsigned int) (n_fields_and_type >> 24);
548 	foreign->n_fields = (unsigned int) (n_fields_and_type & 0x3FFUL);
549 
550 	return(NULL);
551 }
552 
553 /********************************************************************//**
554 This function parses a SYS_FOREIGN_COLS record and extract necessary
555 information from the record and return to caller.
556 @return error message, or NULL on success */
557 const char*
dict_process_sys_foreign_col_rec(mem_heap_t * heap,const rec_t * rec,const char ** name,const char ** for_col_name,const char ** ref_col_name,ulint * pos)558 dict_process_sys_foreign_col_rec(
559 /*=============================*/
560 	mem_heap_t*	heap,		/*!< in/out: heap memory */
561 	const rec_t*	rec,		/*!< in: current SYS_FOREIGN_COLS rec */
562 	const char**	name,		/*!< out: foreign key constraint name */
563 	const char**	for_col_name,	/*!< out: referencing column name */
564 	const char**	ref_col_name,	/*!< out: referenced column name
565 					in referenced table */
566 	ulint*		pos)		/*!< out: column position */
567 {
568 	ulint		len;
569 	const byte*	field;
570 
571 	if (rec_get_deleted_flag(rec, 0)) {
572 		return("delete-marked record in SYS_FOREIGN_COLS");
573 	}
574 
575 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_FOREIGN_COLS) {
576 		return("wrong number of columns in SYS_FOREIGN_COLS record");
577 	}
578 
579 	field = rec_get_nth_field_old(
580 		rec, DICT_FLD__SYS_FOREIGN_COLS__ID, &len);
581 	if (len == 0 || len == UNIV_SQL_NULL) {
582 err_len:
583 		return("incorrect column length in SYS_FOREIGN_COLS");
584 	}
585 	*name = mem_heap_strdupl(heap, (char*) field, len);
586 
587 	field = rec_get_nth_field_old(
588 		rec, DICT_FLD__SYS_FOREIGN_COLS__POS, &len);
589 	if (len != 4) {
590 		goto err_len;
591 	}
592 	*pos = mach_read_from_4(field);
593 
594 	rec_get_nth_field_offs_old(
595 		rec, DICT_FLD__SYS_FOREIGN_COLS__DB_TRX_ID, &len);
596 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
597 		goto err_len;
598 	}
599 	rec_get_nth_field_offs_old(
600 		rec, DICT_FLD__SYS_FOREIGN_COLS__DB_ROLL_PTR, &len);
601 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
602 		goto err_len;
603 	}
604 
605 	field = rec_get_nth_field_old(
606 		rec, DICT_FLD__SYS_FOREIGN_COLS__FOR_COL_NAME, &len);
607 	if (len == 0 || len == UNIV_SQL_NULL) {
608 		goto err_len;
609 	}
610 	*for_col_name = mem_heap_strdupl(heap, (char*) field, len);
611 
612 	field = rec_get_nth_field_old(
613 		rec, DICT_FLD__SYS_FOREIGN_COLS__REF_COL_NAME, &len);
614 	if (len == 0 || len == UNIV_SQL_NULL) {
615 		goto err_len;
616 	}
617 	*ref_col_name = mem_heap_strdupl(heap, (char*) field, len);
618 
619 	return(NULL);
620 }
621 
622 /********************************************************************//**
623 This function parses a SYS_TABLESPACES record, extracts necessary
624 information from the record and returns to caller.
625 @return error message, or NULL on success */
626 const char*
dict_process_sys_tablespaces(mem_heap_t * heap,const rec_t * rec,ulint * space,const char ** name,ulint * flags)627 dict_process_sys_tablespaces(
628 /*=========================*/
629 	mem_heap_t*	heap,		/*!< in/out: heap memory */
630 	const rec_t*	rec,		/*!< in: current SYS_TABLESPACES rec */
631 	ulint*		space,		/*!< out: space id */
632 	const char**	name,		/*!< out: tablespace name */
633 	ulint*		flags)		/*!< out: tablespace flags */
634 {
635 	ulint		len;
636 	const byte*	field;
637 
638 	/* Initialize the output values */
639 	*space = ULINT_UNDEFINED;
640 	*name = NULL;
641 	*flags = ULINT_UNDEFINED;
642 
643 	if (rec_get_deleted_flag(rec, 0)) {
644 		return("delete-marked record in SYS_TABLESPACES");
645 	}
646 
647 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_TABLESPACES) {
648 		return("wrong number of columns in SYS_TABLESPACES record");
649 	}
650 
651 	field = rec_get_nth_field_old(
652 		rec, DICT_FLD__SYS_TABLESPACES__SPACE, &len);
653 	if (len != DICT_FLD_LEN_SPACE) {
654 err_len:
655 		return("incorrect column length in SYS_TABLESPACES");
656 	}
657 	*space = mach_read_from_4(field);
658 
659 	rec_get_nth_field_offs_old(
660 		rec, DICT_FLD__SYS_TABLESPACES__DB_TRX_ID, &len);
661 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
662 		goto err_len;
663 	}
664 
665 	rec_get_nth_field_offs_old(
666 		rec, DICT_FLD__SYS_TABLESPACES__DB_ROLL_PTR, &len);
667 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
668 		goto err_len;
669 	}
670 
671 	field = rec_get_nth_field_old(
672 		rec, DICT_FLD__SYS_TABLESPACES__NAME, &len);
673 	if (len == 0 || len == UNIV_SQL_NULL) {
674 		goto err_len;
675 	}
676 	*name = mem_heap_strdupl(heap, (char*) field, len);
677 
678 	field = rec_get_nth_field_old(
679 		rec, DICT_FLD__SYS_TABLESPACES__FLAGS, &len);
680 	if (len != DICT_FLD_LEN_FLAGS) {
681 		goto err_len;
682 	}
683 	*flags = mach_read_from_4(field);
684 
685 	return(NULL);
686 }
687 
688 /********************************************************************//**
689 This function parses a SYS_DATAFILES record, extracts necessary
690 information from the record and returns it to the caller.
691 @return error message, or NULL on success */
692 const char*
dict_process_sys_datafiles(mem_heap_t * heap,const rec_t * rec,ulint * space,const char ** path)693 dict_process_sys_datafiles(
694 /*=======================*/
695 	mem_heap_t*	heap,		/*!< in/out: heap memory */
696 	const rec_t*	rec,		/*!< in: current SYS_DATAFILES rec */
697 	ulint*		space,		/*!< out: space id */
698 	const char**	path)		/*!< out: datafile paths */
699 {
700 	ulint		len;
701 	const byte*	field;
702 
703 	if (rec_get_deleted_flag(rec, 0)) {
704 		return("delete-marked record in SYS_DATAFILES");
705 	}
706 
707 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_DATAFILES) {
708 		return("wrong number of columns in SYS_DATAFILES record");
709 	}
710 
711 	field = rec_get_nth_field_old(
712 		rec, DICT_FLD__SYS_DATAFILES__SPACE, &len);
713 	if (len != DICT_FLD_LEN_SPACE) {
714 err_len:
715 		return("incorrect column length in SYS_DATAFILES");
716 	}
717 	*space = mach_read_from_4(field);
718 
719 	rec_get_nth_field_offs_old(
720 		rec, DICT_FLD__SYS_DATAFILES__DB_TRX_ID, &len);
721 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
722 		goto err_len;
723 	}
724 
725 	rec_get_nth_field_offs_old(
726 		rec, DICT_FLD__SYS_DATAFILES__DB_ROLL_PTR, &len);
727 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
728 		goto err_len;
729 	}
730 
731 	field = rec_get_nth_field_old(
732 		rec, DICT_FLD__SYS_DATAFILES__PATH, &len);
733 	if (len == 0 || len == UNIV_SQL_NULL) {
734 		goto err_len;
735 	}
736 	*path = mem_heap_strdupl(heap, (char*) field, len);
737 
738 	return(NULL);
739 }
740 
741 /** This function parses a SYS_ZIP_DICT record, extracts necessary
742 information from the record and returns to caller.
743 @return error message, or NULL on success */
744 const char*
dict_process_sys_zip_dict(mem_heap_t * heap,const page_size_t & page_size,const rec_t * rec,ulint * id,const char ** name,const char ** data,ulint * data_len)745 dict_process_sys_zip_dict(
746 	mem_heap_t*	heap,		/*!< in/out: heap memory */
747 	const page_size_t&
748 			page_size,	/*!< in: BLOB page size */
749 	const rec_t*	rec,		/*!< in: current SYS_ZIP_DICT rec */
750 	ulint*		id,		/*!< out: dict id */
751 	const char**	name,		/*!< out: dict name */
752 	const char**	data,		/*!< out: dict data */
753 	ulint*		data_len)	/*!< out: dict data length */
754 {
755 	ulint		len;
756 	const byte*	field;
757 
758 	/* Initialize the output values */
759 	*id = ULINT_UNDEFINED;
760 	*name = NULL;
761 	*data = NULL;
762 	*data_len = 0;
763 
764 	if (UNIV_UNLIKELY(rec_get_deleted_flag(rec, 0))) {
765 		return("delete-marked record in SYS_ZIP_DICT");
766 	}
767 
768 	if (UNIV_UNLIKELY(
769 		rec_get_n_fields_old(rec)!= DICT_NUM_FIELDS__SYS_ZIP_DICT)) {
770 		return("wrong number of columns in SYS_ZIP_DICT record");
771 	}
772 
773 	field = rec_get_nth_field_old(
774 		rec, DICT_FLD__SYS_ZIP_DICT__ID, &len);
775 	if (UNIV_UNLIKELY(len != DICT_FLD_LEN_SPACE)) {
776 		goto err_len;
777 	}
778 	*id = mach_read_from_4(field);
779 
780 	rec_get_nth_field_offs_old(
781 		rec, DICT_FLD__SYS_ZIP_DICT__DB_TRX_ID, &len);
782 	if (UNIV_UNLIKELY(len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL)) {
783 		goto err_len;
784 	}
785 
786 	rec_get_nth_field_offs_old(
787 		rec, DICT_FLD__SYS_ZIP_DICT__DB_ROLL_PTR, &len);
788 	if (UNIV_UNLIKELY(len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL)) {
789 		goto err_len;
790 	}
791 
792 	field = rec_get_nth_field_old(
793 		rec, DICT_FLD__SYS_ZIP_DICT__NAME, &len);
794 	if (UNIV_UNLIKELY(len == 0 || len == UNIV_SQL_NULL)) {
795 		goto err_len;
796 	}
797 	*name = mem_heap_strdupl(heap, (char*) field, len);
798 
799 	field = rec_get_nth_field_old(
800 		rec, DICT_FLD__SYS_ZIP_DICT__DATA, &len);
801 	if (UNIV_UNLIKELY(len == UNIV_SQL_NULL)) {
802 		goto err_len;
803 	}
804 
805 	if (rec_get_1byte_offs_flag(rec) == 0 &&
806 		rec_2_is_field_extern(rec, DICT_FLD__SYS_ZIP_DICT__DATA)) {
807 		ut_a(len >= BTR_EXTERN_FIELD_REF_SIZE);
808 
809 		if (UNIV_UNLIKELY
810 			(!memcmp(field + len - BTR_EXTERN_FIELD_REF_SIZE,
811 				field_ref_zero,
812 				BTR_EXTERN_FIELD_REF_SIZE))) {
813 			goto err_len;
814 		}
815 		*data = reinterpret_cast<char*>(
816 			btr_copy_externally_stored_field(data_len, field,
817 				page_size, len, heap));
818 	}
819 	else {
820 		*data_len = len;
821 		*data = static_cast<char*>(mem_heap_dup(heap, field, len));
822 	}
823 
824 	return(NULL);
825 
826 err_len:
827 	return("incorrect column length in SYS_ZIP_DICT");
828 }
829 
830 /** This function parses a SYS_ZIP_DICT_COLS record, extracts necessary
831 information from the record and returns to caller.
832 @return error message, or NULL on success */
833 const char*
dict_process_sys_zip_dict_cols(mem_heap_t * heap,const rec_t * rec,table_id_t * table_id,ulint * column_pos,ulint * dict_id)834 dict_process_sys_zip_dict_cols(
835 	mem_heap_t*	heap,		/*!< in/out: heap memory */
836 	const rec_t*	rec,		/*!< in: current SYS_ZIP_DICT rec */
837 	table_id_t*	table_id,	/*!< out: table id */
838 	ulint*		column_pos,	/*!< out: column position */
839 	ulint*		dict_id)	/*!< out: dict id */
840 {
841 	ulint		len;
842 	const byte*	field;
843 
844 	/* Initialize the output values */
845 	*table_id = ULINT_UNDEFINED;
846 	*column_pos = ULINT_UNDEFINED;
847 	*dict_id = ULINT_UNDEFINED;
848 
849 	if (UNIV_UNLIKELY(rec_get_deleted_flag(rec, 0))) {
850 		return("delete-marked record in SYS_ZIP_DICT_COLS");
851 	}
852 
853 	if (UNIV_UNLIKELY(rec_get_n_fields_old(rec) !=
854 		DICT_NUM_FIELDS__SYS_ZIP_DICT_COLS)) {
855 		return("wrong number of columns in SYS_ZIP_DICT_COLS"
856 			" record");
857 	}
858 
859 	field = rec_get_nth_field_old(
860 		rec, DICT_FLD__SYS_ZIP_DICT_COLS__TABLE_ID, &len);
861 	if (UNIV_UNLIKELY(len != DICT_FLD_LEN_SPACE)) {
862 err_len:
863 		return("incorrect column length in SYS_ZIP_DICT_COLS");
864 	}
865 	*table_id = mach_read_from_4(field);
866 
867 	field = rec_get_nth_field_old(
868 		rec, DICT_FLD__SYS_ZIP_DICT_COLS__COLUMN_POS, &len);
869 	if (UNIV_UNLIKELY(len != DICT_FLD_LEN_SPACE)) {
870 		goto err_len;
871 	}
872 	*column_pos = mach_read_from_4(field);
873 
874 	rec_get_nth_field_offs_old(
875 		rec, DICT_FLD__SYS_ZIP_DICT_COLS__DB_TRX_ID, &len);
876 	if (UNIV_UNLIKELY(len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL)) {
877 		goto err_len;
878 	}
879 
880 	rec_get_nth_field_offs_old(
881 		rec, DICT_FLD__SYS_ZIP_DICT_COLS__DB_ROLL_PTR, &len);
882 	if (UNIV_UNLIKELY(len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL)) {
883 		goto err_len;
884 	}
885 
886 	field = rec_get_nth_field_old(
887 		rec, DICT_FLD__SYS_ZIP_DICT_COLS__DICT_ID, &len);
888 	if (UNIV_UNLIKELY(len != DICT_FLD_LEN_SPACE)) {
889 		goto err_len;
890 	}
891 	*dict_id = mach_read_from_4(field);
892 
893 	return(NULL);
894 }
895 
896 /** Get the first filepath from SYS_DATAFILES for a given space_id.
897 @param[in]	space_id	Tablespace ID
898 @return First filepath (caller must invoke ut_free() on it)
899 @retval NULL if no SYS_DATAFILES entry was found. */
900 char*
dict_get_first_path(ulint space_id)901 dict_get_first_path(
902 	ulint	space_id)
903 {
904 	mtr_t		mtr;
905 	dict_table_t*	sys_datafiles;
906 	dict_index_t*	sys_index;
907 	dtuple_t*	tuple;
908 	dfield_t*	dfield;
909 	byte*		buf;
910 	btr_pcur_t	pcur;
911 	const rec_t*	rec;
912 	const byte*	field;
913 	ulint		len;
914 	char*		filepath = NULL;
915 	mem_heap_t*	heap = mem_heap_create(1024);
916 
917 	ut_ad(mutex_own(&dict_sys->mutex));
918 
919 	mtr_start(&mtr);
920 
921 	sys_datafiles = dict_table_get_low("SYS_DATAFILES");
922 	sys_index = UT_LIST_GET_FIRST(sys_datafiles->indexes);
923 
924 	ut_ad(!dict_table_is_comp(sys_datafiles));
925 	ut_ad(name_of_col_is(sys_datafiles, sys_index,
926 			     DICT_FLD__SYS_DATAFILES__SPACE, "SPACE"));
927 	ut_ad(name_of_col_is(sys_datafiles, sys_index,
928 			     DICT_FLD__SYS_DATAFILES__PATH, "PATH"));
929 
930 	tuple = dtuple_create(heap, 1);
931 	dfield = dtuple_get_nth_field(tuple, DICT_FLD__SYS_DATAFILES__SPACE);
932 
933 	buf = static_cast<byte*>(mem_heap_alloc(heap, 4));
934 	mach_write_to_4(buf, space_id);
935 
936 	dfield_set_data(dfield, buf, 4);
937 	dict_index_copy_types(tuple, sys_index, 1);
938 
939 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
940 				  BTR_SEARCH_LEAF, &pcur, &mtr);
941 
942 	rec = btr_pcur_get_rec(&pcur);
943 
944 	/* Get the filepath from this SYS_DATAFILES record. */
945 	if (btr_pcur_is_on_user_rec(&pcur)) {
946 		field = rec_get_nth_field_old(
947 			rec, DICT_FLD__SYS_DATAFILES__SPACE, &len);
948 		ut_a(len == 4);
949 
950 		if (space_id == mach_read_from_4(field)) {
951 			/* A record for this space ID was found. */
952 			field = rec_get_nth_field_old(
953 				rec, DICT_FLD__SYS_DATAFILES__PATH, &len);
954 
955 			ut_ad(len > 0);
956 			ut_ad(len < OS_FILE_MAX_PATH);
957 
958 			if (len > 0 && len != UNIV_SQL_NULL) {
959 				filepath = mem_strdupl(
960 					reinterpret_cast<const char*>(field),
961 					len);
962 				ut_ad(filepath != NULL);
963 
964 				/* The dictionary may have been written on
965 				another OS. */
966 				os_normalize_path(filepath);
967 			}
968 		}
969 	}
970 
971 	btr_pcur_close(&pcur);
972 	mtr_commit(&mtr);
973 	mem_heap_free(heap);
974 
975 	return(filepath);
976 }
977 
978 /** Gets the space name from SYS_TABLESPACES for a given space ID.
979 @param[in]	space_id	Tablespace ID
980 @param[in]	callers_heap	A heap to allocate from, may be NULL
981 @return Tablespace name (caller is responsible to free it)
982 @retval NULL if no dictionary entry was found. */
983 static
984 char*
dict_space_get_name(ulint space_id,mem_heap_t * callers_heap)985 dict_space_get_name(
986 	ulint		space_id,
987 	mem_heap_t*	callers_heap)
988 {
989 	mtr_t		mtr;
990 	dict_table_t*	sys_tablespaces;
991 	dict_index_t*	sys_index;
992 	dtuple_t*	tuple;
993 	dfield_t*	dfield;
994 	byte*		buf;
995 	btr_pcur_t	pcur;
996 	const rec_t*	rec;
997 	const byte*	field;
998 	ulint		len;
999 	char*		space_name = NULL;
1000 	mem_heap_t*	heap = mem_heap_create(1024);
1001 
1002 	ut_ad(mutex_own(&dict_sys->mutex));
1003 
1004 	sys_tablespaces = dict_table_get_low("SYS_TABLESPACES");
1005 	if (sys_tablespaces == NULL) {
1006 		ut_a(!srv_sys_tablespaces_open);
1007 		return(NULL);
1008 	}
1009 
1010 	sys_index = UT_LIST_GET_FIRST(sys_tablespaces->indexes);
1011 
1012 	ut_ad(!dict_table_is_comp(sys_tablespaces));
1013 	ut_ad(name_of_col_is(sys_tablespaces, sys_index,
1014 			     DICT_FLD__SYS_TABLESPACES__SPACE, "SPACE"));
1015 	ut_ad(name_of_col_is(sys_tablespaces, sys_index,
1016 			     DICT_FLD__SYS_TABLESPACES__NAME, "NAME"));
1017 
1018 	tuple = dtuple_create(heap, 1);
1019 	dfield = dtuple_get_nth_field(tuple, DICT_FLD__SYS_TABLESPACES__SPACE);
1020 
1021 	buf = static_cast<byte*>(mem_heap_alloc(heap, 4));
1022 	mach_write_to_4(buf, space_id);
1023 
1024 	dfield_set_data(dfield, buf, 4);
1025 	dict_index_copy_types(tuple, sys_index, 1);
1026 
1027 	mtr_start(&mtr);
1028 
1029 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
1030 				  BTR_SEARCH_LEAF, &pcur, &mtr);
1031 
1032 	rec = btr_pcur_get_rec(&pcur);
1033 
1034 	/* Get the tablespace name from this SYS_TABLESPACES record. */
1035 	if (btr_pcur_is_on_user_rec(&pcur)) {
1036 		field = rec_get_nth_field_old(
1037 			rec, DICT_FLD__SYS_TABLESPACES__SPACE, &len);
1038 		ut_a(len == 4);
1039 
1040 		if (space_id == mach_read_from_4(field)) {
1041 			/* A record for this space ID was found. */
1042 			field = rec_get_nth_field_old(
1043 				rec, DICT_FLD__SYS_TABLESPACES__NAME, &len);
1044 
1045 			ut_ad(len > 0);
1046 			ut_ad(len < OS_FILE_MAX_PATH);
1047 
1048 			if (len > 0 && len != UNIV_SQL_NULL) {
1049 				/* Found a tablespace name. */
1050 				if (callers_heap == NULL) {
1051 					space_name = mem_strdupl(
1052 						reinterpret_cast<
1053 							const char*>(field),
1054 						len);
1055 				} else {
1056 					space_name = mem_heap_strdupl(
1057 						callers_heap,
1058 						reinterpret_cast<
1059 							const char*>(field),
1060 						len);
1061 				}
1062 				ut_ad(space_name);
1063 			}
1064 		}
1065 	}
1066 
1067 	btr_pcur_close(&pcur);
1068 	mtr_commit(&mtr);
1069 	mem_heap_free(heap);
1070 
1071 	return(space_name);
1072 }
1073 
1074 /** Update the record for space_id in SYS_TABLESPACES to this filepath.
1075 @param[in]	space_id	Tablespace ID
1076 @param[in]	filepath	Tablespace filepath
1077 @return DB_SUCCESS if OK, dberr_t if the insert failed */
1078 dberr_t
dict_update_filepath(ulint space_id,const char * filepath)1079 dict_update_filepath(
1080 	ulint		space_id,
1081 	const char*	filepath)
1082 {
1083 	if (!srv_sys_tablespaces_open) {
1084 		/* Startup procedure is not yet ready for updates. */
1085 		return(DB_SUCCESS);
1086 	}
1087 
1088 	dberr_t		err = DB_SUCCESS;
1089 	trx_t*		trx;
1090 
1091 	ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
1092 	ut_ad(mutex_own(&dict_sys->mutex));
1093 
1094 	trx = trx_allocate_for_background();
1095 	trx->op_info = "update filepath";
1096 	trx->dict_operation_lock_mode = RW_X_LATCH;
1097 	trx_start_for_ddl(trx, TRX_DICT_OP_INDEX);
1098 
1099 	pars_info_t*	info = pars_info_create();
1100 
1101 	pars_info_add_int4_literal(info, "space", space_id);
1102 	pars_info_add_str_literal(info, "path", filepath);
1103 
1104 	err = que_eval_sql(info,
1105 			   "PROCEDURE UPDATE_FILEPATH () IS\n"
1106 			   "BEGIN\n"
1107 			   "UPDATE SYS_DATAFILES"
1108 			   " SET PATH = :path\n"
1109 			   " WHERE SPACE = :space;\n"
1110 			   "END;\n", FALSE, trx);
1111 
1112 	trx_commit_for_mysql(trx);
1113 	trx->dict_operation_lock_mode = 0;
1114 	trx_free_for_background(trx);
1115 
1116 	if (err == DB_SUCCESS) {
1117 		/* We just updated SYS_DATAFILES due to the contents in
1118 		a link file.  Make a note that we did this. */
1119 		ib::info() << "The InnoDB data dictionary table SYS_DATAFILES"
1120 			" for tablespace ID " << space_id
1121 			<< " was updated to use file " << filepath << ".";
1122 	} else {
1123 		ib::warn() << "Error occurred while updating InnoDB data"
1124 			" dictionary table SYS_DATAFILES for tablespace ID "
1125 			<< space_id << " to file " << filepath << ": "
1126 			<< ut_strerr(err) << ".";
1127 	}
1128 
1129 	return(err);
1130 }
1131 
1132 /** Replace records in SYS_TABLESPACES and SYS_DATAFILES associated with
1133 the given space_id using an independent transaction.
1134 @param[in]	space_id	Tablespace ID
1135 @param[in]	name		Tablespace name
1136 @param[in]	filepath	First filepath
1137 @param[in]	fsp_flags	Tablespace flags
1138 @return DB_SUCCESS if OK, dberr_t if the insert failed */
1139 dberr_t
dict_replace_tablespace_and_filepath(ulint space_id,const char * name,const char * filepath,ulint fsp_flags)1140 dict_replace_tablespace_and_filepath(
1141 	ulint		space_id,
1142 	const char*	name,
1143 	const char*	filepath,
1144 	ulint		fsp_flags)
1145 {
1146 	if (!srv_sys_tablespaces_open) {
1147 		/* Startup procedure is not yet ready for updates.
1148 		Return success since this will likely get updated
1149 		later. */
1150 		return(DB_SUCCESS);
1151 	}
1152 
1153 	dberr_t		err = DB_SUCCESS;
1154 	trx_t*		trx;
1155 
1156 	DBUG_EXECUTE_IF("innodb_fail_to_update_tablespace_dict",
1157 			return(DB_INTERRUPTED););
1158 
1159 	ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
1160 	ut_ad(mutex_own(&dict_sys->mutex));
1161 	ut_ad(filepath);
1162 
1163 	trx = trx_allocate_for_background();
1164 	trx->op_info = "insert tablespace and filepath";
1165 	trx->dict_operation_lock_mode = RW_X_LATCH;
1166 	trx_start_for_ddl(trx, TRX_DICT_OP_INDEX);
1167 
1168 	/* A record for this space ID was not found in
1169 	SYS_DATAFILES. Assume the record is also missing in
1170 	SYS_TABLESPACES.  Insert records into them both. */
1171 	err = dict_replace_tablespace_in_dictionary(
1172 		space_id, name, fsp_flags, filepath, trx, false);
1173 
1174 	trx_commit_for_mysql(trx);
1175 	trx->dict_operation_lock_mode = 0;
1176 	trx_free_for_background(trx);
1177 
1178 	return(err);
1179 }
1180 
1181 /** Check the validity of a SYS_TABLES record
1182 Make sure the fields are the right length and that they
1183 do not contain invalid contents.
1184 @param[in]	rec	SYS_TABLES record
1185 @return error message, or NULL on success */
1186 static
1187 const char*
dict_sys_tables_rec_check(const rec_t * rec)1188 dict_sys_tables_rec_check(
1189 	const rec_t*	rec)
1190 {
1191 	const byte*	field;
1192 	ulint		len;
1193 
1194 	ut_ad(mutex_own(&dict_sys->mutex));
1195 
1196 	if (rec_get_deleted_flag(rec, 0)) {
1197 		return("delete-marked record in SYS_TABLES");
1198 	}
1199 
1200 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_TABLES) {
1201 		return("wrong number of columns in SYS_TABLES record");
1202 	}
1203 
1204 	rec_get_nth_field_offs_old(
1205 		rec, DICT_FLD__SYS_TABLES__NAME, &len);
1206 	if (len == 0 || len == UNIV_SQL_NULL) {
1207 err_len:
1208 		return("incorrect column length in SYS_TABLES");
1209 	}
1210 	rec_get_nth_field_offs_old(
1211 		rec, DICT_FLD__SYS_TABLES__DB_TRX_ID, &len);
1212 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
1213 		goto err_len;
1214 	}
1215 	rec_get_nth_field_offs_old(
1216 		rec, DICT_FLD__SYS_TABLES__DB_ROLL_PTR, &len);
1217 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
1218 		goto err_len;
1219 	}
1220 
1221 	rec_get_nth_field_offs_old(rec, DICT_FLD__SYS_TABLES__ID, &len);
1222 	if (len != 8) {
1223 		goto err_len;
1224 	}
1225 
1226 	field = rec_get_nth_field_old(
1227 		rec, DICT_FLD__SYS_TABLES__N_COLS, &len);
1228 	if (field == NULL || len != 4) {
1229 		goto err_len;
1230 	}
1231 
1232 	rec_get_nth_field_offs_old(rec, DICT_FLD__SYS_TABLES__TYPE, &len);
1233 	if (len != 4) {
1234 		goto err_len;
1235 	}
1236 
1237 	rec_get_nth_field_offs_old(
1238 		rec, DICT_FLD__SYS_TABLES__MIX_ID, &len);
1239 	if (len != 8) {
1240 		goto err_len;
1241 	}
1242 
1243 	field = rec_get_nth_field_old(
1244 		rec, DICT_FLD__SYS_TABLES__MIX_LEN, &len);
1245 	if (field == NULL || len != 4) {
1246 		goto err_len;
1247 	}
1248 
1249 	rec_get_nth_field_offs_old(
1250 		rec, DICT_FLD__SYS_TABLES__CLUSTER_ID, &len);
1251 	if (len != UNIV_SQL_NULL) {
1252 		goto err_len;
1253 	}
1254 
1255 	field = rec_get_nth_field_old(
1256 		rec, DICT_FLD__SYS_TABLES__SPACE, &len);
1257 	if (field == NULL || len != 4) {
1258 		goto err_len;
1259 	}
1260 
1261 	return(NULL);
1262 }
1263 
1264 /** Read and return the contents of a SYS_TABLESPACES record.
1265 @param[in]	rec	A record of SYS_TABLESPACES
1266 @param[out]	id	Pointer to the space_id for this table
1267 @param[in,out]	name	Buffer for Tablespace Name of length NAME_LEN
1268 @param[out]	flags	Pointer to tablespace flags
1269 @return true if the record was read correctly, false if not. */
1270 bool
dict_sys_tablespaces_rec_read(const rec_t * rec,ulint * id,char * name,ulint * flags)1271 dict_sys_tablespaces_rec_read(
1272 	const rec_t*	rec,
1273 	ulint*		id,
1274 	char*		name,
1275 	ulint*		flags)
1276 {
1277 	const byte*	field;
1278 	ulint		len;
1279 
1280 	field = rec_get_nth_field_old(
1281 		rec, DICT_FLD__SYS_TABLESPACES__SPACE, &len);
1282 	if (len != DICT_FLD_LEN_SPACE) {
1283 		ib::error() << "Wrong field length in SYS_TABLESPACES.SPACE: "
1284 		<< len;
1285 		return(false);
1286 	}
1287 	*id = mach_read_from_4(field);
1288 
1289 	field = rec_get_nth_field_old(
1290 		rec, DICT_FLD__SYS_TABLESPACES__NAME, &len);
1291 	if (len == 0 || len == UNIV_SQL_NULL) {
1292 		ib::error() << "Wrong field length in SYS_TABLESPACES.NAME: "
1293 			<< len;
1294 		return(false);
1295 	}
1296 	strncpy(name, reinterpret_cast<const char*>(field), NAME_LEN);
1297 
1298 	/* read the 4 byte flags from the TYPE field */
1299 	field = rec_get_nth_field_old(
1300 		rec, DICT_FLD__SYS_TABLESPACES__FLAGS, &len);
1301 	if (len != 4) {
1302 		ib::error() << "Wrong field length in SYS_TABLESPACES.FLAGS: "
1303 			<< len;
1304 		return(false);
1305 	}
1306 	*flags = mach_read_from_4(field);
1307 
1308 	return(true);
1309 }
1310 
1311 /** Load and check each general tablespace mentioned in the SYS_TABLESPACES.
1312 Ignore system and file-per-table tablespaces.
1313 If it is valid, add it to the file_system list.
1314 @param[in]	validate	true when the previous shutdown was not clean
1315 @return the highest space ID found. */
1316 UNIV_INLINE
1317 ulint
dict_check_sys_tablespaces(bool validate)1318 dict_check_sys_tablespaces(
1319 	bool		validate)
1320 {
1321 	ulint		max_space_id = 0;
1322 	btr_pcur_t	pcur;
1323 	const rec_t*	rec;
1324 	mtr_t		mtr;
1325 
1326 	DBUG_ENTER("dict_check_sys_tablespaces");
1327 
1328 	ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
1329 	ut_ad(mutex_own(&dict_sys->mutex));
1330 
1331 	/* Before traversing it, let's make sure we have
1332 	SYS_TABLESPACES and SYS_DATAFILES loaded. */
1333 	dict_table_get_low("SYS_TABLESPACES");
1334 	dict_table_get_low("SYS_DATAFILES");
1335 
1336 	mtr_start(&mtr);
1337 
1338 	for (rec = dict_startscan_system(&pcur, &mtr, SYS_TABLESPACES);
1339 	     rec != NULL;
1340 	     rec = dict_getnext_system(&pcur, &mtr))
1341 	{
1342 		char	space_name[NAME_LEN];
1343 		ulint	space_id = 0;
1344 		ulint	fsp_flags;
1345 
1346 		if (!dict_sys_tablespaces_rec_read(rec, &space_id,
1347 						   space_name, &fsp_flags)) {
1348 			continue;
1349 		}
1350 
1351 		/* Ignore system and file-per-table tablespaces. */
1352 		if (is_system_tablespace(space_id)
1353 		    || !fsp_is_shared_tablespace(fsp_flags)) {
1354 			continue;
1355 		}
1356 
1357 		/* Ignore tablespaces that already are in the tablespace
1358 		cache. */
1359 		if (fil_space_for_table_exists_in_mem(
1360 				space_id, space_name, false, true, NULL, 0)) {
1361 			/* Recovery can open a datafile that does not
1362 			match SYS_DATAFILES.  If they don't match, update
1363 			SYS_DATAFILES. */
1364 			char *dict_path = dict_get_first_path(space_id);
1365 			char *fil_path = fil_space_get_first_path(space_id);
1366 			if (dict_path && fil_path
1367 			    && strcmp(dict_path, fil_path)) {
1368 				dict_update_filepath(space_id, fil_path);
1369 			}
1370 			ut_free(dict_path);
1371 			ut_free(fil_path);
1372 			continue;
1373 		}
1374 
1375 		/* Set the expected filepath from the data dictionary.
1376 		If the file is found elsewhere (from an ISL or the default
1377 		location) or this path is the same file but looks different,
1378 		fil_ibd_open() will update the dictionary with what is
1379 		opened. */
1380 		char*	filepath = dict_get_first_path(space_id);
1381 
1382 		Keyring_encryption_info keyring_encryption_info;
1383 
1384 		/* Check that the .ibd file exists. */
1385 		dberr_t	err = fil_ibd_open(
1386 			validate,
1387 			!srv_read_only_mode && srv_log_file_size != 0,
1388 			FIL_TYPE_TABLESPACE,
1389 			space_id,
1390 			fsp_flags,
1391 			space_name,
1392 			filepath,
1393                         keyring_encryption_info);
1394 
1395 		if (err != DB_SUCCESS) {
1396 			ib::warn() << "Ignoring tablespace "
1397 				<< id_name_t(space_name)
1398 				<< " because it could not be opened.";
1399 		}
1400 
1401 		max_space_id = ut_max(max_space_id, space_id);
1402 
1403 		ut_free(filepath);
1404 	}
1405 
1406 	mtr_commit(&mtr);
1407 
1408 	DBUG_RETURN(max_space_id);
1409 }
1410 
1411 /** Read and return 5 integer fields from a SYS_TABLES record.
1412 @param[in]	rec		A record of SYS_TABLES
1413 @param[in]	name		Table Name, the same as SYS_TABLES.NAME
1414 @param[out]	table_id	Pointer to the table_id for this table
1415 @param[out]	space_id	Pointer to the space_id for this table
1416 @param[out]	n_cols		Pointer to number of columns for this table.
1417 @param[out]	flags		Pointer to table flags
1418 @param[out]	flags2		Pointer to table flags2
1419 @return true if the record was read correctly, false if not. */
1420 static
1421 bool
dict_sys_tables_rec_read(const rec_t * rec,const table_name_t & table_name,table_id_t * table_id,ulint * space_id,ulint * n_cols,ulint * flags,ulint * flags2)1422 dict_sys_tables_rec_read(
1423 	const rec_t*		rec,
1424 	const table_name_t&	table_name,
1425 	table_id_t*		table_id,
1426 	ulint*			space_id,
1427 	ulint*			n_cols,
1428 	ulint*			flags,
1429 	ulint*			flags2)
1430 {
1431 	const byte*	field;
1432 	ulint		len;
1433 	ulint		type;
1434 
1435 	*flags2 = 0;
1436 
1437 	field = rec_get_nth_field_old(
1438 		rec, DICT_FLD__SYS_TABLES__ID, &len);
1439 	ut_ad(len == 8);
1440 	*table_id = static_cast<table_id_t>(mach_read_from_8(field));
1441 
1442 	field = rec_get_nth_field_old(
1443 		rec, DICT_FLD__SYS_TABLES__SPACE, &len);
1444 	ut_ad(len == 4);
1445 	*space_id = mach_read_from_4(field);
1446 
1447 	/* Read the 4 byte flags from the TYPE field */
1448 	field = rec_get_nth_field_old(
1449 		rec, DICT_FLD__SYS_TABLES__TYPE, &len);
1450 	ut_a(len == 4);
1451 	type = mach_read_from_4(field);
1452 
1453 	/* The low order bit of SYS_TABLES.TYPE is always set to 1. But in
1454 	dict_table_t::flags the low order bit is used to determine if the
1455 	row format is Redundant (0) or Compact (1) when the format is Antelope.
1456 	Read the 4 byte N_COLS field and look at the high order bit.  It
1457 	should be set for COMPACT and later.  It should not be set for
1458 	REDUNDANT. */
1459 	field = rec_get_nth_field_old(
1460 		rec, DICT_FLD__SYS_TABLES__N_COLS, &len);
1461 	ut_a(len == 4);
1462 	*n_cols = mach_read_from_4(field);
1463 
1464 	/* This validation function also combines the DICT_N_COLS_COMPACT
1465 	flag in n_cols into the type field to effectively make it a
1466 	dict_table_t::flags. */
1467 
1468 	if (ULINT_UNDEFINED == dict_sys_tables_type_validate(type, *n_cols)) {
1469 		ib::error() << "Table " << table_name << " in InnoDB"
1470 			" data dictionary contains invalid flags."
1471 			" SYS_TABLES.TYPE=" << type <<
1472 			" SYS_TABLES.N_COLS=" << *n_cols;
1473 		*flags = ULINT_UNDEFINED;
1474 		return(false);
1475 	}
1476 
1477 	*flags = dict_sys_tables_type_to_tf(type, *n_cols);
1478 
1479 	/* Get flags2 from SYS_TABLES.MIX_LEN */
1480 	field = rec_get_nth_field_old(
1481 		rec, DICT_FLD__SYS_TABLES__MIX_LEN, &len);
1482 	*flags2 = mach_read_from_4(field);
1483 
1484 	/* DICT_TF2_FTS will be set when indexes are being loaded */
1485 	*flags2 &= ~DICT_TF2_FTS;
1486 
1487 	/* Now that we have used this bit, unset it. */
1488 	*n_cols &= ~DICT_N_COLS_COMPACT;
1489 
1490 	return(true);
1491 }
1492 
1493 /** Load and check each non-predefined tablespace mentioned in SYS_TABLES.
1494 Search SYS_TABLES and check each tablespace mentioned that has not
1495 already been added to the fil_system.  If it is valid, add it to the
1496 file_system list.  Perform extra validation on the table if recovery from
1497 the REDO log occurred.
1498 @param[in]	validate	Whether to do validation on the table.
1499 @return the highest space ID found. */
1500 UNIV_INLINE
1501 ulint
dict_check_sys_tables(bool validate)1502 dict_check_sys_tables(
1503 	bool		validate)
1504 {
1505 	ulint		max_space_id = 0;
1506 	btr_pcur_t	pcur;
1507 	const rec_t*	rec;
1508 	mtr_t		mtr;
1509 
1510 	DBUG_ENTER("dict_check_sys_tables");
1511 
1512 	ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
1513 	ut_ad(mutex_own(&dict_sys->mutex));
1514 
1515 	mtr_start(&mtr);
1516 
1517 	/* Before traversing SYS_TABLES, let's make sure we have
1518 	SYS_TABLESPACES and SYS_DATAFILES loaded. */
1519 	dict_table_t*	sys_tablespaces;
1520 	dict_table_t*	sys_datafiles;
1521 	sys_tablespaces = dict_table_get_low("SYS_TABLESPACES");
1522 	ut_a(sys_tablespaces != NULL);
1523 	sys_datafiles = dict_table_get_low("SYS_DATAFILES");
1524 	ut_a(sys_datafiles != NULL);
1525 
1526 	for (rec = dict_startscan_system(&pcur, &mtr, SYS_TABLES);
1527 	     rec != NULL;
1528 	     rec = dict_getnext_system(&pcur, &mtr)) {
1529 		const byte*	field;
1530 		ulint		len;
1531 		char*		space_name;
1532 		table_name_t	table_name;
1533 		table_id_t	table_id;
1534 		ulint		space_id;
1535 		ulint		n_cols;
1536 		ulint		flags;
1537 		ulint		flags2;
1538 
1539 		/* If a table record is not useable, ignore it and continue
1540 		on to the next record. Error messages were logged. */
1541 		if (dict_sys_tables_rec_check(rec) != NULL) {
1542 			continue;
1543 		}
1544 
1545 		/* Copy the table name from rec */
1546 		field = rec_get_nth_field_old(
1547 			rec, DICT_FLD__SYS_TABLES__NAME, &len);
1548 		table_name.m_name = mem_strdupl((char*) field, len);
1549 		DBUG_PRINT("dict_check_sys_tables",
1550 			   ("name: %p, '%s'", table_name.m_name,
1551 			    table_name.m_name));
1552 
1553 		dict_sys_tables_rec_read(rec, table_name,
1554 					 &table_id, &space_id,
1555 					 &n_cols, &flags, &flags2);
1556 		if (flags == ULINT_UNDEFINED
1557 		    || is_system_tablespace(space_id)) {
1558 			ut_free(table_name.m_name);
1559 			continue;
1560 		}
1561 
1562 		if (flags2 & DICT_TF2_DISCARDED) {
1563 			ib::info() << "Ignoring tablespace " << table_name
1564 				<< " because the DISCARD flag is set .";
1565 			ut_free(table_name.m_name);
1566 			continue;
1567 		}
1568 
1569 		/* If the table is not a predefined tablespace then it must
1570 		be in a file-per-table or shared tablespace.
1571 		Note that flags2 is not available for REDUNDANT tables,
1572 		so don't check those. */
1573 		ut_ad(DICT_TF_HAS_SHARED_SPACE(flags)
1574 		      || !DICT_TF_GET_COMPACT(flags)
1575 		      || flags2 & DICT_TF2_USE_FILE_PER_TABLE);
1576 
1577 		/* Look up the tablespace name in the data dictionary if this
1578 		is a shared tablespace.  For file-per-table, the table_name
1579 		and the tablespace_name are the same.
1580 		Some hidden tables like FTS AUX tables may not be found in
1581 		the dictionary since they can always be found in the default
1582 		location. If so, then dict_space_get_name() will return NULL,
1583 		the space name must be the table_name, and the filepath can be
1584 		discovered in the default location.*/
1585 		char*	shared_space_name = dict_space_get_name(space_id, NULL);
1586 		space_name = shared_space_name == NULL
1587 			? table_name.m_name
1588 			: shared_space_name;
1589 
1590 		/* Now that we have the proper name for this tablespace,
1591 		whether it is a shared tablespace or a single table
1592 		tablespace, look to see if it is already in the tablespace
1593 		cache. */
1594 		if (fil_space_for_table_exists_in_mem(
1595 			    space_id, space_name, false, true, NULL, 0)) {
1596 			/* Recovery can open a datafile that does not
1597 			match SYS_DATAFILES.  If they don't match, update
1598 			SYS_DATAFILES. */
1599 			char *dict_path = dict_get_first_path(space_id);
1600 			char *fil_path = fil_space_get_first_path(space_id);
1601 			if (dict_path && fil_path
1602 			    && strcmp(dict_path, fil_path)) {
1603 				dict_update_filepath(space_id, fil_path);
1604 			}
1605 			ut_free(dict_path);
1606 			ut_free(fil_path);
1607 			ut_free(table_name.m_name);
1608 			ut_free(shared_space_name);
1609 			continue;
1610 		}
1611 
1612 		/* Set the expected filepath from the data dictionary.
1613 		If the file is found elsewhere (from an ISL or the default
1614 		location) or this path is the same file but looks different,
1615 		fil_ibd_open() will update the dictionary with what is
1616 		opened. */
1617 		char*	filepath = dict_get_first_path(space_id);
1618 
1619 		/* Check that the .ibd file exists. */
1620 		bool	is_temp = flags2 & DICT_TF2_TEMPORARY;
1621 		bool	is_encrypted = flags2 & DICT_TF2_ENCRYPTION;
1622 		ulint	fsp_flags = dict_tf_to_fsp_flags(flags,
1623 							 is_temp,
1624 							 is_encrypted);
1625 
1626 		Keyring_encryption_info keyring_encryption_info;
1627 
1628 		dberr_t	err = fil_ibd_open(
1629 			validate,
1630 			!srv_read_only_mode && srv_log_file_size != 0,
1631 			FIL_TYPE_TABLESPACE,
1632 			space_id,
1633 			fsp_flags,
1634 			space_name,
1635 			filepath,
1636 			keyring_encryption_info);
1637 
1638 		if (err != DB_SUCCESS) {
1639 			ib::warn() << "Ignoring tablespace "
1640 				<< id_name_t(space_name)
1641 				<< " because it could not be opened.";
1642 		}
1643 
1644 		max_space_id = ut_max(max_space_id, space_id);
1645 
1646 		ut_free(table_name.m_name);
1647 		ut_free(shared_space_name);
1648 		ut_free(filepath);
1649 	}
1650 
1651 	mtr_commit(&mtr);
1652 
1653 	DBUG_RETURN(max_space_id);
1654 }
1655 
1656 /** Check each tablespace found in the data dictionary.
1657 Look at each general tablespace found in SYS_TABLESPACES.
1658 Then look at each table defined in SYS_TABLES that has a space_id > 0
1659 to find all the file-per-table tablespaces.
1660 
1661 In a crash recovery we already have some tablespace objects created from
1662 processing the REDO log.  Any other tablespace in SYS_TABLESPACES not
1663 previously used in recovery will be opened here.  We will compare the
1664 space_id information in the data dictionary to what we find in the
1665 tablespace file. In addition, more validation will be done if recovery
1666 was needed and force_recovery is not set.
1667 
1668 We also scan the biggest space id, and store it to fil_system.
1669 @param[in]	validate	true if recovery was needed */
1670 void
dict_check_tablespaces_and_store_max_id(bool validate)1671 dict_check_tablespaces_and_store_max_id(
1672 	bool	validate)
1673 {
1674 	mtr_t	mtr;
1675 
1676 	DBUG_ENTER("dict_check_tablespaces_and_store_max_id");
1677 
1678 	rw_lock_x_lock(dict_operation_lock);
1679 	mutex_enter(&dict_sys->mutex);
1680 
1681 	/* Initialize the max space_id from sys header */
1682 	mtr_start(&mtr);
1683 	ulint	max_space_id = mtr_read_ulint(
1684 		dict_hdr_get(&mtr) + DICT_HDR_MAX_SPACE_ID,
1685 		MLOG_4BYTES, &mtr);
1686 	mtr_commit(&mtr);
1687 
1688 	fil_set_max_space_id_if_bigger(max_space_id);
1689 
1690 	/* Open all general tablespaces found in SYS_TABLESPACES. */
1691 	ulint	max1 = dict_check_sys_tablespaces(validate);
1692 
1693 	/* Open all tablespaces referenced in SYS_TABLES.
1694 	This will update SYS_TABLESPACES and SYS_DATAFILES if it
1695 	finds any file-per-table tablespaces not already there. */
1696 	ulint	max2 = dict_check_sys_tables(validate);
1697 
1698 	/* Store the max space_id found */
1699 	max_space_id = ut_max(max1, max2);
1700 	fil_set_max_space_id_if_bigger(max_space_id);
1701 
1702 	mutex_exit(&dict_sys->mutex);
1703 	rw_lock_x_unlock(dict_operation_lock);
1704 
1705 	DBUG_VOID_RETURN;
1706 }
1707 
1708 /** Error message for a delete-marked record in dict_load_column_low() */
1709 static const char* dict_load_column_del = "delete-marked record in SYS_COLUMN";
1710 
1711 /********************************************************************//**
1712 Loads a table column definition from a SYS_COLUMNS record to
1713 dict_table_t.
1714 @return error message, or NULL on success */
1715 const char*
dict_load_column_low(dict_table_t * table,mem_heap_t * heap,dict_col_t * column,table_id_t * table_id,const char ** col_name,const rec_t * rec,ulint * nth_v_col)1716 dict_load_column_low(
1717 /*=================*/
1718 	dict_table_t*	table,		/*!< in/out: table, could be NULL
1719 					if we just populate a dict_column_t
1720 					struct with information from
1721 					a SYS_COLUMNS record */
1722 	mem_heap_t*	heap,		/*!< in/out: memory heap
1723 					for temporary storage */
1724 	dict_col_t*	column,		/*!< out: dict_column_t to fill,
1725 					or NULL if table != NULL */
1726 	table_id_t*	table_id,	/*!< out: table id */
1727 	const char**	col_name,	/*!< out: column name */
1728 	const rec_t*	rec,		/*!< in: SYS_COLUMNS record */
1729 	ulint*		nth_v_col)	/*!< out: if not NULL, this
1730 					records the "n" of "nth" virtual
1731 					column */
1732 {
1733 	char*		name;
1734 	const byte*	field;
1735 	ulint		len;
1736 	ulint		mtype;
1737 	ulint		prtype;
1738 	ulint		col_len;
1739 	ulint		pos;
1740 	ulint		num_base;
1741 
1742 	ut_ad(table || column);
1743 
1744 	if (rec_get_deleted_flag(rec, 0)) {
1745 		return(dict_load_column_del);
1746 	}
1747 
1748 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_COLUMNS) {
1749 		return("wrong number of columns in SYS_COLUMNS record");
1750 	}
1751 
1752 	field = rec_get_nth_field_old(
1753 		rec, DICT_FLD__SYS_COLUMNS__TABLE_ID, &len);
1754 	if (len != 8) {
1755 err_len:
1756 		return("incorrect column length in SYS_COLUMNS");
1757 	}
1758 
1759 	if (table_id) {
1760 		*table_id = mach_read_from_8(field);
1761 	} else if (table->id != mach_read_from_8(field)) {
1762 		return("SYS_COLUMNS.TABLE_ID mismatch");
1763 	}
1764 
1765 	field = rec_get_nth_field_old(
1766 		rec, DICT_FLD__SYS_COLUMNS__POS, &len);
1767 	if (len != 4) {
1768 		goto err_len;
1769 	}
1770 
1771 	pos = mach_read_from_4(field);
1772 
1773 	rec_get_nth_field_offs_old(
1774 		rec, DICT_FLD__SYS_COLUMNS__DB_TRX_ID, &len);
1775 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
1776 		goto err_len;
1777 	}
1778 	rec_get_nth_field_offs_old(
1779 		rec, DICT_FLD__SYS_COLUMNS__DB_ROLL_PTR, &len);
1780 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
1781 		goto err_len;
1782 	}
1783 
1784 	field = rec_get_nth_field_old(
1785 		rec, DICT_FLD__SYS_COLUMNS__NAME, &len);
1786 	if (len == 0 || len == UNIV_SQL_NULL) {
1787 		goto err_len;
1788 	}
1789 
1790 	name = mem_heap_strdupl(heap, (const char*) field, len);
1791 
1792 	if (col_name) {
1793 		*col_name = name;
1794 	}
1795 
1796 	field = rec_get_nth_field_old(
1797 		rec, DICT_FLD__SYS_COLUMNS__MTYPE, &len);
1798 	if (len != 4) {
1799 		goto err_len;
1800 	}
1801 
1802 	mtype = mach_read_from_4(field);
1803 
1804 	field = rec_get_nth_field_old(
1805 		rec, DICT_FLD__SYS_COLUMNS__PRTYPE, &len);
1806 	if (len != 4) {
1807 		goto err_len;
1808 	}
1809 	prtype = mach_read_from_4(field);
1810 
1811 	if (dtype_get_charset_coll(prtype) == 0
1812 	    && dtype_is_string_type(mtype)) {
1813 		/* The table was created with < 4.1.2. */
1814 
1815 		if (dtype_is_binary_string_type(mtype, prtype)) {
1816 			/* Use the binary collation for
1817 			string columns of binary type. */
1818 
1819 			prtype = dtype_form_prtype(
1820 				prtype,
1821 				DATA_MYSQL_BINARY_CHARSET_COLL);
1822 		} else {
1823 			/* Use the default charset for
1824 			other than binary columns. */
1825 
1826 			prtype = dtype_form_prtype(
1827 				prtype,
1828 				data_mysql_default_charset_coll);
1829 		}
1830 	}
1831 
1832 	if (table && table->n_def != pos && !(prtype & DATA_VIRTUAL)) {
1833 		return("SYS_COLUMNS.POS mismatch");
1834 	}
1835 
1836 	field = rec_get_nth_field_old(
1837 		rec, DICT_FLD__SYS_COLUMNS__LEN, &len);
1838 	if (len != 4) {
1839 		goto err_len;
1840 	}
1841 	col_len = mach_read_from_4(field);
1842 	field = rec_get_nth_field_old(
1843 		rec, DICT_FLD__SYS_COLUMNS__PREC, &len);
1844 	if (len != 4) {
1845 		goto err_len;
1846 	}
1847 	num_base = mach_read_from_4(field);
1848 
1849 	if (column == NULL) {
1850 		if (prtype & DATA_VIRTUAL) {
1851 #ifdef UNIV_DEBUG
1852 			dict_v_col_t*	vcol =
1853 #endif
1854 			dict_mem_table_add_v_col(
1855 				table, heap, name, mtype,
1856 				prtype, col_len,
1857 				dict_get_v_col_mysql_pos(pos), num_base);
1858 			ut_ad(vcol->v_pos == dict_get_v_col_pos(pos));
1859 		} else {
1860 			ut_ad(num_base == 0);
1861 			dict_mem_table_add_col(table, heap, name, mtype,
1862 					       prtype, col_len);
1863 		}
1864 	} else {
1865 		dict_mem_fill_column_struct(column, pos, mtype,
1866 					    prtype, col_len);
1867 	}
1868 
1869 	/* Report the virtual column number */
1870 	if ((prtype & DATA_VIRTUAL) && nth_v_col != NULL) {
1871 		*nth_v_col = dict_get_v_col_pos(pos);
1872 	}
1873 
1874 	return(NULL);
1875 }
1876 
1877 /** Error message for a delete-marked record in dict_load_virtual_low() */
1878 static const char* dict_load_virtual_del = "delete-marked record in SYS_VIRTUAL";
1879 
1880 /** Loads a virtual column "mapping" (to base columns) information
1881 from a SYS_VIRTUAL record
1882 @param[in,out]	table		table
1883 @param[in,out]	heap		memory heap
1884 @param[in,out]	column		mapped base column's dict_column_t
1885 @param[in,out]	table_id	table id
1886 @param[in,out]	pos		virtual column position
1887 @param[in,out]	base_pos	base column position
1888 @param[in]	rec		SYS_VIRTUAL record
1889 @return error message, or NULL on success */
1890 const char*
dict_load_virtual_low(dict_table_t * table,mem_heap_t * heap,dict_col_t ** column,table_id_t * table_id,ulint * pos,ulint * base_pos,const rec_t * rec)1891 dict_load_virtual_low(
1892 	dict_table_t*	table,
1893 	mem_heap_t*	heap,
1894 	dict_col_t**	column,
1895 	table_id_t*	table_id,
1896 	ulint*		pos,
1897 	ulint*		base_pos,
1898 	const rec_t*	rec)
1899 {
1900 	const byte*	field;
1901 	ulint		len;
1902 	ulint		base;
1903 
1904 	if (rec_get_deleted_flag(rec, 0)) {
1905 		return(dict_load_virtual_del);
1906 	}
1907 
1908 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_VIRTUAL) {
1909 		return("wrong number of columns in SYS_VIRTUAL record");
1910 	}
1911 
1912 	field = rec_get_nth_field_old(
1913 		rec, DICT_FLD__SYS_VIRTUAL__TABLE_ID, &len);
1914 	if (len != 8) {
1915 err_len:
1916 		return("incorrect column length in SYS_VIRTUAL");
1917 	}
1918 
1919 	if (table_id != NULL) {
1920 		*table_id = mach_read_from_8(field);
1921 	} else if (table->id != mach_read_from_8(field)) {
1922 		return("SYS_VIRTUAL.TABLE_ID mismatch");
1923 	}
1924 
1925 	field = rec_get_nth_field_old(
1926 		rec, DICT_FLD__SYS_VIRTUAL__POS, &len);
1927 	if (len != 4) {
1928 		goto err_len;
1929 	}
1930 
1931 	if (pos != NULL) {
1932 		*pos = mach_read_from_4(field);
1933 	}
1934 
1935 	field = rec_get_nth_field_old(
1936 		rec, DICT_FLD__SYS_VIRTUAL__BASE_POS, &len);
1937 	if (len != 4) {
1938 		goto err_len;
1939 	}
1940 
1941 	base = mach_read_from_4(field);
1942 
1943 	if (base_pos != NULL) {
1944 		*base_pos = base;
1945 	}
1946 
1947 	rec_get_nth_field_offs_old(
1948 		rec, DICT_FLD__SYS_VIRTUAL__DB_TRX_ID, &len);
1949 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
1950 		goto err_len;
1951 	}
1952 
1953 	rec_get_nth_field_offs_old(
1954 		rec, DICT_FLD__SYS_VIRTUAL__DB_ROLL_PTR, &len);
1955 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
1956 		goto err_len;
1957 	}
1958 
1959 	if (column != NULL) {
1960 		*column = dict_table_get_nth_col(table, base);
1961 	}
1962 
1963 	return(NULL);
1964 }
1965 /********************************************************************//**
1966 Loads definitions for table columns. */
1967 static
1968 void
dict_load_columns(dict_table_t * table,mem_heap_t * heap)1969 dict_load_columns(
1970 /*==============*/
1971 	dict_table_t*	table,	/*!< in/out: table */
1972 	mem_heap_t*	heap)	/*!< in/out: memory heap
1973 				for temporary storage */
1974 {
1975 	dict_table_t*	sys_columns;
1976 	dict_index_t*	sys_index;
1977 	btr_pcur_t	pcur;
1978 	dtuple_t*	tuple;
1979 	dfield_t*	dfield;
1980 	const rec_t*	rec;
1981 	byte*		buf;
1982 	ulint		i;
1983 	mtr_t		mtr;
1984 	ulint		n_skipped = 0;
1985 
1986 	ut_ad(mutex_own(&dict_sys->mutex));
1987 
1988 	mtr_start(&mtr);
1989 
1990 	sys_columns = dict_table_get_low("SYS_COLUMNS");
1991 	sys_index = UT_LIST_GET_FIRST(sys_columns->indexes);
1992 	ut_ad(!dict_table_is_comp(sys_columns));
1993 
1994 	ut_ad(name_of_col_is(sys_columns, sys_index,
1995 			     DICT_FLD__SYS_COLUMNS__NAME, "NAME"));
1996 	ut_ad(name_of_col_is(sys_columns, sys_index,
1997 			     DICT_FLD__SYS_COLUMNS__PREC, "PREC"));
1998 
1999 	tuple = dtuple_create(heap, 1);
2000 	dfield = dtuple_get_nth_field(tuple, 0);
2001 
2002 	buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
2003 	mach_write_to_8(buf, table->id);
2004 
2005 	dfield_set_data(dfield, buf, 8);
2006 	dict_index_copy_types(tuple, sys_index, 1);
2007 
2008 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
2009 				  BTR_SEARCH_LEAF, &pcur, &mtr);
2010 
2011 	ut_ad(table->n_t_cols == static_cast<ulint>(
2012 	      table->n_cols) + static_cast<ulint>(table->n_v_cols));
2013 
2014 	for (i = 0;
2015 	     i + DATA_N_SYS_COLS < table->n_t_cols + n_skipped;
2016 	     i++) {
2017 		const char*	err_msg;
2018 		const char*	name = NULL;
2019 		ulint		nth_v_col = ULINT_UNDEFINED;
2020 
2021 		rec = btr_pcur_get_rec(&pcur);
2022 
2023 		ut_a(btr_pcur_is_on_user_rec(&pcur));
2024 
2025 		err_msg = dict_load_column_low(table, heap, NULL, NULL,
2026 					       &name, rec, &nth_v_col);
2027 
2028 		if (err_msg == dict_load_column_del) {
2029 			n_skipped++;
2030 			goto next_rec;
2031 		} else if (err_msg) {
2032 			ib::fatal() << err_msg;
2033 		}
2034 
2035 		/* Note: Currently we have one DOC_ID column that is
2036 		shared by all FTS indexes on a table. And only non-virtual
2037 		column can be used for FULLTEXT index */
2038 		if (innobase_strcasecmp(name,
2039 					FTS_DOC_ID_COL_NAME) == 0
2040 		    && nth_v_col == ULINT_UNDEFINED) {
2041 			dict_col_t*	col;
2042 			/* As part of normal loading of tables the
2043 			table->flag is not set for tables with FTS
2044 			till after the FTS indexes are loaded. So we
2045 			create the fts_t instance here if there isn't
2046 			one already created.
2047 
2048 			This case does not arise for table create as
2049 			the flag is set before the table is created. */
2050 			if (table->fts == NULL) {
2051 				table->fts = fts_create(table);
2052 				fts_optimize_add_table(table);
2053 			}
2054 
2055 			ut_a(table->fts->doc_col == ULINT_UNDEFINED);
2056 
2057 			col = dict_table_get_nth_col(table, i - n_skipped);
2058 
2059 			ut_ad(col->len == sizeof(doc_id_t));
2060 
2061 			if (col->prtype & DATA_FTS_DOC_ID) {
2062 				DICT_TF2_FLAG_SET(
2063 					table, DICT_TF2_FTS_HAS_DOC_ID);
2064 				DICT_TF2_FLAG_UNSET(
2065 					table, DICT_TF2_FTS_ADD_DOC_ID);
2066 			}
2067 
2068 			table->fts->doc_col = i - n_skipped;
2069 		}
2070 next_rec:
2071 		btr_pcur_move_to_next_user_rec(&pcur, &mtr);
2072 	}
2073 
2074 	btr_pcur_close(&pcur);
2075 	mtr_commit(&mtr);
2076 }
2077 
2078 /** Loads SYS_VIRTUAL info for one virtual column
2079 @param[in,out]	table		table
2080 @param[in]	nth_v_col	virtual column sequence num
2081 @param[in,out]	v_col		virtual column
2082 @param[in,out]	heap		memory heap
2083 */
2084 static
2085 void
dict_load_virtual_one_col(dict_table_t * table,ulint nth_v_col,dict_v_col_t * v_col,mem_heap_t * heap)2086 dict_load_virtual_one_col(
2087 	dict_table_t*	table,
2088 	ulint		nth_v_col,
2089 	dict_v_col_t*	v_col,
2090 	mem_heap_t*	heap)
2091 {
2092 	dict_table_t*	sys_virtual;
2093 	dict_index_t*	sys_virtual_index;
2094 	btr_pcur_t	pcur;
2095 	dtuple_t*	tuple;
2096 	dfield_t*	dfield;
2097 	const rec_t*	rec;
2098 	byte*		buf;
2099 	ulint		i = 0;
2100 	mtr_t		mtr;
2101 	ulint		skipped = 0;
2102 
2103 	ut_ad(mutex_own(&dict_sys->mutex));
2104 
2105 	if (v_col->num_base == 0) {
2106 		return;
2107 	}
2108 
2109 	mtr_start(&mtr);
2110 
2111 	sys_virtual = dict_table_get_low("SYS_VIRTUAL");
2112 	sys_virtual_index = UT_LIST_GET_FIRST(sys_virtual->indexes);
2113 	ut_ad(!dict_table_is_comp(sys_virtual));
2114 
2115 	ut_ad(name_of_col_is(sys_virtual, sys_virtual_index,
2116 			     DICT_FLD__SYS_VIRTUAL__POS, "POS"));
2117 
2118 	tuple = dtuple_create(heap, 2);
2119 
2120 	/* table ID field */
2121 	dfield = dtuple_get_nth_field(tuple, 0);
2122 
2123 	buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
2124 	mach_write_to_8(buf, table->id);
2125 
2126 	dfield_set_data(dfield, buf, 8);
2127 
2128 	/* virtual column pos field */
2129 	dfield = dtuple_get_nth_field(tuple, 1);
2130 
2131 	buf = static_cast<byte*>(mem_heap_alloc(heap, 4));
2132 	ulint	vcol_pos = dict_create_v_col_pos(nth_v_col, v_col->m_col.ind);
2133 	mach_write_to_4(buf, vcol_pos);
2134 
2135 	dfield_set_data(dfield, buf, 4);
2136 
2137 	dict_index_copy_types(tuple, sys_virtual_index, 2);
2138 
2139 	btr_pcur_open_on_user_rec(sys_virtual_index, tuple, PAGE_CUR_GE,
2140 				  BTR_SEARCH_LEAF, &pcur, &mtr);
2141 
2142 	for (i = 0; i < v_col->num_base + skipped; i++) {
2143 		const char*	err_msg;
2144 		ulint		pos;
2145 
2146 		ut_ad(btr_pcur_is_on_user_rec(&pcur));
2147 
2148 		rec = btr_pcur_get_rec(&pcur);
2149 
2150 		ut_a(btr_pcur_is_on_user_rec(&pcur));
2151 
2152 		err_msg = dict_load_virtual_low(table, heap,
2153 						&v_col->base_col[i - skipped],
2154 						NULL,
2155 					        &pos, NULL, rec);
2156 
2157 		if (err_msg) {
2158 			if (err_msg != dict_load_virtual_del) {
2159 				ib::fatal() << err_msg;
2160 			} else {
2161 				skipped++;
2162 			}
2163 		} else {
2164 			ut_ad(pos == vcol_pos);
2165 		}
2166 
2167 		btr_pcur_move_to_next_user_rec(&pcur, &mtr);
2168 	}
2169 
2170 	btr_pcur_close(&pcur);
2171 	mtr_commit(&mtr);
2172 }
2173 
2174 /** Loads info from SYS_VIRTUAL for virtual columns.
2175 @param[in,out]	table	table
2176 @param[in]	heap	memory heap
2177 */
2178 static
2179 void
dict_load_virtual(dict_table_t * table,mem_heap_t * heap)2180 dict_load_virtual(
2181 	dict_table_t*	table,
2182 	mem_heap_t*	heap)
2183 {
2184 	for (ulint i = 0; i < table->n_v_cols; i++) {
2185 		dict_v_col_t*	v_col = dict_table_get_nth_v_col(table, i);
2186 
2187 		dict_load_virtual_one_col(table, i, v_col, heap);
2188 	}
2189 }
2190 
2191 /** Error message for a delete-marked record in dict_load_field_low() */
2192 static const char* dict_load_field_del = "delete-marked record in SYS_FIELDS";
2193 
2194 /********************************************************************//**
2195 Loads an index field definition from a SYS_FIELDS record to
2196 dict_index_t.
2197 @return error message, or NULL on success */
2198 const char*
dict_load_field_low(byte * index_id,dict_index_t * index,dict_field_t * sys_field,ulint * pos,byte * last_index_id,mem_heap_t * heap,const rec_t * rec)2199 dict_load_field_low(
2200 /*================*/
2201 	byte*		index_id,	/*!< in/out: index id (8 bytes)
2202 					an "in" value if index != NULL
2203 					and "out" if index == NULL */
2204 	dict_index_t*	index,		/*!< in/out: index, could be NULL
2205 					if we just populate a dict_field_t
2206 					struct with information from
2207 					a SYS_FIELDS record */
2208 	dict_field_t*	sys_field,	/*!< out: dict_field_t to be
2209 					filled */
2210 	ulint*		pos,		/*!< out: Field position */
2211 	byte*		last_index_id,	/*!< in: last index id */
2212 	mem_heap_t*	heap,		/*!< in/out: memory heap
2213 					for temporary storage */
2214 	const rec_t*	rec)		/*!< in: SYS_FIELDS record */
2215 {
2216 	const byte*	field;
2217 	ulint		len;
2218 	ulint		pos_and_prefix_len;
2219 	ulint		prefix_len;
2220 	ibool		first_field;
2221 	ulint		position;
2222 
2223 	/* Either index or sys_field is supplied, not both */
2224 	ut_a((!index) || (!sys_field));
2225 
2226 	if (rec_get_deleted_flag(rec, 0)) {
2227 		return(dict_load_field_del);
2228 	}
2229 
2230 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_FIELDS) {
2231 		return("wrong number of columns in SYS_FIELDS record");
2232 	}
2233 
2234 	field = rec_get_nth_field_old(
2235 		rec, DICT_FLD__SYS_FIELDS__INDEX_ID, &len);
2236 	if (len != 8) {
2237 err_len:
2238 		return("incorrect column length in SYS_FIELDS");
2239 	}
2240 
2241 	if (!index) {
2242 		ut_a(last_index_id);
2243 		memcpy(index_id, (const char*) field, 8);
2244 		first_field = memcmp(index_id, last_index_id, 8);
2245 	} else {
2246 		first_field = (index->n_def == 0);
2247 		if (memcmp(field, index_id, 8)) {
2248 			return("SYS_FIELDS.INDEX_ID mismatch");
2249 		}
2250 	}
2251 
2252 	/* The next field stores the field position in the index and a
2253 	possible column prefix length if the index field does not
2254 	contain the whole column. The storage format is like this: if
2255 	there is at least one prefix field in the index, then the HIGH
2256 	2 bytes contain the field number (index->n_def) and the low 2
2257 	bytes the prefix length for the field. Otherwise the field
2258 	number (index->n_def) is contained in the 2 LOW bytes. */
2259 
2260 	field = rec_get_nth_field_old(
2261 		rec, DICT_FLD__SYS_FIELDS__POS, &len);
2262 	if (len != 4) {
2263 		goto err_len;
2264 	}
2265 
2266 	pos_and_prefix_len = mach_read_from_4(field);
2267 
2268 	if (index && UNIV_UNLIKELY
2269 	    ((pos_and_prefix_len & 0xFFFFUL) != index->n_def
2270 	     && (pos_and_prefix_len >> 16 & 0xFFFF) != index->n_def)) {
2271 		return("SYS_FIELDS.POS mismatch");
2272 	}
2273 
2274 	if (first_field || pos_and_prefix_len > 0xFFFFUL) {
2275 		prefix_len = pos_and_prefix_len & 0xFFFFUL;
2276 		position = (pos_and_prefix_len & 0xFFFF0000UL)  >> 16;
2277 	} else {
2278 		prefix_len = 0;
2279 		position = pos_and_prefix_len & 0xFFFFUL;
2280 	}
2281 
2282 	rec_get_nth_field_offs_old(
2283 		rec, DICT_FLD__SYS_FIELDS__DB_TRX_ID, &len);
2284 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
2285 		goto err_len;
2286 	}
2287 	rec_get_nth_field_offs_old(
2288 		rec, DICT_FLD__SYS_FIELDS__DB_ROLL_PTR, &len);
2289 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
2290 		goto err_len;
2291 	}
2292 
2293 	field = rec_get_nth_field_old(
2294 		rec, DICT_FLD__SYS_FIELDS__COL_NAME, &len);
2295 	if (len == 0 || len == UNIV_SQL_NULL) {
2296 		goto err_len;
2297 	}
2298 
2299 	if (index) {
2300 		dict_mem_index_add_field(
2301 			index, mem_heap_strdupl(heap, (const char*) field, len),
2302 			prefix_len);
2303 	} else {
2304 		ut_a(sys_field);
2305 		ut_a(pos);
2306 
2307 		sys_field->name = mem_heap_strdupl(
2308 			heap, (const char*) field, len);
2309 		sys_field->prefix_len = prefix_len;
2310 		*pos = position;
2311 	}
2312 
2313 	return(NULL);
2314 }
2315 
2316 /********************************************************************//**
2317 Loads definitions for index fields.
2318 @return DB_SUCCESS if ok, DB_CORRUPTION if corruption */
2319 static
2320 ulint
dict_load_fields(dict_index_t * index,mem_heap_t * heap)2321 dict_load_fields(
2322 /*=============*/
2323 	dict_index_t*	index,	/*!< in/out: index whose fields to load */
2324 	mem_heap_t*	heap)	/*!< in: memory heap for temporary storage */
2325 {
2326 	dict_table_t*	sys_fields;
2327 	dict_index_t*	sys_index;
2328 	btr_pcur_t	pcur;
2329 	dtuple_t*	tuple;
2330 	dfield_t*	dfield;
2331 	const rec_t*	rec;
2332 	byte*		buf;
2333 	ulint		i;
2334 	mtr_t		mtr;
2335 	dberr_t		error;
2336 
2337 	ut_ad(mutex_own(&dict_sys->mutex));
2338 
2339 	mtr_start(&mtr);
2340 
2341 	sys_fields = dict_table_get_low("SYS_FIELDS");
2342 	sys_index = UT_LIST_GET_FIRST(sys_fields->indexes);
2343 	ut_ad(!dict_table_is_comp(sys_fields));
2344 	ut_ad(name_of_col_is(sys_fields, sys_index,
2345 			     DICT_FLD__SYS_FIELDS__COL_NAME, "COL_NAME"));
2346 
2347 	tuple = dtuple_create(heap, 1);
2348 	dfield = dtuple_get_nth_field(tuple, 0);
2349 
2350 	buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
2351 	mach_write_to_8(buf, index->id);
2352 
2353 	dfield_set_data(dfield, buf, 8);
2354 	dict_index_copy_types(tuple, sys_index, 1);
2355 
2356 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
2357 				  BTR_SEARCH_LEAF, &pcur, &mtr);
2358 	for (i = 0; i < index->n_fields; i++) {
2359 		const char* err_msg;
2360 
2361 		rec = btr_pcur_get_rec(&pcur);
2362 
2363 		ut_a(btr_pcur_is_on_user_rec(&pcur));
2364 
2365 		err_msg = dict_load_field_low(buf, index, NULL, NULL, NULL,
2366 					      heap, rec);
2367 
2368 		if (err_msg == dict_load_field_del) {
2369 			/* There could be delete marked records in
2370 			SYS_FIELDS because SYS_FIELDS.INDEX_ID can be
2371 			updated by ALTER TABLE ADD INDEX. */
2372 
2373 			goto next_rec;
2374 		} else if (err_msg) {
2375 			ib::error() << err_msg;
2376 			error = DB_CORRUPTION;
2377 			goto func_exit;
2378 		}
2379 next_rec:
2380 		btr_pcur_move_to_next_user_rec(&pcur, &mtr);
2381 	}
2382 
2383 	error = DB_SUCCESS;
2384 func_exit:
2385 	btr_pcur_close(&pcur);
2386 	mtr_commit(&mtr);
2387 	return(error);
2388 }
2389 
2390 /** Error message for a delete-marked record in dict_load_index_low() */
2391 static const char* dict_load_index_del = "delete-marked record in SYS_INDEXES";
2392 /** Error message for table->id mismatch in dict_load_index_low() */
2393 static const char* dict_load_index_id_err = "SYS_INDEXES.TABLE_ID mismatch";
2394 
2395 /********************************************************************//**
2396 Loads an index definition from a SYS_INDEXES record to dict_index_t.
2397 If allocate=TRUE, we will create a dict_index_t structure and fill it
2398 accordingly. If allocated=FALSE, the dict_index_t will be supplied by
2399 the caller and filled with information read from the record.  @return
2400 error message, or NULL on success */
2401 const char*
dict_load_index_low(byte * table_id,const char * table_name,mem_heap_t * heap,const rec_t * rec,ibool allocate,dict_index_t ** index)2402 dict_load_index_low(
2403 /*================*/
2404 	byte*		table_id,	/*!< in/out: table id (8 bytes),
2405 					an "in" value if allocate=TRUE
2406 					and "out" when allocate=FALSE */
2407 	const char*	table_name,	/*!< in: table name */
2408 	mem_heap_t*	heap,		/*!< in/out: temporary memory heap */
2409 	const rec_t*	rec,		/*!< in: SYS_INDEXES record */
2410 	ibool		allocate,	/*!< in: TRUE=allocate *index,
2411 					FALSE=fill in a pre-allocated
2412 					*index */
2413 	dict_index_t**	index)		/*!< out,own: index, or NULL */
2414 {
2415 	const byte*	field;
2416 	ulint		len;
2417 	ulint		name_len;
2418 	char*		name_buf;
2419 	index_id_t	id;
2420 	ulint		n_fields;
2421 	ulint		type;
2422 	ulint		space;
2423 	ulint		merge_threshold;
2424 
2425 	if (allocate) {
2426 		/* If allocate=TRUE, no dict_index_t will
2427 		be supplied. Initialize "*index" to NULL */
2428 		*index = NULL;
2429 	}
2430 
2431 	if (rec_get_deleted_flag(rec, 0)) {
2432 		return(dict_load_index_del);
2433 	}
2434 
2435 	if (rec_get_n_fields_old(rec) == DICT_NUM_FIELDS__SYS_INDEXES) {
2436 		/* MERGE_THRESHOLD exists */
2437 		field = rec_get_nth_field_old(
2438 			rec, DICT_FLD__SYS_INDEXES__MERGE_THRESHOLD, &len);
2439 		switch (len) {
2440 		case 4:
2441 			merge_threshold = mach_read_from_4(field);
2442 			break;
2443 		case UNIV_SQL_NULL:
2444 			merge_threshold = DICT_INDEX_MERGE_THRESHOLD_DEFAULT;
2445 			break;
2446 		default:
2447 			return("incorrect MERGE_THRESHOLD length"
2448 			       " in SYS_INDEXES");
2449 		}
2450 	} else if (rec_get_n_fields_old(rec)
2451 		   == DICT_NUM_FIELDS__SYS_INDEXES - 1) {
2452 		/* MERGE_THRESHOLD doesn't exist */
2453 
2454 		merge_threshold = DICT_INDEX_MERGE_THRESHOLD_DEFAULT;
2455 	} else {
2456 		return("wrong number of columns in SYS_INDEXES record");
2457 	}
2458 
2459 	field = rec_get_nth_field_old(
2460 		rec, DICT_FLD__SYS_INDEXES__TABLE_ID, &len);
2461 	if (len != 8) {
2462 err_len:
2463 		return("incorrect column length in SYS_INDEXES");
2464 	}
2465 
2466 	if (!allocate) {
2467 		/* We are reading a SYS_INDEXES record. Copy the table_id */
2468 		memcpy(table_id, (const char*) field, 8);
2469 	} else if (memcmp(field, table_id, 8)) {
2470 		/* Caller supplied table_id, verify it is the same
2471 		id as on the index record */
2472 		return(dict_load_index_id_err);
2473 	}
2474 
2475 	field = rec_get_nth_field_old(
2476 		rec, DICT_FLD__SYS_INDEXES__ID, &len);
2477 	if (len != 8) {
2478 		goto err_len;
2479 	}
2480 
2481 	id = mach_read_from_8(field);
2482 
2483 	rec_get_nth_field_offs_old(
2484 		rec, DICT_FLD__SYS_INDEXES__DB_TRX_ID, &len);
2485 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
2486 		goto err_len;
2487 	}
2488 	rec_get_nth_field_offs_old(
2489 		rec, DICT_FLD__SYS_INDEXES__DB_ROLL_PTR, &len);
2490 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
2491 		goto err_len;
2492 	}
2493 
2494 	field = rec_get_nth_field_old(
2495 		rec, DICT_FLD__SYS_INDEXES__NAME, &name_len);
2496 	if (name_len == UNIV_SQL_NULL) {
2497 		goto err_len;
2498 	}
2499 
2500 	name_buf = mem_heap_strdupl(heap, (const char*) field,
2501 				    name_len);
2502 
2503 	field = rec_get_nth_field_old(
2504 		rec, DICT_FLD__SYS_INDEXES__N_FIELDS, &len);
2505 	if (len != 4) {
2506 		goto err_len;
2507 	}
2508 	n_fields = mach_read_from_4(field);
2509 
2510 	field = rec_get_nth_field_old(
2511 		rec, DICT_FLD__SYS_INDEXES__TYPE, &len);
2512 	if (len != 4) {
2513 		goto err_len;
2514 	}
2515 	type = mach_read_from_4(field);
2516 	if (type & (~0U << DICT_IT_BITS)) {
2517 		return("unknown SYS_INDEXES.TYPE bits");
2518 	}
2519 
2520 	field = rec_get_nth_field_old(
2521 		rec, DICT_FLD__SYS_INDEXES__SPACE, &len);
2522 	if (len != 4) {
2523 		goto err_len;
2524 	}
2525 	space = mach_read_from_4(field);
2526 
2527 	field = rec_get_nth_field_old(
2528 		rec, DICT_FLD__SYS_INDEXES__PAGE_NO, &len);
2529 	if (len != 4) {
2530 		goto err_len;
2531 	}
2532 
2533 	if (allocate) {
2534 		*index = dict_mem_index_create(table_name, name_buf,
2535 					       space, type, n_fields);
2536 	} else {
2537 		ut_a(*index);
2538 
2539 		dict_mem_fill_index_struct(*index, NULL, NULL, name_buf,
2540 					   space, type, n_fields);
2541 	}
2542 
2543 	(*index)->id = id;
2544 	(*index)->page = mach_read_from_4(field);
2545 	ut_ad((*index)->page);
2546 	(*index)->merge_threshold = merge_threshold;
2547 
2548 	return(NULL);
2549 }
2550 
2551 /********************************************************************//**
2552 Loads definitions for table indexes. Adds them to the data dictionary
2553 cache.
2554 @return DB_SUCCESS if ok, DB_CORRUPTION if corruption of dictionary
2555 table or DB_UNSUPPORTED if table has unknown index type */
2556 static MY_ATTRIBUTE((nonnull))
2557 dberr_t
dict_load_indexes(dict_table_t * table,mem_heap_t * heap,dict_err_ignore_t ignore_err)2558 dict_load_indexes(
2559 /*==============*/
2560 	dict_table_t*	table,	/*!< in/out: table */
2561 	mem_heap_t*	heap,	/*!< in: memory heap for temporary storage */
2562 	dict_err_ignore_t ignore_err)
2563 				/*!< in: error to be ignored when
2564 				loading the index definition */
2565 {
2566 	dict_table_t*	sys_indexes;
2567 	dict_index_t*	sys_index;
2568 	btr_pcur_t	pcur;
2569 	dtuple_t*	tuple;
2570 	dfield_t*	dfield;
2571 	const rec_t*	rec;
2572 	byte*		buf;
2573 	mtr_t		mtr;
2574 	dberr_t		error = DB_SUCCESS;
2575 
2576 	ut_ad(mutex_own(&dict_sys->mutex));
2577 
2578 	mtr_start(&mtr);
2579 
2580 	sys_indexes = dict_table_get_low("SYS_INDEXES");
2581 	sys_index = UT_LIST_GET_FIRST(sys_indexes->indexes);
2582 	ut_ad(!dict_table_is_comp(sys_indexes));
2583 	ut_ad(name_of_col_is(sys_indexes, sys_index,
2584 			     DICT_FLD__SYS_INDEXES__NAME, "NAME"));
2585 	ut_ad(name_of_col_is(sys_indexes, sys_index,
2586 			     DICT_FLD__SYS_INDEXES__PAGE_NO, "PAGE_NO"));
2587 
2588 	tuple = dtuple_create(heap, 1);
2589 	dfield = dtuple_get_nth_field(tuple, 0);
2590 
2591 	buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
2592 	mach_write_to_8(buf, table->id);
2593 
2594 	dfield_set_data(dfield, buf, 8);
2595 	dict_index_copy_types(tuple, sys_index, 1);
2596 
2597 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
2598 				  BTR_SEARCH_LEAF, &pcur, &mtr);
2599 	for (;;) {
2600 		dict_index_t*	index = NULL;
2601 		const char*	err_msg;
2602 
2603 		if (!btr_pcur_is_on_user_rec(&pcur)) {
2604 
2605 			/* We should allow the table to open even
2606 			without index when DICT_ERR_IGNORE_CORRUPT is set.
2607 			DICT_ERR_IGNORE_CORRUPT is currently only set
2608 			for drop table */
2609 			if (dict_table_get_first_index(table) == NULL
2610 			    && !(ignore_err & DICT_ERR_IGNORE_CORRUPT)) {
2611 				ib::warn() << "Cannot load table "
2612 					<< table->name
2613 					<< " because it has no indexes in"
2614 					" InnoDB internal data dictionary.";
2615 				error = DB_CORRUPTION;
2616 				goto func_exit;
2617 			}
2618 
2619 			break;
2620 		}
2621 
2622 		rec = btr_pcur_get_rec(&pcur);
2623 
2624 		if ((ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK)
2625 		    && (rec_get_n_fields_old(rec)
2626 			== DICT_NUM_FIELDS__SYS_INDEXES
2627 			/* a record for older SYS_INDEXES table
2628 			(missing merge_threshold column) is acceptable. */
2629 			|| rec_get_n_fields_old(rec)
2630 			   == DICT_NUM_FIELDS__SYS_INDEXES - 1)) {
2631 			const byte*	field;
2632 			ulint		len;
2633 			field = rec_get_nth_field_old(
2634 				rec, DICT_FLD__SYS_INDEXES__NAME, &len);
2635 
2636 			if (len != UNIV_SQL_NULL
2637 			    && static_cast<char>(*field)
2638 			    == static_cast<char>(*TEMP_INDEX_PREFIX_STR)) {
2639 				/* Skip indexes whose name starts with
2640 				TEMP_INDEX_PREFIX, because they will
2641 				be dropped during crash recovery. */
2642 				goto next_rec;
2643 			}
2644 		}
2645 
2646 		err_msg = dict_load_index_low(
2647 			buf, table->name.m_name, heap, rec, TRUE, &index);
2648 		ut_ad((index == NULL && err_msg != NULL)
2649 		      || (index != NULL && err_msg == NULL));
2650 
2651 		if (err_msg == dict_load_index_id_err) {
2652 			/* TABLE_ID mismatch means that we have
2653 			run out of index definitions for the table. */
2654 
2655 			if (dict_table_get_first_index(table) == NULL
2656 			    && !(ignore_err & DICT_ERR_IGNORE_CORRUPT)) {
2657 
2658 				ib::warn() << "Failed to load the"
2659 					" clustered index for table "
2660 					<< table->name
2661 					<< " because of the following error: "
2662 					<< err_msg << "."
2663 					" Refusing to load the rest of the"
2664 					" indexes (if any) and the whole table"
2665 					" altogether.";
2666 				error = DB_CORRUPTION;
2667 				goto func_exit;
2668 			}
2669 
2670 			break;
2671 		} else if (err_msg == dict_load_index_del) {
2672 			/* Skip delete-marked records. */
2673 			goto next_rec;
2674 		} else if (err_msg) {
2675 			ib::error() << err_msg;
2676 			if (ignore_err & DICT_ERR_IGNORE_CORRUPT) {
2677 				goto next_rec;
2678 			}
2679 			error = DB_CORRUPTION;
2680 			goto func_exit;
2681 		}
2682 
2683 		ut_ad(index);
2684 
2685 		/* Check whether the index is corrupted */
2686 		if (dict_index_is_corrupted(index)) {
2687 
2688 			ib::error() << "Index " << index->name
2689 				<< " of table " << table->name
2690 				<< " is corrupted";
2691 
2692 			if (!srv_load_corrupted
2693 			    && !(ignore_err & DICT_ERR_IGNORE_CORRUPT)
2694 			    && dict_index_is_clust(index)) {
2695 				dict_mem_index_free(index);
2696 
2697 				error = DB_INDEX_CORRUPT;
2698 				goto func_exit;
2699 			} else {
2700 				/* We will load the index if
2701 				1) srv_load_corrupted is TRUE
2702 				2) ignore_err is set with
2703 				DICT_ERR_IGNORE_CORRUPT
2704 				3) if the index corrupted is a secondary
2705 				index */
2706 				ib::info() << "Load corrupted index "
2707 					<< index->name
2708 					<< " of table " << table->name;
2709 			}
2710 		}
2711 
2712 		if (index->type & DICT_FTS
2713 		    && !dict_table_has_fts_index(table)) {
2714 			/* This should have been created by now. */
2715 			ut_a(table->fts != NULL);
2716 			DICT_TF2_FLAG_SET(table, DICT_TF2_FTS);
2717 		}
2718 
2719 		/* We check for unsupported types first, so that the
2720 		subsequent checks are relevant for the supported types. */
2721 		if (index->type & ~(DICT_CLUSTERED | DICT_UNIQUE
2722 				    | DICT_CORRUPT | DICT_FTS
2723 				    | DICT_SPATIAL | DICT_VIRTUAL)) {
2724 
2725 			ib::error() << "Unknown type " << index->type
2726 				<< " of index " << index->name
2727 				<< " of table " << table->name;
2728 
2729 			error = DB_UNSUPPORTED;
2730 			dict_mem_index_free(index);
2731 			goto func_exit;
2732 		} else if (index->page == FIL_NULL
2733 			   && !table->file_unreadable
2734 			   && (!(index->type & DICT_FTS))) {
2735 
2736 			ib::error() << "Trying to load index " << index->name
2737 				<< " for table " << table->name
2738 				<< ", but the index tree has been freed!";
2739 
2740 			if (ignore_err & DICT_ERR_IGNORE_INDEX_ROOT) {
2741 				/* If caller can tolerate this error,
2742 				we will continue to load the index and
2743 				let caller deal with this error. However
2744 				mark the index and table corrupted. We
2745 				only need to mark such in the index
2746 				dictionary cache for such metadata corruption,
2747 				since we would always be able to set it
2748 				when loading the dictionary cache */
2749 				index->table = table;
2750 				dict_set_corrupted_index_cache_only(index);
2751 
2752 				ib::info() << "Index is corrupt but forcing"
2753 					" load into data dictionary";
2754 			} else {
2755 corrupted:
2756 				dict_mem_index_free(index);
2757 				error = DB_CORRUPTION;
2758 				goto func_exit;
2759 			}
2760 		} else if (!dict_index_is_clust(index)
2761 			   && NULL == dict_table_get_first_index(table)) {
2762 
2763 			ib::error() << "Trying to load index " << index->name
2764 				<< " for table " << table->name
2765 				<< ", but the first index is not clustered!";
2766 
2767 			goto corrupted;
2768 		} else if (dict_is_sys_table(table->id)
2769 			   && (dict_index_is_clust(index)
2770 			       || ((table == dict_sys->sys_tables)
2771 				   && !strcmp("ID_IND", index->name)))) {
2772 
2773 			/* The index was created in memory already at booting
2774 			of the database server */
2775 			dict_mem_index_free(index);
2776 		} else {
2777 			dict_load_fields(index, heap);
2778 
2779 			error = dict_index_add_to_cache(
2780 				table, index, index->page, FALSE);
2781 
2782 			/* The data dictionary tables should never contain
2783 			invalid index definitions.  If we ignored this error
2784 			and simply did not load this index definition, the
2785 			.frm file would disagree with the index definitions
2786 			inside InnoDB. */
2787 			if (UNIV_UNLIKELY(error != DB_SUCCESS)) {
2788 
2789 				goto func_exit;
2790 			}
2791 		}
2792 next_rec:
2793 		btr_pcur_move_to_next_user_rec(&pcur, &mtr);
2794 	}
2795 
2796 	ut_ad(table->fts_doc_id_index == NULL);
2797 
2798 	if (table->fts != NULL) {
2799 		table->fts_doc_id_index = dict_table_get_index_on_name(
2800 			table, FTS_DOC_ID_INDEX_NAME);
2801 	}
2802 
2803 	/* If the table contains FTS indexes, populate table->fts->indexes */
2804 	if (dict_table_has_fts_index(table)) {
2805 		ut_ad(table->fts_doc_id_index != NULL);
2806 		/* table->fts->indexes should have been created. */
2807 		ut_a(table->fts->indexes != NULL);
2808 		dict_table_get_all_fts_indexes(table, table->fts->indexes);
2809 	}
2810 
2811 func_exit:
2812 	btr_pcur_close(&pcur);
2813 	mtr_commit(&mtr);
2814 
2815 	return(error);
2816 }
2817 
2818 /** Loads a table definition from a SYS_TABLES record to dict_table_t.
2819 Does not load any columns or indexes.
2820 @param[in]	name	Table name
2821 @param[in]	rec	SYS_TABLES record
2822 @param[out,own]	table	table, or NULL
2823 @return error message, or NULL on success */
2824 static
2825 const char*
dict_load_table_low(table_name_t & name,const rec_t * rec,dict_table_t ** table)2826 dict_load_table_low(
2827 	table_name_t&	name,
2828 	const rec_t*	rec,
2829 	dict_table_t**	table)
2830 {
2831 	table_id_t	table_id;
2832 	ulint		space_id;
2833 	ulint		n_cols;
2834 	ulint		t_num;
2835 	ulint		flags;
2836 	ulint		flags2;
2837 	ulint		n_v_col;
2838 
2839 	const char* error_text = dict_sys_tables_rec_check(rec);
2840 	if (error_text != NULL) {
2841 		return(error_text);
2842 	}
2843 
2844 	dict_sys_tables_rec_read(rec, name, &table_id, &space_id,
2845 				 &t_num, &flags, &flags2);
2846 
2847 	if (flags == ULINT_UNDEFINED) {
2848 		return("incorrect flags in SYS_TABLES");
2849 	}
2850 
2851 	dict_table_decode_n_col(t_num, &n_cols, &n_v_col);
2852 
2853 	*table = dict_mem_table_create(
2854 		name.m_name, space_id, n_cols + n_v_col, n_v_col, flags, flags2);
2855 	(*table)->id = table_id;
2856 	(*table)->set_file_readable();
2857 
2858 	return(NULL);
2859 }
2860 
2861 /********************************************************************//**
2862 Using the table->heap, copy the null-terminated filepath into
2863 table->data_dir_path and replace the 'databasename/tablename.ibd'
2864 portion with 'tablename'.
2865 This allows SHOW CREATE TABLE to return the correct DATA DIRECTORY path.
2866 Make this data directory path only if it has not yet been saved. */
2867 void
dict_save_data_dir_path(dict_table_t * table,char * filepath)2868 dict_save_data_dir_path(
2869 /*====================*/
2870 	dict_table_t*	table,		/*!< in/out: table */
2871 	char*		filepath)	/*!< in: filepath of tablespace */
2872 {
2873 	ut_ad(mutex_own(&dict_sys->mutex));
2874 	ut_a(DICT_TF_HAS_DATA_DIR(table->flags));
2875 
2876 	ut_a(!table->data_dir_path);
2877 	ut_a(filepath);
2878 
2879 	/* Be sure this filepath is not the default filepath. */
2880 	char*	default_filepath = fil_make_filepath(
2881 			NULL, table->name.m_name, IBD, false);
2882 	if (default_filepath) {
2883 		if (0 != strcmp(filepath, default_filepath)) {
2884 			ulint pathlen = strlen(filepath);
2885 			ut_a(pathlen < OS_FILE_MAX_PATH);
2886 			ut_a(0 == strcmp(filepath + pathlen - 4, DOT_IBD));
2887 
2888 			table->data_dir_path = mem_heap_strdup(
2889 				table->heap, filepath);
2890 			os_file_make_data_dir_path(table->data_dir_path);
2891 		}
2892 
2893 		ut_free(default_filepath);
2894 	}
2895 }
2896 
2897 /** Make sure the data_dir_path is saved in dict_table_t if DATA DIRECTORY
2898 was used. Try to read it from the fil_system first, then from SYS_DATAFILES.
2899 @param[in]	table		Table object
2900 @param[in]	dict_mutex_own	true if dict_sys->mutex is owned already */
2901 void
dict_get_and_save_data_dir_path(dict_table_t * table,bool dict_mutex_own)2902 dict_get_and_save_data_dir_path(
2903 	dict_table_t*	table,
2904 	bool		dict_mutex_own)
2905 {
2906 	if (DICT_TF_HAS_DATA_DIR(table->flags)
2907 	    && (!table->data_dir_path)) {
2908 		char*	path = fil_space_get_first_path(table->space);
2909 
2910 		if (!dict_mutex_own) {
2911 			dict_mutex_enter_for_mysql();
2912 		}
2913 
2914 		if (path == NULL) {
2915 			path = dict_get_first_path(table->space);
2916 		}
2917 
2918 		if (path != NULL) {
2919 			dict_save_data_dir_path(table, path);
2920 			ut_free(path);
2921 		}
2922 
2923 		if (table->data_dir_path == NULL) {
2924 			/* Since we did not set the table data_dir_path,
2925 			unset the flag.  This does not change SYS_DATAFILES
2926 			or SYS_TABLES or FSP_FLAGS on the header page of the
2927 			tablespace, but it makes dict_table_t consistent. */
2928 			table->flags &= ~DICT_TF_MASK_DATA_DIR;
2929 		}
2930 
2931 		if (!dict_mutex_own) {
2932 			dict_mutex_exit_for_mysql();
2933 		}
2934 	}
2935 }
2936 
2937 /** Make sure the tablespace name is saved in dict_table_t if the table
2938 uses a general tablespace.
2939 Try to read it from the fil_system_t first, then from SYS_TABLESPACES.
2940 @param[in]	table		Table object
2941 @param[in]	dict_mutex_own)	true if dict_sys->mutex is owned already */
2942 void
dict_get_and_save_space_name(dict_table_t * table,bool dict_mutex_own)2943 dict_get_and_save_space_name(
2944 	dict_table_t*	table,
2945 	bool		dict_mutex_own)
2946 {
2947 	/* Do this only for general tablespaces. */
2948 	if (!DICT_TF_HAS_SHARED_SPACE(table->flags)) {
2949 		return;
2950 	}
2951 
2952 	bool	use_cache = true;
2953 	if (table->tablespace != NULL) {
2954 
2955 		if (srv_sys_tablespaces_open
2956 		    && dict_table_has_temp_general_tablespace_name(
2957 			    table->tablespace)) {
2958 			/* We previous saved the temporary name,
2959 			get the real one now. */
2960 			use_cache = false;
2961 		} else {
2962 			/* Keep and use this name */
2963 			return;
2964 		}
2965 	}
2966 
2967 	if (use_cache) {
2968 		fil_space_t* space = fil_space_acquire_silent(table->space);
2969 
2970 		if (space != NULL) {
2971 			/* Use this name unless it is a temporary general
2972 			tablespace name and we can now replace it. */
2973 			if (!srv_sys_tablespaces_open
2974 			    || !dict_table_has_temp_general_tablespace_name(
2975 				    space->name)) {
2976 
2977 				/* Use this tablespace name */
2978 				table->tablespace = mem_heap_strdup(
2979 					table->heap, space->name);
2980 
2981 				fil_space_release(space);
2982 				return;
2983 			}
2984 			fil_space_release(space);
2985 		}
2986 	}
2987 
2988 	/* Read it from the dictionary. */
2989 	if (srv_sys_tablespaces_open) {
2990 		if (!dict_mutex_own) {
2991 			dict_mutex_enter_for_mysql();
2992 		}
2993 
2994 		table->tablespace = dict_space_get_name(
2995 			table->space, table->heap);
2996 
2997 		if (!dict_mutex_own) {
2998 			dict_mutex_exit_for_mysql();
2999 		}
3000 	}
3001 }
3002 
3003 /** Loads a table definition and also all its index definitions, and also
3004 the cluster definition if the table is a member in a cluster. Also loads
3005 all foreign key constraints where the foreign key is in the table or where
3006 a foreign key references columns in this table.
3007 @param[in]	name		Table name in the dbname/tablename format
3008 @param[in]	cached		true=add to cache, false=do not
3009 @param[in]	ignore_err	Error to be ignored when loading
3010 				table and its index definition
3011 @return table, NULL if does not exist; if the table is stored in an
3012 .ibd file, but the file does not exist, then we set the file_unreadable
3013 flag in the table object we return. */
3014 dict_table_t*
dict_load_table(const char * name,bool cached,dict_err_ignore_t ignore_err)3015 dict_load_table(
3016 	const char*	name,
3017 	bool		cached,
3018 	dict_err_ignore_t ignore_err)
3019 {
3020 	dict_names_t			fk_list;
3021 	dict_table_t*			result;
3022 	dict_names_t::iterator		i;
3023 	table_name_t			table_name;
3024 
3025 	DBUG_ENTER("dict_load_table");
3026 	DBUG_PRINT("dict_load_table", ("loading table: '%s'", name));
3027 
3028 	ut_ad(mutex_own(&dict_sys->mutex));
3029 
3030 	table_name.m_name = const_cast<char*>(name);
3031 
3032 	result = dict_table_check_if_in_cache_low(name);
3033 
3034 	if (!result) {
3035 		result = dict_load_table_one(table_name, cached, ignore_err,
3036 					     fk_list);
3037 		while (!fk_list.empty()) {
3038 			table_name_t	fk_table_name;
3039 			dict_table_t*	fk_table;
3040 
3041 			fk_table_name.m_name =
3042 				const_cast<char*>(fk_list.front());
3043 			fk_table = dict_table_check_if_in_cache_low(
3044 				fk_table_name.m_name);
3045 			if (!fk_table) {
3046 				dict_load_table_one(fk_table_name, cached,
3047 						    ignore_err, fk_list);
3048 			}
3049 			fk_list.pop_front();
3050 		}
3051 	}
3052 
3053 	DBUG_RETURN(result);
3054 }
3055 
3056 /** Opens a tablespace for dict_load_table_one()
3057 @param[in,out]	table		A table that refers to the tablespace to open
3058 @param[in]	heap		A memory heap
3059 @param[in]	ignore_err	Whether to ignore an error. */
3060 UNIV_INLINE
3061 void
dict_load_tablespace(dict_table_t * table,mem_heap_t * heap,dict_err_ignore_t ignore_err)3062 dict_load_tablespace(
3063 	dict_table_t*		table,
3064 	mem_heap_t*		heap,
3065 	dict_err_ignore_t	ignore_err)
3066 {
3067 	/* The system tablespace is always available. */
3068 	if (is_system_tablespace(table->space)) {
3069 		return;
3070 	}
3071 
3072 	if (table->flags2 & DICT_TF2_DISCARDED) {
3073 		ib::warn() << "Tablespace for table " << table->name
3074 			<< " is set as discarded.";
3075 		table->set_file_unreadable();
3076 		return;
3077 	}
3078 
3079 	if (dict_table_is_temporary(table)) {
3080 		/* Do not bother to retry opening temporary tables. */
3081 		table->set_file_unreadable();
3082 		return;
3083 	}
3084 
3085 	/* A file-per-table table name is also the tablespace name.
3086 	A general tablespace name is not the same as the table name.
3087 	Use the general tablespace name if it can be read from the
3088 	dictionary, if not use 'innodb_general_##. */
3089 	char*	shared_space_name = NULL;
3090 	char*	space_name;
3091 	if (DICT_TF_HAS_SHARED_SPACE(table->flags)) {
3092 		if (srv_sys_tablespaces_open) {
3093 			shared_space_name =
3094 				dict_space_get_name(table->space, NULL);
3095 
3096 		} else {
3097 			/* Make the temporary tablespace name. */
3098 			shared_space_name = static_cast<char*>(
3099 				ut_malloc_nokey(
3100 					strlen(general_space_name) + 20));
3101 
3102 			sprintf(shared_space_name, "%s_" ULINTPF,
3103 				general_space_name,
3104 				static_cast<ulint>(table->space));
3105 		}
3106 		space_name = shared_space_name;
3107 	} else {
3108 		space_name = table->name.m_name;
3109 	}
3110 
3111 	/* The tablespace may already be open. */
3112 	if (fil_space_for_table_exists_in_mem(
3113 		    table->space, space_name, false,
3114 		    true, heap, table->id)) {
3115 		ut_free(shared_space_name);
3116 		return;
3117 	}
3118 
3119 	if (!(ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK)) {
3120 		ib::error() << "Failed to find tablespace for table "
3121 			<< table->name << " in the cache. Attempting"
3122 			" to load the tablespace with space id "
3123 			<< table->space;
3124 	}
3125 
3126 	/* Use the remote filepath if needed. This parameter is optional
3127 	in the call to fil_ibd_open(). If not supplied, it will be built
3128 	from the space_name. */
3129 	char* filepath = NULL;
3130 	if (DICT_TF_HAS_DATA_DIR(table->flags)) {
3131 		/* This will set table->data_dir_path from either
3132 		fil_system or SYS_DATAFILES */
3133 		dict_get_and_save_data_dir_path(table, true);
3134 
3135 		if (table->data_dir_path) {
3136 			filepath = fil_make_filepath(
3137 				table->data_dir_path,
3138 				table->name.m_name, IBD, true);
3139 		}
3140 
3141 	} else if (DICT_TF_HAS_SHARED_SPACE(table->flags)) {
3142 		/* Set table->tablespace from either
3143 		fil_system or SYS_TABLESPACES */
3144 		dict_get_and_save_space_name(table, true);
3145 
3146 		/* Set the filepath from either
3147 		fil_system or SYS_DATAFILES. */
3148 		filepath = dict_get_first_path(table->space);
3149 		if (filepath == NULL) {
3150 			ib::warn() << "Could not find the filepath"
3151 				" for table " << table->name <<
3152 				", space ID " << table->space;
3153 		}
3154 	}
3155 
3156 	/* Try to open the tablespace.  We set the 2nd param (fix_dict) to
3157 	false because we do not have an x-lock on dict_operation_lock */
3158 	ulint fsp_flags = dict_tf_to_fsp_flags(table->flags,
3159 					       false,
3160 					       dict_table_is_encrypted(table));
3161 
3162 	Keyring_encryption_info keyring_encryption_info;
3163 
3164 	dberr_t err = fil_ibd_open(
3165 		true, false, FIL_TYPE_TABLESPACE, table->space,
3166 		fsp_flags, space_name, filepath, keyring_encryption_info);
3167 
3168 	if (err != DB_SUCCESS) {
3169 		/* We failed to find a sensible tablespace file */
3170 		table->set_file_unreadable();
3171 	}
3172 
3173 	table->keyring_encryption_info = keyring_encryption_info;
3174 
3175 	ut_free(shared_space_name);
3176 	ut_free(filepath);
3177 }
3178 
3179 /** Loads a table definition and also all its index definitions.
3180 
3181 Loads those foreign key constraints whose referenced table is already in
3182 dictionary cache.  If a foreign key constraint is not loaded, then the
3183 referenced table is pushed into the output stack (fk_tables), if it is not
3184 NULL.  These tables must be subsequently loaded so that all the foreign
3185 key constraints are loaded into memory.
3186 
3187 @param[in]	name		Table name in the db/tablename format
3188 @param[in]	cached		true=add to cache, false=do not
3189 @param[in]	ignore_err	Error to be ignored when loading table
3190 				and its index definition
3191 @param[out]	fk_tables	Related table names that must also be
3192 				loaded to ensure that all foreign key
3193 				constraints are loaded.
3194 @return table, NULL if does not exist; if the table is stored in an
3195 .ibd file, but the file does not exist, then we set the
3196 file_unreadable flag TRUE in the table object we return */
3197 static
3198 dict_table_t*
dict_load_table_one(table_name_t & name,bool cached,dict_err_ignore_t ignore_err,dict_names_t & fk_tables)3199 dict_load_table_one(
3200 	table_name_t&		name,
3201 	bool			cached,
3202 	dict_err_ignore_t	ignore_err,
3203 	dict_names_t&		fk_tables)
3204 {
3205 	dberr_t		err;
3206 	dict_table_t*	table;
3207 	dict_table_t*	sys_tables;
3208 	btr_pcur_t	pcur;
3209 	dict_index_t*	sys_index;
3210 	dtuple_t*	tuple;
3211 	mem_heap_t*	heap;
3212 	dfield_t*	dfield;
3213 	const rec_t*	rec;
3214 	const byte*	field;
3215 	ulint		len;
3216 	const char*	err_msg;
3217 	mtr_t		mtr;
3218 
3219 	DBUG_ENTER("dict_load_table_one");
3220 	DBUG_PRINT("dict_load_table_one", ("table: %s", name.m_name));
3221 
3222 	ut_ad(mutex_own(&dict_sys->mutex));
3223 
3224 	heap = mem_heap_create(32000);
3225 
3226 	mtr_start(&mtr);
3227 
3228 	sys_tables = dict_table_get_low("SYS_TABLES");
3229 	sys_index = UT_LIST_GET_FIRST(sys_tables->indexes);
3230 	ut_ad(!dict_table_is_comp(sys_tables));
3231 	ut_ad(name_of_col_is(sys_tables, sys_index,
3232 			     DICT_FLD__SYS_TABLES__ID, "ID"));
3233 	ut_ad(name_of_col_is(sys_tables, sys_index,
3234 			     DICT_FLD__SYS_TABLES__N_COLS, "N_COLS"));
3235 	ut_ad(name_of_col_is(sys_tables, sys_index,
3236 			     DICT_FLD__SYS_TABLES__TYPE, "TYPE"));
3237 	ut_ad(name_of_col_is(sys_tables, sys_index,
3238 			     DICT_FLD__SYS_TABLES__MIX_LEN, "MIX_LEN"));
3239 	ut_ad(name_of_col_is(sys_tables, sys_index,
3240 			     DICT_FLD__SYS_TABLES__SPACE, "SPACE"));
3241 
3242 	tuple = dtuple_create(heap, 1);
3243 	dfield = dtuple_get_nth_field(tuple, 0);
3244 
3245 	dfield_set_data(dfield, name.m_name, ut_strlen(name.m_name));
3246 	dict_index_copy_types(tuple, sys_index, 1);
3247 
3248 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
3249 				  BTR_SEARCH_LEAF, &pcur, &mtr);
3250 	rec = btr_pcur_get_rec(&pcur);
3251 
3252 	if (!btr_pcur_is_on_user_rec(&pcur)
3253 	    || rec_get_deleted_flag(rec, 0)) {
3254 		/* Not found */
3255 err_exit:
3256 		btr_pcur_close(&pcur);
3257 		mtr_commit(&mtr);
3258 		mem_heap_free(heap);
3259 
3260 		DBUG_RETURN(NULL);
3261 	}
3262 
3263 	field = rec_get_nth_field_old(
3264 		rec, DICT_FLD__SYS_TABLES__NAME, &len);
3265 
3266 	/* Check if the table name in record is the searched one */
3267 	if (len != ut_strlen(name.m_name)
3268 	    || 0 != ut_memcmp(name.m_name, field, len)) {
3269 
3270 		goto err_exit;
3271 	}
3272 
3273 	err_msg = dict_load_table_low(name, rec, &table);
3274 
3275 	if (err_msg) {
3276 
3277 		ib::error() << err_msg;
3278 		goto err_exit;
3279 	}
3280 
3281 	btr_pcur_close(&pcur);
3282 	mtr_commit(&mtr);
3283 
3284 	dict_load_tablespace(table, heap, ignore_err);
3285 
3286 	dict_load_columns(table, heap);
3287 
3288 	dict_load_virtual(table, heap);
3289 
3290 	if (cached) {
3291 		dict_table_add_to_cache(table, TRUE, heap);
3292 	} else {
3293 		dict_table_add_system_columns(table, heap);
3294 	}
3295 
3296 	mem_heap_empty(heap);
3297 
3298 	/* If there is no tablespace for the table then we only need to
3299 	load the index definitions. So that we can IMPORT the tablespace
3300 	later. When recovering table locks for resurrected incomplete
3301 	transactions, the tablespace should exist, because DDL operations
3302 	were not allowed while the table is being locked by a transaction. */
3303 	dict_err_ignore_t index_load_err =
3304 		!(ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK)
3305 		&& table->file_unreadable
3306 		? DICT_ERR_IGNORE_ALL
3307 		: ignore_err;
3308 	err = dict_load_indexes(table, heap, index_load_err);
3309 
3310 	if (err == DB_INDEX_CORRUPT) {
3311 		/* Refuse to load the table if the table has a corrupted
3312 		cluster index */
3313 		if (!srv_load_corrupted) {
3314 
3315 			ib::error() << "Load table " << table->name
3316 				<< " failed, the table has"
3317 				" corrupted clustered indexes. Turn on"
3318 				" 'innodb_force_load_corrupted' to drop it";
3319 			dict_table_remove_from_cache(table);
3320 			table = NULL;
3321 			goto func_exit;
3322 		} else {
3323 			dict_index_t*	clust_index;
3324 			clust_index = dict_table_get_first_index(table);
3325 
3326 			if (dict_index_is_corrupted(clust_index)) {
3327 				table->corrupted = TRUE;
3328 			}
3329 		}
3330 	}
3331 
3332 	/* We don't trust the table->flags2(retrieved from SYS_TABLES.MIX_LEN
3333 	field) if the datafiles are from 3.23.52 version. To identify this
3334 	version, we do the below check and reset the flags. */
3335 	if (!DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)
3336 	    && table->space == srv_sys_space.space_id()
3337 	    && table->flags == 0) {
3338 		table->flags2 = 0;
3339 	}
3340 
3341 	DBUG_EXECUTE_IF("ib_table_invalid_flags",
3342 			if(strcmp(table->name.m_name, "test/t1") == 0) {
3343 				table->flags2 = 255;
3344 				table->flags = 255;
3345 			});
3346 
3347 	if (!dict_tf2_is_valid(table->flags, table->flags2)) {
3348 		ib::error() << "Table " << table->name << " in InnoDB"
3349 			" data dictionary contains invalid flags."
3350 			" SYS_TABLES.MIX_LEN=" << table->flags2;
3351 		table->flags2 &= ~(DICT_TF2_TEMPORARY|DICT_TF2_INTRINSIC);
3352 		dict_table_remove_from_cache(table);
3353 		table = NULL;
3354 		err = DB_FAIL;
3355 		goto func_exit;
3356 	}
3357 
3358 	/* Initialize table foreign_child value. Its value could be
3359 	changed when dict_load_foreigns() is called below */
3360 	table->fk_max_recusive_level = 0;
3361 
3362 	/* If the force recovery flag is set, we open the table irrespective
3363 	of the error condition, since the user may want to dump data from the
3364 	clustered index. However we load the foreign key information only if
3365 	all indexes were loaded. */
3366 	if (!cached || table->file_unreadable) {
3367 		/* Don't attempt to load the indexes from disk. */
3368 	} else if (err == DB_SUCCESS) {
3369 		err = dict_load_foreigns(table->name.m_name, NULL,
3370 					 true, true,
3371 					 ignore_err, fk_tables);
3372 
3373 		if (err != DB_SUCCESS) {
3374 			ib::warn() << "Load table " << table->name
3375 				<< " failed, the table has missing"
3376 				" foreign key indexes. Turn off"
3377 				" 'foreign_key_checks' and try again.";
3378 
3379 			dict_table_remove_from_cache(table);
3380 			table = NULL;
3381 		} else {
3382 			dict_mem_table_fill_foreign_vcol_set(table);
3383 			table->fk_max_recusive_level = 0;
3384 		}
3385 	} else {
3386 		dict_index_t*   index;
3387 
3388 		/* Make sure that at least the clustered index was loaded.
3389 		Otherwise refuse to load the table */
3390 		index = dict_table_get_first_index(table);
3391 
3392 		if (!srv_force_recovery
3393 		    || !index
3394 		    || !dict_index_is_clust(index)) {
3395 
3396 			dict_table_remove_from_cache(table);
3397 			table = NULL;
3398 
3399 		} else if (dict_index_is_corrupted(index)
3400 			   && !table->file_unreadable) {
3401 
3402 			/* It is possible we force to load a corrupted
3403 			clustered index if srv_load_corrupted is set.
3404 			Mark the table as corrupted in this case */
3405 			table->corrupted = TRUE;
3406 		}
3407 	}
3408 
3409 func_exit:
3410 	mem_heap_free(heap);
3411 
3412 	ut_ad(!table
3413 	      || ignore_err != DICT_ERR_IGNORE_NONE
3414 	      || table->file_unreadable
3415 	      || !table->corrupted);
3416 
3417 	if (table && table->fts) {
3418 		if (!(dict_table_has_fts_index(table)
3419 		      || DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)
3420 		      || DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_ADD_DOC_ID))) {
3421 			/* the table->fts could be created in dict_load_column
3422 			when a user defined FTS_DOC_ID is present, but no
3423 			FTS */
3424 			fts_optimize_remove_table(table);
3425 			fts_free(table);
3426 		} else {
3427 			fts_optimize_add_table(table);
3428 		}
3429 	}
3430 
3431 	ut_ad(err != DB_SUCCESS || dict_foreign_set_validate(*table));
3432 
3433 	DBUG_RETURN(table);
3434 }
3435 
3436 /***********************************************************************//**
3437 Loads a table object based on the table id.
3438 @return table; NULL if table does not exist */
3439 dict_table_t*
dict_load_table_on_id(table_id_t table_id,dict_err_ignore_t ignore_err)3440 dict_load_table_on_id(
3441 /*==================*/
3442 	table_id_t		table_id,	/*!< in: table id */
3443 	dict_err_ignore_t	ignore_err)	/*!< in: errors to ignore
3444 						when loading the table */
3445 {
3446 	byte		id_buf[8];
3447 	btr_pcur_t	pcur;
3448 	mem_heap_t*	heap;
3449 	dtuple_t*	tuple;
3450 	dfield_t*	dfield;
3451 	dict_index_t*	sys_table_ids;
3452 	dict_table_t*	sys_tables;
3453 	const rec_t*	rec;
3454 	const byte*	field;
3455 	ulint		len;
3456 	dict_table_t*	table;
3457 	mtr_t		mtr;
3458 
3459 	ut_ad(mutex_own(&dict_sys->mutex));
3460 
3461 	table = NULL;
3462 
3463 	/* NOTE that the operation of this function is protected by
3464 	the dictionary mutex, and therefore no deadlocks can occur
3465 	with other dictionary operations. */
3466 
3467 	mtr_start(&mtr);
3468 	/*---------------------------------------------------*/
3469 	/* Get the secondary index based on ID for table SYS_TABLES */
3470 	sys_tables = dict_sys->sys_tables;
3471 	sys_table_ids = dict_table_get_next_index(
3472 		dict_table_get_first_index(sys_tables));
3473 	ut_ad(!dict_table_is_comp(sys_tables));
3474 	ut_ad(!dict_index_is_clust(sys_table_ids));
3475 	heap = mem_heap_create(256);
3476 
3477 	tuple  = dtuple_create(heap, 1);
3478 	dfield = dtuple_get_nth_field(tuple, 0);
3479 
3480 	/* Write the table id in byte format to id_buf */
3481 	mach_write_to_8(id_buf, table_id);
3482 
3483 	dfield_set_data(dfield, id_buf, 8);
3484 	dict_index_copy_types(tuple, sys_table_ids, 1);
3485 
3486 	btr_pcur_open_on_user_rec(sys_table_ids, tuple, PAGE_CUR_GE,
3487 				  BTR_SEARCH_LEAF, &pcur, &mtr);
3488 
3489 	rec = btr_pcur_get_rec(&pcur);
3490 
3491 	if (page_rec_is_user_rec(rec)) {
3492 		/*---------------------------------------------------*/
3493 		/* Now we have the record in the secondary index
3494 		containing the table ID and NAME */
3495 check_rec:
3496 		field = rec_get_nth_field_old(
3497 			rec, DICT_FLD__SYS_TABLE_IDS__ID, &len);
3498 		ut_ad(len == 8);
3499 
3500 		/* Check if the table id in record is the one searched for */
3501 		if (table_id == mach_read_from_8(field)) {
3502 			if (rec_get_deleted_flag(rec, 0)) {
3503 				/* Until purge has completed, there
3504 				may be delete-marked duplicate records
3505 				for the same SYS_TABLES.ID, but different
3506 				SYS_TABLES.NAME. */
3507 				while (btr_pcur_move_to_next(&pcur, &mtr)) {
3508 					rec = btr_pcur_get_rec(&pcur);
3509 
3510 					if (page_rec_is_user_rec(rec)) {
3511 						goto check_rec;
3512 					}
3513 				}
3514 			} else {
3515 				/* Now we get the table name from the record */
3516 				field = rec_get_nth_field_old(rec,
3517 					DICT_FLD__SYS_TABLE_IDS__NAME, &len);
3518 				/* Load the table definition to memory */
3519 				char*	table_name = mem_heap_strdupl(
3520 					heap, (char*) field, len);
3521 				table = dict_load_table(table_name, true, ignore_err);
3522 			}
3523 		}
3524 	}
3525 
3526 	btr_pcur_close(&pcur);
3527 	mtr_commit(&mtr);
3528 	mem_heap_free(heap);
3529 
3530 	return(table);
3531 }
3532 
3533 /********************************************************************//**
3534 This function is called when the database is booted. Loads system table
3535 index definitions except for the clustered index which is added to the
3536 dictionary cache at booting before calling this function. */
3537 void
dict_load_sys_table(dict_table_t * table)3538 dict_load_sys_table(
3539 /*================*/
3540 	dict_table_t*	table)	/*!< in: system table */
3541 {
3542 	mem_heap_t*	heap;
3543 
3544 	ut_ad(mutex_own(&dict_sys->mutex));
3545 
3546 	heap = mem_heap_create(1000);
3547 
3548 	dict_load_indexes(table, heap, DICT_ERR_IGNORE_NONE);
3549 
3550 	mem_heap_free(heap);
3551 }
3552 
3553 /********************************************************************//**
3554 Loads foreign key constraint col names (also for the referenced table).
3555 Members that must be set (and valid) in foreign:
3556 foreign->heap
3557 foreign->n_fields
3558 foreign->id ('\0'-terminated)
3559 Members that will be created and set by this function:
3560 foreign->foreign_col_names[i]
3561 foreign->referenced_col_names[i]
3562 (for i=0..foreign->n_fields-1) */
3563 static
3564 void
dict_load_foreign_cols(dict_foreign_t * foreign)3565 dict_load_foreign_cols(
3566 /*===================*/
3567 	dict_foreign_t*	foreign)/*!< in/out: foreign constraint object */
3568 {
3569 	dict_table_t*	sys_foreign_cols;
3570 	dict_index_t*	sys_index;
3571 	btr_pcur_t	pcur;
3572 	dtuple_t*	tuple;
3573 	dfield_t*	dfield;
3574 	const rec_t*	rec;
3575 	const byte*	field;
3576 	ulint		len;
3577 	ulint		i;
3578 	mtr_t		mtr;
3579 	size_t		id_len;
3580 
3581 	ut_ad(mutex_own(&dict_sys->mutex));
3582 
3583 	id_len = strlen(foreign->id);
3584 
3585 	foreign->foreign_col_names = static_cast<const char**>(
3586 		mem_heap_alloc(foreign->heap,
3587 			       foreign->n_fields * sizeof(void*)));
3588 
3589 	foreign->referenced_col_names = static_cast<const char**>(
3590 		mem_heap_alloc(foreign->heap,
3591 			       foreign->n_fields * sizeof(void*)));
3592 
3593 	mtr_start(&mtr);
3594 
3595 	sys_foreign_cols = dict_table_get_low("SYS_FOREIGN_COLS");
3596 
3597 	sys_index = UT_LIST_GET_FIRST(sys_foreign_cols->indexes);
3598 	ut_ad(!dict_table_is_comp(sys_foreign_cols));
3599 
3600 	tuple = dtuple_create(foreign->heap, 1);
3601 	dfield = dtuple_get_nth_field(tuple, 0);
3602 
3603 	dfield_set_data(dfield, foreign->id, id_len);
3604 	dict_index_copy_types(tuple, sys_index, 1);
3605 
3606 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
3607 				  BTR_SEARCH_LEAF, &pcur, &mtr);
3608 	for (i = 0; i < foreign->n_fields; i++) {
3609 
3610 		rec = btr_pcur_get_rec(&pcur);
3611 
3612 		ut_a(btr_pcur_is_on_user_rec(&pcur));
3613 		ut_a(!rec_get_deleted_flag(rec, 0));
3614 
3615 		field = rec_get_nth_field_old(
3616 			rec, DICT_FLD__SYS_FOREIGN_COLS__ID, &len);
3617 
3618 		if (len != id_len || ut_memcmp(foreign->id, field, len) != 0) {
3619 			const rec_t*	pos;
3620 			ulint		pos_len;
3621 			const rec_t*	for_col_name;
3622 			ulint		for_col_name_len;
3623 			const rec_t*	ref_col_name;
3624 			ulint		ref_col_name_len;
3625 
3626 			pos = rec_get_nth_field_old(
3627 				rec, DICT_FLD__SYS_FOREIGN_COLS__POS,
3628 				&pos_len);
3629 
3630 			for_col_name = rec_get_nth_field_old(
3631 				rec, DICT_FLD__SYS_FOREIGN_COLS__FOR_COL_NAME,
3632 				&for_col_name_len);
3633 
3634 			ref_col_name = rec_get_nth_field_old(
3635 				rec, DICT_FLD__SYS_FOREIGN_COLS__REF_COL_NAME,
3636 				&ref_col_name_len);
3637 
3638 			ib::fatal	sout;
3639 
3640 			sout << "Unable to load column names for foreign"
3641 				" key '" << foreign->id
3642 				<< "' because it was not found in"
3643 				" InnoDB internal table SYS_FOREIGN_COLS. The"
3644 				" closest entry we found is:"
3645 				" (ID='";
3646 			sout.write(field, len);
3647 			sout << "', POS=" << mach_read_from_4(pos)
3648 				<< ", FOR_COL_NAME='";
3649 			sout.write(for_col_name, for_col_name_len);
3650 			sout << "', REF_COL_NAME='";
3651 			sout.write(ref_col_name, ref_col_name_len);
3652 			sout << "')";
3653 		}
3654 
3655 		field = rec_get_nth_field_old(
3656 			rec, DICT_FLD__SYS_FOREIGN_COLS__POS, &len);
3657 		ut_a(len == 4);
3658 		ut_a(i == mach_read_from_4(field));
3659 
3660 		field = rec_get_nth_field_old(
3661 			rec, DICT_FLD__SYS_FOREIGN_COLS__FOR_COL_NAME, &len);
3662 		foreign->foreign_col_names[i] = mem_heap_strdupl(
3663 			foreign->heap, (char*) field, len);
3664 
3665 		field = rec_get_nth_field_old(
3666 			rec, DICT_FLD__SYS_FOREIGN_COLS__REF_COL_NAME, &len);
3667 		foreign->referenced_col_names[i] = mem_heap_strdupl(
3668 			foreign->heap, (char*) field, len);
3669 
3670 		btr_pcur_move_to_next_user_rec(&pcur, &mtr);
3671 	}
3672 
3673 	btr_pcur_close(&pcur);
3674 	mtr_commit(&mtr);
3675 }
3676 
3677 /***********************************************************************//**
3678 Loads a foreign key constraint to the dictionary cache. If the referenced
3679 table is not yet loaded, it is added in the output parameter (fk_tables).
3680 @return DB_SUCCESS or error code */
3681 static MY_ATTRIBUTE((nonnull(1), warn_unused_result))
3682 dberr_t
dict_load_foreign(const char * id,const char ** col_names,bool check_recursive,bool check_charsets,dict_err_ignore_t ignore_err,dict_names_t & fk_tables)3683 dict_load_foreign(
3684 /*==============*/
3685 	const char*		id,
3686 				/*!< in: foreign constraint id, must be
3687 				'\0'-terminated */
3688 	const char**		col_names,
3689 				/*!< in: column names, or NULL
3690 				to use foreign->foreign_table->col_names */
3691 	bool			check_recursive,
3692 				/*!< in: whether to record the foreign table
3693 				parent count to avoid unlimited recursive
3694 				load of chained foreign tables */
3695 	bool			check_charsets,
3696 				/*!< in: whether to check charset
3697 				compatibility */
3698 	dict_err_ignore_t	ignore_err,
3699 				/*!< in: error to be ignored */
3700 	dict_names_t&	fk_tables)
3701 				/*!< out: the foreign key constraint is added
3702 				to the dictionary cache only if the referenced
3703 				table is already in cache.  Otherwise, the
3704 				foreign key constraint is not added to cache,
3705 				and the referenced table is added to this
3706 				stack. */
3707 {
3708 	dict_foreign_t*	foreign;
3709 	dict_table_t*	sys_foreign;
3710 	btr_pcur_t	pcur;
3711 	dict_index_t*	sys_index;
3712 	dtuple_t*	tuple;
3713 	mem_heap_t*	heap2;
3714 	dfield_t*	dfield;
3715 	const rec_t*	rec;
3716 	const byte*	field;
3717 	ulint		len;
3718 	ulint		n_fields_and_type;
3719 	mtr_t		mtr;
3720 	dict_table_t*	for_table;
3721 	dict_table_t*	ref_table;
3722 	size_t		id_len;
3723 
3724 	DBUG_ENTER("dict_load_foreign");
3725 	DBUG_PRINT("dict_load_foreign",
3726 		   ("id: '%s', check_recursive: %d", id, check_recursive));
3727 
3728 	ut_ad(mutex_own(&dict_sys->mutex));
3729 
3730 	id_len = strlen(id);
3731 
3732 	heap2 = mem_heap_create(1000);
3733 
3734 	mtr_start(&mtr);
3735 
3736 	sys_foreign = dict_table_get_low("SYS_FOREIGN");
3737 
3738 	sys_index = UT_LIST_GET_FIRST(sys_foreign->indexes);
3739 	ut_ad(!dict_table_is_comp(sys_foreign));
3740 
3741 	tuple = dtuple_create(heap2, 1);
3742 	dfield = dtuple_get_nth_field(tuple, 0);
3743 
3744 	dfield_set_data(dfield, id, id_len);
3745 	dict_index_copy_types(tuple, sys_index, 1);
3746 
3747 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
3748 				  BTR_SEARCH_LEAF, &pcur, &mtr);
3749 	rec = btr_pcur_get_rec(&pcur);
3750 
3751 	if (!btr_pcur_is_on_user_rec(&pcur)
3752 	    || rec_get_deleted_flag(rec, 0)) {
3753 		/* Not found */
3754 
3755 		ib::error() << "Cannot load foreign constraint " << id
3756 			<< ": could not find the relevant record in "
3757 			<< "SYS_FOREIGN";
3758 
3759 		btr_pcur_close(&pcur);
3760 		mtr_commit(&mtr);
3761 		mem_heap_free(heap2);
3762 
3763 		DBUG_RETURN(DB_ERROR);
3764 	}
3765 
3766 	field = rec_get_nth_field_old(rec, DICT_FLD__SYS_FOREIGN__ID, &len);
3767 
3768 	/* Check if the id in record is the searched one */
3769 	if (len != id_len || ut_memcmp(id, field, len) != 0) {
3770 
3771 		{
3772 			ib::error	err;
3773 			err << "Cannot load foreign constraint " << id
3774 				<< ": found ";
3775 			err.write(field, len);
3776 			err << " instead in SYS_FOREIGN";
3777 		}
3778 
3779 		btr_pcur_close(&pcur);
3780 		mtr_commit(&mtr);
3781 		mem_heap_free(heap2);
3782 
3783 		DBUG_RETURN(DB_ERROR);
3784 	}
3785 
3786 	/* Read the table names and the number of columns associated
3787 	with the constraint */
3788 
3789 	mem_heap_free(heap2);
3790 
3791 	foreign = dict_mem_foreign_create();
3792 
3793 	n_fields_and_type = mach_read_from_4(
3794 		rec_get_nth_field_old(
3795 			rec, DICT_FLD__SYS_FOREIGN__N_COLS, &len));
3796 
3797 	ut_a(len == 4);
3798 
3799 	/* We store the type in the bits 24..29 of n_fields_and_type. */
3800 
3801 	foreign->type = (unsigned int) (n_fields_and_type >> 24);
3802 	foreign->n_fields = (unsigned int) (n_fields_and_type & 0x3FFUL);
3803 
3804 	foreign->id = mem_heap_strdupl(foreign->heap, id, id_len);
3805 
3806 	field = rec_get_nth_field_old(
3807 		rec, DICT_FLD__SYS_FOREIGN__FOR_NAME, &len);
3808 
3809 	foreign->foreign_table_name = mem_heap_strdupl(
3810 		foreign->heap, (char*) field, len);
3811 	dict_mem_foreign_table_name_lookup_set(foreign, TRUE);
3812 
3813 	const ulint foreign_table_name_len = len;
3814 
3815 	field = rec_get_nth_field_old(
3816 		rec, DICT_FLD__SYS_FOREIGN__REF_NAME, &len);
3817 	foreign->referenced_table_name = mem_heap_strdupl(
3818 		foreign->heap, (char*) field, len);
3819 	dict_mem_referenced_table_name_lookup_set(foreign, TRUE);
3820 
3821 	btr_pcur_close(&pcur);
3822 	mtr_commit(&mtr);
3823 
3824 	dict_load_foreign_cols(foreign);
3825 
3826 	ref_table = dict_table_check_if_in_cache_low(
3827 		foreign->referenced_table_name_lookup);
3828 	for_table = dict_table_check_if_in_cache_low(
3829 		foreign->foreign_table_name_lookup);
3830 
3831 	if (!for_table) {
3832 		/* To avoid recursively loading the tables related through
3833 		the foreign key constraints, the child table name is saved
3834 		here.  The child table will be loaded later, along with its
3835 		foreign key constraint. */
3836 
3837 		lint	old_size = mem_heap_get_size(ref_table->heap);
3838 
3839 		ut_a(ref_table != NULL);
3840 		fk_tables.push_back(
3841 			mem_heap_strdupl(ref_table->heap,
3842 					 foreign->foreign_table_name_lookup,
3843 					 foreign_table_name_len));
3844 
3845 		lint	new_size = mem_heap_get_size(ref_table->heap);
3846 		dict_sys->size += new_size - old_size;
3847 
3848 		dict_foreign_remove_from_cache(foreign);
3849 		DBUG_RETURN(DB_SUCCESS);
3850 	}
3851 
3852 	ut_a(for_table || ref_table);
3853 
3854 	/* Note that there may already be a foreign constraint object in
3855 	the dictionary cache for this constraint: then the following
3856 	call only sets the pointers in it to point to the appropriate table
3857 	and index objects and frees the newly created object foreign.
3858 	Adding to the cache should always succeed since we are not creating
3859 	a new foreign key constraint but loading one from the data
3860 	dictionary. */
3861 
3862 	DBUG_RETURN(dict_foreign_add_to_cache(foreign, col_names,
3863 					      check_charsets,
3864 					      ignore_err));
3865 }
3866 
3867 /***********************************************************************//**
3868 Loads foreign key constraints where the table is either the foreign key
3869 holder or where the table is referenced by a foreign key. Adds these
3870 constraints to the data dictionary.
3871 
3872 The foreign key constraint is loaded only if the referenced table is also
3873 in the dictionary cache.  If the referenced table is not in dictionary
3874 cache, then it is added to the output parameter (fk_tables).
3875 
3876 @return DB_SUCCESS or error code */
3877 dberr_t
dict_load_foreigns(const char * table_name,const char ** col_names,bool check_recursive,bool check_charsets,dict_err_ignore_t ignore_err,dict_names_t & fk_tables)3878 dict_load_foreigns(
3879 /*===============*/
3880 	const char*		table_name,	/*!< in: table name */
3881 	const char**		col_names,	/*!< in: column names, or NULL
3882 						to use table->col_names */
3883 	bool			check_recursive,/*!< in: Whether to check
3884 						recursive load of tables
3885 						chained by FK */
3886 	bool			check_charsets,	/*!< in: whether to check
3887 						charset compatibility */
3888 	dict_err_ignore_t	ignore_err,	/*!< in: error to be ignored */
3889 	dict_names_t&		fk_tables)
3890 						/*!< out: stack of table
3891 						names which must be loaded
3892 						subsequently to load all the
3893 						foreign key constraints. */
3894 {
3895 	ulint		tuple_buf[(DTUPLE_EST_ALLOC(1) + sizeof(ulint) - 1)
3896 				/ sizeof(ulint)];
3897 	btr_pcur_t	pcur;
3898 	dtuple_t*	tuple;
3899 	dfield_t*	dfield;
3900 	dict_index_t*	sec_index;
3901 	dict_table_t*	sys_foreign;
3902 	const rec_t*	rec;
3903 	const byte*	field;
3904 	ulint		len;
3905 	dberr_t		err;
3906 	mtr_t		mtr;
3907 
3908 	DBUG_ENTER("dict_load_foreigns");
3909 
3910 	ut_ad(mutex_own(&dict_sys->mutex));
3911 
3912 	sys_foreign = dict_table_get_low("SYS_FOREIGN");
3913 
3914 	if (sys_foreign == NULL) {
3915 		/* No foreign keys defined yet in this database */
3916 
3917 		ib::info() << "No foreign key system tables in the database";
3918 		DBUG_RETURN(DB_ERROR);
3919 	}
3920 
3921 	ut_ad(!dict_table_is_comp(sys_foreign));
3922 	mtr_start(&mtr);
3923 
3924 	/* Get the secondary index based on FOR_NAME from table
3925 	SYS_FOREIGN */
3926 
3927 	sec_index = dict_table_get_next_index(
3928 		dict_table_get_first_index(sys_foreign));
3929 	ut_ad(!dict_index_is_clust(sec_index));
3930 start_load:
3931 
3932 	tuple = dtuple_create_from_mem(tuple_buf, sizeof(tuple_buf), 1, 0);
3933 	dfield = dtuple_get_nth_field(tuple, 0);
3934 
3935 	dfield_set_data(dfield, table_name, ut_strlen(table_name));
3936 	dict_index_copy_types(tuple, sec_index, 1);
3937 
3938 	btr_pcur_open_on_user_rec(sec_index, tuple, PAGE_CUR_GE,
3939 				  BTR_SEARCH_LEAF, &pcur, &mtr);
3940 loop:
3941 	rec = btr_pcur_get_rec(&pcur);
3942 
3943 	if (!btr_pcur_is_on_user_rec(&pcur)) {
3944 		/* End of index */
3945 
3946 		goto load_next_index;
3947 	}
3948 
3949 	/* Now we have the record in the secondary index containing a table
3950 	name and a foreign constraint ID */
3951 
3952 	field = rec_get_nth_field_old(
3953 		rec, DICT_FLD__SYS_FOREIGN_FOR_NAME__NAME, &len);
3954 
3955 	/* Check if the table name in the record is the one searched for; the
3956 	following call does the comparison in the latin1_swedish_ci
3957 	charset-collation, in a case-insensitive way. */
3958 
3959 	if (0 != cmp_data_data(dfield_get_type(dfield)->mtype,
3960 			       dfield_get_type(dfield)->prtype,
3961 			       static_cast<const byte*>(
3962 				       dfield_get_data(dfield)),
3963 			       dfield_get_len(dfield),
3964 			       field, len)) {
3965 
3966 		goto load_next_index;
3967 	}
3968 
3969 	/* Since table names in SYS_FOREIGN are stored in a case-insensitive
3970 	order, we have to check that the table name matches also in a binary
3971 	string comparison. On Unix, MySQL allows table names that only differ
3972 	in character case.  If lower_case_table_names=2 then what is stored
3973 	may not be the same case, but the previous comparison showed that they
3974 	match with no-case.  */
3975 
3976 	if (rec_get_deleted_flag(rec, 0)) {
3977 		goto next_rec;
3978 	}
3979 
3980 	if ((innobase_get_lower_case_table_names() != 2)
3981 	    && (0 != ut_memcmp(field, table_name, len))) {
3982 		goto next_rec;
3983 	}
3984 
3985 	/* Now we get a foreign key constraint id */
3986 	field = rec_get_nth_field_old(
3987 		rec, DICT_FLD__SYS_FOREIGN_FOR_NAME__ID, &len);
3988 
3989 	/* Copy the string because the page may be modified or evicted
3990 	after mtr_commit() below. */
3991 	char	fk_id[MAX_TABLE_NAME_LEN + 1];
3992 
3993 	ut_a(len <= MAX_TABLE_NAME_LEN);
3994 	memcpy(fk_id, field, len);
3995 	fk_id[len] = '\0';
3996 
3997 	btr_pcur_store_position(&pcur, &mtr);
3998 
3999 	mtr_commit(&mtr);
4000 
4001 	/* Load the foreign constraint definition to the dictionary cache */
4002 
4003 	err = dict_load_foreign(fk_id, col_names,
4004 				check_recursive, check_charsets, ignore_err,
4005 				fk_tables);
4006 
4007 	if (err != DB_SUCCESS) {
4008 		btr_pcur_close(&pcur);
4009 
4010 		DBUG_RETURN(err);
4011 	}
4012 
4013 	mtr_start(&mtr);
4014 
4015 	btr_pcur_restore_position(BTR_SEARCH_LEAF, &pcur, &mtr);
4016 next_rec:
4017 	btr_pcur_move_to_next_user_rec(&pcur, &mtr);
4018 
4019 	goto loop;
4020 
4021 load_next_index:
4022 	btr_pcur_close(&pcur);
4023 	mtr_commit(&mtr);
4024 
4025 	sec_index = dict_table_get_next_index(sec_index);
4026 
4027 	if (sec_index != NULL) {
4028 
4029 		mtr_start(&mtr);
4030 
4031 		/* Switch to scan index on REF_NAME, fk_max_recusive_level
4032 		already been updated when scanning FOR_NAME index, no need to
4033 		update again */
4034 		check_recursive = FALSE;
4035 
4036 		goto start_load;
4037 	}
4038 
4039 	DBUG_RETURN(DB_SUCCESS);
4040 }
4041 
4042 /***********************************************************************//**
4043 Loads a table id based on the index id.
4044 @return true if found */
4045 static
4046 bool
dict_load_table_id_on_index_id(index_id_t index_id,table_id_t * table_id)4047 dict_load_table_id_on_index_id(
4048 /*===========================*/
4049         index_id_t              index_id,  /*!< in: index id */
4050         table_id_t*             table_id) /*!< out: table id */
4051 {
4052         /* check hard coded indexes */
4053         switch(index_id) {
4054         case DICT_TABLES_ID:
4055         case DICT_COLUMNS_ID:
4056         case DICT_INDEXES_ID:
4057         case DICT_FIELDS_ID:
4058                 *table_id = index_id;
4059                 return true;
4060         case DICT_TABLE_IDS_ID:
4061                 /* The following is a secondary index on SYS_TABLES */
4062                 *table_id = DICT_TABLES_ID;
4063                 return true;
4064         }
4065 
4066         bool            found = false;
4067         mtr_t           mtr;
4068 
4069         ut_ad(mutex_own(&(dict_sys->mutex)));
4070 
4071         /* NOTE that the operation of this function is protected by
4072         the dictionary mutex, and therefore no deadlocks can occur
4073         with other dictionary operations. */
4074 
4075         mtr_start(&mtr);
4076 
4077         btr_pcur_t pcur;
4078         const rec_t* rec = dict_startscan_system(&pcur, &mtr, SYS_INDEXES);
4079 
4080         while (rec) {
4081                 ulint len;
4082                 const byte* field = rec_get_nth_field_old(
4083                         rec, DICT_FLD__SYS_INDEXES__ID, &len);
4084                 ut_ad(len == 8);
4085 
4086                 /* Check if the index id is the one searched for */
4087                 if (index_id == mach_read_from_8(field)) {
4088                         found = true;
4089                         /* Now we get the table id */
4090                         const byte* field = rec_get_nth_field_old(
4091                                 rec,
4092                                 DICT_FLD__SYS_INDEXES__TABLE_ID,
4093                                 &len);
4094                         *table_id = mach_read_from_8(field);
4095                         break;
4096                 }
4097                 mtr_commit(&mtr);
4098                 mtr_start(&mtr);
4099                 rec = dict_getnext_system(&pcur, &mtr);
4100         }
4101 
4102         btr_pcur_close(&pcur);
4103         mtr_commit(&mtr);
4104 
4105         return(found);
4106 }
4107 
4108 dict_table_t*
dict_table_open_on_index_id(index_id_t index_id,bool dict_locked)4109 dict_table_open_on_index_id(
4110 /*========================*/
4111         index_id_t index_id,    /*!< in: index id */
4112         bool dict_locked)       /*!< in: dict locked */
4113 {
4114         if (!dict_locked) {
4115                 mutex_enter(&dict_sys->mutex);
4116         }
4117 
4118         ut_ad(mutex_own(&dict_sys->mutex));
4119         table_id_t table_id;
4120         dict_table_t * table = NULL;
4121         if (dict_load_table_id_on_index_id(index_id, &table_id)) {
4122                 bool local_dict_locked = true;
4123                 table = dict_table_open_on_id(table_id,
4124                                               local_dict_locked,
4125                                               DICT_TABLE_OP_LOAD_TABLESPACE);
4126         }
4127 
4128         if (!dict_locked) {
4129                 mutex_exit(&dict_sys->mutex);
4130         }
4131         return table;
4132 }
4133 
4134