1 /*****************************************************************************
2 
3 Copyright (c) 1996, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /**************************************************//**
28 @file dict/dict0load.cc
29 Loads to the memory cache database object definitions
30 from dictionary tables
31 
32 Created 4/24/1996 Heikki Tuuri
33 *******************************************************/
34 
35 #include "dict0load.h"
36 #include "mysql_version.h"
37 
38 #ifdef UNIV_NONINL
39 #include "dict0load.ic"
40 #endif
41 
42 #include "btr0pcur.h"
43 #include "btr0btr.h"
44 #include "page0page.h"
45 #include "mach0data.h"
46 #include "dict0dict.h"
47 #include "dict0boot.h"
48 #include "dict0stats.h"
49 #include "rem0cmp.h"
50 #include "srv0start.h"
51 #include "srv0srv.h"
52 #include "dict0crea.h"
53 #include "dict0priv.h"
54 #include "ha_prototypes.h" /* innobase_casedn_str() */
55 #include "fts0priv.h"
56 
57 /** Following are the InnoDB system tables. The positions in
58 this array are referenced by enum dict_system_table_id. */
59 static const char* SYSTEM_TABLE_NAME[] = {
60 	"SYS_TABLES",
61 	"SYS_INDEXES",
62 	"SYS_COLUMNS",
63 	"SYS_FIELDS",
64 	"SYS_FOREIGN",
65 	"SYS_FOREIGN_COLS",
66 	"SYS_TABLESPACES",
67 	"SYS_DATAFILES",
68 	"SYS_ZIP_DICT",
69 	"SYS_ZIP_DICT_COLS"
70 };
71 
72 /* If this flag is TRUE, then we will load the cluster index's (and tables')
73 metadata even if it is marked as "corrupted". */
74 UNIV_INTERN my_bool     srv_load_corrupted = FALSE;
75 
76 #ifdef UNIV_DEBUG
77 /****************************************************************//**
78 Compare the name of an index column.
79 @return	TRUE if the i'th column of index is 'name'. */
80 static
81 ibool
name_of_col_is(const dict_table_t * table,const dict_index_t * index,ulint i,const char * name)82 name_of_col_is(
83 /*===========*/
84 	const dict_table_t*	table,	/*!< in: table */
85 	const dict_index_t*	index,	/*!< in: index */
86 	ulint			i,	/*!< in: index field offset */
87 	const char*		name)	/*!< in: name to compare to */
88 {
89 	ulint	tmp = dict_col_get_no(dict_field_get_col(
90 					      dict_index_get_nth_field(
91 						      index, i)));
92 
93 	return(strcmp(name, dict_table_get_col_name(table, tmp)) == 0);
94 }
95 #endif /* UNIV_DEBUG */
96 
97 /********************************************************************//**
98 Finds the first table name in the given database.
99 @return own: table name, NULL if does not exist; the caller must free
100 the memory in the string! */
101 UNIV_INTERN
102 char*
dict_get_first_table_name_in_db(const char * name)103 dict_get_first_table_name_in_db(
104 /*============================*/
105 	const char*	name)	/*!< in: database name which ends in '/' */
106 {
107 	dict_table_t*	sys_tables;
108 	btr_pcur_t	pcur;
109 	dict_index_t*	sys_index;
110 	dtuple_t*	tuple;
111 	mem_heap_t*	heap;
112 	dfield_t*	dfield;
113 	const rec_t*	rec;
114 	const byte*	field;
115 	ulint		len;
116 	mtr_t		mtr;
117 
118 	ut_ad(mutex_own(&(dict_sys->mutex)));
119 
120 	heap = mem_heap_create(1000);
121 
122 	mtr_start(&mtr);
123 
124 	sys_tables = dict_table_get_low("SYS_TABLES");
125 	sys_index = UT_LIST_GET_FIRST(sys_tables->indexes);
126 	ut_ad(!dict_table_is_comp(sys_tables));
127 
128 	tuple = dtuple_create(heap, 1);
129 	dfield = dtuple_get_nth_field(tuple, 0);
130 
131 	dfield_set_data(dfield, name, ut_strlen(name));
132 	dict_index_copy_types(tuple, sys_index, 1);
133 
134 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
135 				  BTR_SEARCH_LEAF, &pcur, &mtr);
136 loop:
137 	rec = btr_pcur_get_rec(&pcur);
138 
139 	if (!btr_pcur_is_on_user_rec(&pcur)) {
140 		/* Not found */
141 
142 		btr_pcur_close(&pcur);
143 		mtr_commit(&mtr);
144 		mem_heap_free(heap);
145 
146 		return(NULL);
147 	}
148 
149 	field = rec_get_nth_field_old(
150 		rec, DICT_FLD__SYS_TABLES__NAME, &len);
151 
152 	if (len < strlen(name)
153 	    || ut_memcmp(name, field, strlen(name)) != 0) {
154 		/* Not found */
155 
156 		btr_pcur_close(&pcur);
157 		mtr_commit(&mtr);
158 		mem_heap_free(heap);
159 
160 		return(NULL);
161 	}
162 
163 	if (!rec_get_deleted_flag(rec, 0)) {
164 
165 		/* We found one */
166 
167 		char*	table_name = mem_strdupl((char*) field, len);
168 
169 		btr_pcur_close(&pcur);
170 		mtr_commit(&mtr);
171 		mem_heap_free(heap);
172 
173 		return(table_name);
174 	}
175 
176 	btr_pcur_move_to_next_user_rec(&pcur, &mtr);
177 
178 	goto loop;
179 }
180 
181 /********************************************************************//**
182 Prints to the standard output information on all tables found in the data
183 dictionary system table. */
184 UNIV_INTERN
185 void
dict_print(void)186 dict_print(void)
187 /*============*/
188 {
189 	dict_table_t*	table;
190 	btr_pcur_t	pcur;
191 	const rec_t*	rec;
192 	mem_heap_t*	heap;
193 	mtr_t		mtr;
194 
195 	/* Enlarge the fatal semaphore wait timeout during the InnoDB table
196 	monitor printout */
197 
198 	os_increment_counter_by_amount(
199 		server_mutex,
200 		srv_fatal_semaphore_wait_threshold,
201 		SRV_SEMAPHORE_WAIT_EXTENSION);
202 
203 	heap = mem_heap_create(1000);
204 	mutex_enter(&(dict_sys->mutex));
205 	mtr_start(&mtr);
206 
207 	rec = dict_startscan_system(&pcur, &mtr, SYS_TABLES);
208 
209 	while (rec) {
210 		const char* err_msg;
211 
212 		err_msg = static_cast<const char*>(
213 			dict_process_sys_tables_rec_and_mtr_commit(
214 				heap, rec, &table, DICT_TABLE_LOAD_FROM_CACHE,
215 				&mtr));
216 
217 		if (!err_msg) {
218 			dict_table_print(table);
219 		} else {
220 			ut_print_timestamp(stderr);
221 			fprintf(stderr, "  InnoDB: %s\n", err_msg);
222 		}
223 
224 		mem_heap_empty(heap);
225 
226 		mtr_start(&mtr);
227 		rec = dict_getnext_system(&pcur, &mtr);
228 	}
229 
230 	mtr_commit(&mtr);
231 	mutex_exit(&(dict_sys->mutex));
232 	mem_heap_free(heap);
233 
234 	/* Restore the fatal semaphore wait timeout */
235 	os_decrement_counter_by_amount(
236 		server_mutex,
237 		srv_fatal_semaphore_wait_threshold,
238 		SRV_SEMAPHORE_WAIT_EXTENSION);
239 }
240 
241 /********************************************************************//**
242 This function gets the next system table record as it scans the table.
243 @return	the next record if found, NULL if end of scan */
244 static
245 const rec_t*
dict_getnext_system_low(btr_pcur_t * pcur,mtr_t * mtr)246 dict_getnext_system_low(
247 /*====================*/
248 	btr_pcur_t*	pcur,		/*!< in/out: persistent cursor to the
249 					record*/
250 	mtr_t*		mtr)		/*!< in: the mini-transaction */
251 {
252 	rec_t*	rec = NULL;
253 
254 	while (!rec || rec_get_deleted_flag(rec, 0)) {
255 		btr_pcur_move_to_next_user_rec(pcur, mtr);
256 
257 		rec = btr_pcur_get_rec(pcur);
258 
259 		if (!btr_pcur_is_on_user_rec(pcur)) {
260 			/* end of index */
261 			btr_pcur_close(pcur);
262 
263 			return(NULL);
264 		}
265 	}
266 
267 	/* Get a record, let's save the position */
268 	btr_pcur_store_position(pcur, mtr);
269 
270 	return(rec);
271 }
272 
273 /********************************************************************//**
274 This function opens a system table, and returns the first record.
275 @return	first record of the system table */
276 UNIV_INTERN
277 const rec_t*
dict_startscan_system(btr_pcur_t * pcur,mtr_t * mtr,dict_system_id_t system_id)278 dict_startscan_system(
279 /*==================*/
280 	btr_pcur_t*	pcur,		/*!< out: persistent cursor to
281 					the record */
282 	mtr_t*		mtr,		/*!< in: the mini-transaction */
283 	dict_system_id_t system_id)	/*!< in: which system table to open */
284 {
285 	dict_table_t*	system_table;
286 	dict_index_t*	clust_index;
287 	const rec_t*	rec;
288 
289 	ut_a(system_id < SYS_NUM_SYSTEM_TABLES);
290 
291 	system_table = dict_table_get_low(SYSTEM_TABLE_NAME[system_id]);
292 
293 	clust_index = UT_LIST_GET_FIRST(system_table->indexes);
294 
295 	btr_pcur_open_at_index_side(true, clust_index, BTR_SEARCH_LEAF, pcur,
296 				    true, 0, mtr);
297 
298 	rec = dict_getnext_system_low(pcur, mtr);
299 
300 	return(rec);
301 }
302 
303 /********************************************************************//**
304 This function gets the next system table record as it scans the table.
305 @return	the next record if found, NULL if end of scan */
306 UNIV_INTERN
307 const rec_t*
dict_getnext_system(btr_pcur_t * pcur,mtr_t * mtr)308 dict_getnext_system(
309 /*================*/
310 	btr_pcur_t*	pcur,		/*!< in/out: persistent cursor
311 					to the record */
312 	mtr_t*		mtr)		/*!< in: the mini-transaction */
313 {
314 	const rec_t*	rec;
315 
316 	/* Restore the position */
317 	btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
318 
319 	/* Get the next record */
320 	rec = dict_getnext_system_low(pcur, mtr);
321 
322 	return(rec);
323 }
324 
325 /********************************************************************//**
326 This function processes one SYS_TABLES record and populate the dict_table_t
327 struct for the table. Extracted out of dict_print() to be used by
328 both monitor table output and information schema innodb_sys_tables output.
329 @return error message, or NULL on success */
330 UNIV_INTERN
331 const char*
dict_process_sys_tables_rec_and_mtr_commit(mem_heap_t * heap,const rec_t * rec,dict_table_t ** table,dict_table_info_t status,mtr_t * mtr)332 dict_process_sys_tables_rec_and_mtr_commit(
333 /*=======================================*/
334 	mem_heap_t*	heap,		/*!< in/out: temporary memory heap */
335 	const rec_t*	rec,		/*!< in: SYS_TABLES record */
336 	dict_table_t**	table,		/*!< out: dict_table_t to fill */
337 	dict_table_info_t status,	/*!< in: status bit controls
338 					options such as whether we shall
339 					look for dict_table_t from cache
340 					first */
341 	mtr_t*		mtr)		/*!< in/out: mini-transaction,
342 					will be committed */
343 {
344 	ulint		len;
345 	const char*	field;
346 	const char*	err_msg = NULL;
347 	char*		table_name;
348 
349 	field = (const char*) rec_get_nth_field_old(
350 		rec, DICT_FLD__SYS_TABLES__NAME, &len);
351 
352 	ut_a(!rec_get_deleted_flag(rec, 0));
353 
354 	ut_ad(mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_S_FIX));
355 
356 	/* Get the table name */
357 	table_name = mem_heap_strdupl(heap, field, len);
358 
359 	/* If DICT_TABLE_LOAD_FROM_CACHE is set, first check
360 	whether there is cached dict_table_t struct */
361 	if (status & DICT_TABLE_LOAD_FROM_CACHE) {
362 
363 		/* Commit before load the table again */
364 		mtr_commit(mtr);
365 
366 		*table = dict_table_get_low(table_name);
367 
368 		if (!(*table)) {
369 			err_msg = "Table not found in cache";
370 		}
371 	} else {
372 		err_msg = dict_load_table_low(table_name, rec, table);
373 		mtr_commit(mtr);
374 	}
375 
376 	if (err_msg) {
377 		return(err_msg);
378 	}
379 
380 	return(NULL);
381 }
382 
383 /********************************************************************//**
384 This function parses a SYS_INDEXES record and populate a dict_index_t
385 structure with the information from the record. For detail information
386 about SYS_INDEXES fields, please refer to dict_boot() function.
387 @return error message, or NULL on success */
388 UNIV_INTERN
389 const char*
dict_process_sys_indexes_rec(mem_heap_t * heap,const rec_t * rec,dict_index_t * index,table_id_t * table_id)390 dict_process_sys_indexes_rec(
391 /*=========================*/
392 	mem_heap_t*	heap,		/*!< in/out: heap memory */
393 	const rec_t*	rec,		/*!< in: current SYS_INDEXES rec */
394 	dict_index_t*	index,		/*!< out: index to be filled */
395 	table_id_t*	table_id)	/*!< out: index table id */
396 {
397 	const char*	err_msg;
398 	byte*		buf;
399 
400 	buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
401 
402 	/* Parse the record, and get "dict_index_t" struct filled */
403 	err_msg = dict_load_index_low(buf, NULL,
404 				      heap, rec, FALSE, &index);
405 
406 	*table_id = mach_read_from_8(buf);
407 
408 	return(err_msg);
409 }
410 
411 /********************************************************************//**
412 This function parses a SYS_COLUMNS record and populate a dict_column_t
413 structure with the information from the record.
414 @return error message, or NULL on success */
415 UNIV_INTERN
416 const char*
dict_process_sys_columns_rec(mem_heap_t * heap,const rec_t * rec,dict_col_t * column,table_id_t * table_id,const char ** col_name)417 dict_process_sys_columns_rec(
418 /*=========================*/
419 	mem_heap_t*	heap,		/*!< in/out: heap memory */
420 	const rec_t*	rec,		/*!< in: current SYS_COLUMNS rec */
421 	dict_col_t*	column,		/*!< out: dict_col_t to be filled */
422 	table_id_t*	table_id,	/*!< out: table id */
423 	const char**	col_name)	/*!< out: column name */
424 {
425 	const char*	err_msg;
426 
427 	/* Parse the record, and get "dict_col_t" struct filled */
428 	err_msg = dict_load_column_low(NULL, heap, column,
429 				       table_id, col_name, rec);
430 
431 	return(err_msg);
432 }
433 
434 /********************************************************************//**
435 This function parses a SYS_FIELDS record and populates a dict_field_t
436 structure with the information from the record.
437 @return error message, or NULL on success */
438 UNIV_INTERN
439 const char*
dict_process_sys_fields_rec(mem_heap_t * heap,const rec_t * rec,dict_field_t * sys_field,ulint * pos,index_id_t * index_id,index_id_t last_id)440 dict_process_sys_fields_rec(
441 /*========================*/
442 	mem_heap_t*	heap,		/*!< in/out: heap memory */
443 	const rec_t*	rec,		/*!< in: current SYS_FIELDS rec */
444 	dict_field_t*	sys_field,	/*!< out: dict_field_t to be
445 					filled */
446 	ulint*		pos,		/*!< out: Field position */
447 	index_id_t*	index_id,	/*!< out: current index id */
448 	index_id_t	last_id)	/*!< in: previous index id */
449 {
450 	byte*		buf;
451 	byte*		last_index_id;
452 	const char*	err_msg;
453 
454 	buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
455 
456 	last_index_id = static_cast<byte*>(mem_heap_alloc(heap, 8));
457 	mach_write_to_8(last_index_id, last_id);
458 
459 	err_msg = dict_load_field_low(buf, NULL, sys_field,
460 				      pos, last_index_id, heap, rec);
461 
462 	*index_id = mach_read_from_8(buf);
463 
464 	return(err_msg);
465 
466 }
467 
468 /********************************************************************//**
469 This function parses a SYS_FOREIGN record and populate a dict_foreign_t
470 structure with the information from the record. For detail information
471 about SYS_FOREIGN fields, please refer to dict_load_foreign() function.
472 @return error message, or NULL on success */
473 UNIV_INTERN
474 const char*
dict_process_sys_foreign_rec(mem_heap_t * heap,const rec_t * rec,dict_foreign_t * foreign)475 dict_process_sys_foreign_rec(
476 /*=========================*/
477 	mem_heap_t*	heap,		/*!< in/out: heap memory */
478 	const rec_t*	rec,		/*!< in: current SYS_FOREIGN rec */
479 	dict_foreign_t*	foreign)	/*!< out: dict_foreign_t struct
480 					to be filled */
481 {
482 	ulint		len;
483 	const byte*	field;
484 	ulint		n_fields_and_type;
485 
486 	if (rec_get_deleted_flag(rec, 0)) {
487 		return("delete-marked record in SYS_FOREIGN");
488 	}
489 
490 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_FOREIGN) {
491 		return("wrong number of columns in SYS_FOREIGN record");
492 	}
493 
494 	field = rec_get_nth_field_old(
495 		rec, DICT_FLD__SYS_FOREIGN__ID, &len);
496 	if (len == 0 || len == UNIV_SQL_NULL) {
497 err_len:
498 		return("incorrect column length in SYS_FOREIGN");
499 	}
500 
501 	/* This recieves a dict_foreign_t* that points to a stack variable.
502 	So mem_heap_free(foreign->heap) is not used as elsewhere.
503 	Since the heap used here is freed elsewhere, foreign->heap
504 	is not assigned. */
505 	foreign->id = mem_heap_strdupl(heap, (const char*) field, len);
506 
507 	rec_get_nth_field_offs_old(
508 		rec, DICT_FLD__SYS_FOREIGN__DB_TRX_ID, &len);
509 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
510 		goto err_len;
511 	}
512 	rec_get_nth_field_offs_old(
513 		rec, DICT_FLD__SYS_FOREIGN__DB_ROLL_PTR, &len);
514 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
515 		goto err_len;
516 	}
517 
518 	/* The _lookup versions of the referenced and foreign table names
519 	 are not assigned since they are not used in this dict_foreign_t */
520 
521 	field = rec_get_nth_field_old(
522 		rec, DICT_FLD__SYS_FOREIGN__FOR_NAME, &len);
523 	if (len == 0 || len == UNIV_SQL_NULL) {
524 		goto err_len;
525 	}
526 	foreign->foreign_table_name = mem_heap_strdupl(
527 		heap, (const char*) field, len);
528 
529 	field = rec_get_nth_field_old(
530 		rec, DICT_FLD__SYS_FOREIGN__REF_NAME, &len);
531 	if (len == 0 || len == UNIV_SQL_NULL) {
532 		goto err_len;
533 	}
534 	foreign->referenced_table_name = mem_heap_strdupl(
535 		heap, (const char*) field, len);
536 
537 	field = rec_get_nth_field_old(
538 		rec, DICT_FLD__SYS_FOREIGN__N_COLS, &len);
539 	if (len != 4) {
540 		goto err_len;
541 	}
542 	n_fields_and_type = mach_read_from_4(field);
543 
544 	foreign->type = (unsigned int) (n_fields_and_type >> 24);
545 	foreign->n_fields = (unsigned int) (n_fields_and_type & 0x3FFUL);
546 
547 	return(NULL);
548 }
549 
550 /********************************************************************//**
551 This function parses a SYS_FOREIGN_COLS record and extract necessary
552 information from the record and return to caller.
553 @return error message, or NULL on success */
554 UNIV_INTERN
555 const char*
dict_process_sys_foreign_col_rec(mem_heap_t * heap,const rec_t * rec,const char ** name,const char ** for_col_name,const char ** ref_col_name,ulint * pos)556 dict_process_sys_foreign_col_rec(
557 /*=============================*/
558 	mem_heap_t*	heap,		/*!< in/out: heap memory */
559 	const rec_t*	rec,		/*!< in: current SYS_FOREIGN_COLS rec */
560 	const char**	name,		/*!< out: foreign key constraint name */
561 	const char**	for_col_name,	/*!< out: referencing column name */
562 	const char**	ref_col_name,	/*!< out: referenced column name
563 					in referenced table */
564 	ulint*		pos)		/*!< out: column position */
565 {
566 	ulint		len;
567 	const byte*	field;
568 
569 	if (rec_get_deleted_flag(rec, 0)) {
570 		return("delete-marked record in SYS_FOREIGN_COLS");
571 	}
572 
573 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_FOREIGN_COLS) {
574 		return("wrong number of columns in SYS_FOREIGN_COLS record");
575 	}
576 
577 	field = rec_get_nth_field_old(
578 		rec, DICT_FLD__SYS_FOREIGN_COLS__ID, &len);
579 	if (len == 0 || len == UNIV_SQL_NULL) {
580 err_len:
581 		return("incorrect column length in SYS_FOREIGN_COLS");
582 	}
583 	*name = mem_heap_strdupl(heap, (char*) field, len);
584 
585 	field = rec_get_nth_field_old(
586 		rec, DICT_FLD__SYS_FOREIGN_COLS__POS, &len);
587 	if (len != 4) {
588 		goto err_len;
589 	}
590 	*pos = mach_read_from_4(field);
591 
592 	rec_get_nth_field_offs_old(
593 		rec, DICT_FLD__SYS_FOREIGN_COLS__DB_TRX_ID, &len);
594 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
595 		goto err_len;
596 	}
597 	rec_get_nth_field_offs_old(
598 		rec, DICT_FLD__SYS_FOREIGN_COLS__DB_ROLL_PTR, &len);
599 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
600 		goto err_len;
601 	}
602 
603 	field = rec_get_nth_field_old(
604 		rec, DICT_FLD__SYS_FOREIGN_COLS__FOR_COL_NAME, &len);
605 	if (len == 0 || len == UNIV_SQL_NULL) {
606 		goto err_len;
607 	}
608 	*for_col_name = mem_heap_strdupl(heap, (char*) field, len);
609 
610 	field = rec_get_nth_field_old(
611 		rec, DICT_FLD__SYS_FOREIGN_COLS__REF_COL_NAME, &len);
612 	if (len == 0 || len == UNIV_SQL_NULL) {
613 		goto err_len;
614 	}
615 	*ref_col_name = mem_heap_strdupl(heap, (char*) field, len);
616 
617 	return(NULL);
618 }
619 
620 /********************************************************************//**
621 This function parses a SYS_TABLESPACES record, extracts necessary
622 information from the record and returns to caller.
623 @return error message, or NULL on success */
624 UNIV_INTERN
625 const char*
dict_process_sys_tablespaces(mem_heap_t * heap,const rec_t * rec,ulint * space,const char ** name,ulint * flags)626 dict_process_sys_tablespaces(
627 /*=========================*/
628 	mem_heap_t*	heap,		/*!< in/out: heap memory */
629 	const rec_t*	rec,		/*!< in: current SYS_TABLESPACES rec */
630 	ulint*		space,		/*!< out: space id */
631 	const char**	name,		/*!< out: tablespace name */
632 	ulint*		flags)		/*!< out: tablespace flags */
633 {
634 	ulint		len;
635 	const byte*	field;
636 
637 	/* Initialize the output values */
638 	*space = ULINT_UNDEFINED;
639 	*name = NULL;
640 	*flags = ULINT_UNDEFINED;
641 
642 	if (rec_get_deleted_flag(rec, 0)) {
643 		return("delete-marked record in SYS_TABLESPACES");
644 	}
645 
646 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_TABLESPACES) {
647 		return("wrong number of columns in SYS_TABLESPACES record");
648 	}
649 
650 	field = rec_get_nth_field_old(
651 		rec, DICT_FLD__SYS_TABLESPACES__SPACE, &len);
652 	if (len != DICT_FLD_LEN_SPACE) {
653 err_len:
654 		return("incorrect column length in SYS_TABLESPACES");
655 	}
656 	*space = mach_read_from_4(field);
657 
658 	rec_get_nth_field_offs_old(
659 		rec, DICT_FLD__SYS_TABLESPACES__DB_TRX_ID, &len);
660 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
661 		goto err_len;
662 	}
663 
664 	rec_get_nth_field_offs_old(
665 		rec, DICT_FLD__SYS_TABLESPACES__DB_ROLL_PTR, &len);
666 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
667 		goto err_len;
668 	}
669 
670 	field = rec_get_nth_field_old(
671 		rec, DICT_FLD__SYS_TABLESPACES__NAME, &len);
672 	if (len == 0 || len == UNIV_SQL_NULL) {
673 		goto err_len;
674 	}
675 	*name = mem_heap_strdupl(heap, (char*) field, len);
676 
677 	field = rec_get_nth_field_old(
678 		rec, DICT_FLD__SYS_TABLESPACES__FLAGS, &len);
679 	if (len != DICT_FLD_LEN_FLAGS) {
680 		goto err_len;
681 	}
682 	*flags = mach_read_from_4(field);
683 
684 	return(NULL);
685 }
686 
687 /********************************************************************//**
688 This function parses a SYS_DATAFILES record, extracts necessary
689 information from the record and returns it to the caller.
690 @return error message, or NULL on success */
691 UNIV_INTERN
692 const char*
dict_process_sys_datafiles(mem_heap_t * heap,const rec_t * rec,ulint * space,const char ** path)693 dict_process_sys_datafiles(
694 /*=======================*/
695 	mem_heap_t*	heap,		/*!< in/out: heap memory */
696 	const rec_t*	rec,		/*!< in: current SYS_DATAFILES rec */
697 	ulint*		space,		/*!< out: space id */
698 	const char**	path)		/*!< out: datafile paths */
699 {
700 	ulint		len;
701 	const byte*	field;
702 
703 	if (rec_get_deleted_flag(rec, 0)) {
704 		return("delete-marked record in SYS_DATAFILES");
705 	}
706 
707 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_DATAFILES) {
708 		return("wrong number of columns in SYS_DATAFILES record");
709 	}
710 
711 	field = rec_get_nth_field_old(
712 		rec, DICT_FLD__SYS_DATAFILES__SPACE, &len);
713 	if (len != DICT_FLD_LEN_SPACE) {
714 err_len:
715 		return("incorrect column length in SYS_DATAFILES");
716 	}
717 	*space = mach_read_from_4(field);
718 
719 	rec_get_nth_field_offs_old(
720 		rec, DICT_FLD__SYS_DATAFILES__DB_TRX_ID, &len);
721 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
722 		goto err_len;
723 	}
724 
725 	rec_get_nth_field_offs_old(
726 		rec, DICT_FLD__SYS_DATAFILES__DB_ROLL_PTR, &len);
727 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
728 		goto err_len;
729 	}
730 
731 	field = rec_get_nth_field_old(
732 		rec, DICT_FLD__SYS_DATAFILES__PATH, &len);
733 	if (len == 0 || len == UNIV_SQL_NULL) {
734 		goto err_len;
735 	}
736 	*path = mem_heap_strdupl(heap, (char*) field, len);
737 
738 	return(NULL);
739 }
740 
741 /** This function parses a SYS_ZIP_DICT record, extracts necessary
742 information from the record and returns to caller.
743 @return error message, or NULL on success */
744 UNIV_INTERN
745 const char*
dict_process_sys_zip_dict(mem_heap_t * heap,ulint zip_size,const rec_t * rec,ulint * id,const char ** name,const char ** data,ulint * data_len)746 dict_process_sys_zip_dict(
747 	mem_heap_t*	heap,		/*!< in/out: heap memory */
748 	ulint		zip_size,	/*!< in: nonzero=compressed BLOB page size */
749 	const rec_t*	rec,		/*!< in: current SYS_ZIP_DICT rec */
750 	ulint*		id,		/*!< out: dict id */
751 	const char**	name,		/*!< out: dict name */
752 	const char**	data,		/*!< out: dict data */
753 	ulint*		data_len)	/*!< out: dict data length */
754 {
755 	ulint		len;
756 	const byte*	field;
757 
758 	/* Initialize the output values */
759 	*id = ULINT_UNDEFINED;
760 	*name = NULL;
761 	*data = NULL;
762 	*data_len = 0;
763 
764 	if (UNIV_UNLIKELY(rec_get_deleted_flag(rec, 0))) {
765 		return("delete-marked record in SYS_ZIP_DICT");
766 	}
767 
768 	if (UNIV_UNLIKELY(
769 		rec_get_n_fields_old(rec)!= DICT_NUM_FIELDS__SYS_ZIP_DICT)) {
770 		return("wrong number of columns in SYS_ZIP_DICT record");
771 	}
772 
773 	field = rec_get_nth_field_old(
774 		rec, DICT_FLD__SYS_ZIP_DICT__ID, &len);
775 	if (UNIV_UNLIKELY(len != DICT_FLD_LEN_SPACE)) {
776 		goto err_len;
777 	}
778 	*id = mach_read_from_4(field);
779 
780 	rec_get_nth_field_offs_old(
781 		rec, DICT_FLD__SYS_ZIP_DICT__DB_TRX_ID, &len);
782 	if (UNIV_UNLIKELY(len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL)) {
783 		goto err_len;
784 	}
785 
786 	rec_get_nth_field_offs_old(
787 		rec, DICT_FLD__SYS_ZIP_DICT__DB_ROLL_PTR, &len);
788 	if (UNIV_UNLIKELY(len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL)) {
789 		goto err_len;
790 	}
791 
792 	field = rec_get_nth_field_old(
793 		rec, DICT_FLD__SYS_ZIP_DICT__NAME, &len);
794 	if (UNIV_UNLIKELY(len == 0 || len == UNIV_SQL_NULL)) {
795 		goto err_len;
796 	}
797 	*name = mem_heap_strdupl(heap, (char*) field, len);
798 
799 	field = rec_get_nth_field_old(
800 		rec, DICT_FLD__SYS_ZIP_DICT__DATA, &len);
801 	if (UNIV_UNLIKELY(len == UNIV_SQL_NULL)) {
802 		goto err_len;
803 	}
804 
805 	if (rec_get_1byte_offs_flag(rec) == 0 &&
806 		rec_2_is_field_extern(rec, DICT_FLD__SYS_ZIP_DICT__DATA)) {
807 		ut_a(len >= BTR_EXTERN_FIELD_REF_SIZE);
808 
809 		if (UNIV_UNLIKELY
810 			(!memcmp(field + len - BTR_EXTERN_FIELD_REF_SIZE,
811 				field_ref_zero,
812 				BTR_EXTERN_FIELD_REF_SIZE))) {
813 			goto err_len;
814 		}
815 		*data = reinterpret_cast<char*>(
816 			btr_copy_externally_stored_field(data_len, field,
817 							zip_size, len, heap));
818 	}
819 	else {
820 		*data_len = len;
821 		*data = static_cast<char*>(mem_heap_dup(heap, field, len));
822 	}
823 
824 	return(NULL);
825 
826 err_len:
827 	return("incorrect column length in SYS_ZIP_DICT");
828 }
829 
830 /** This function parses a SYS_ZIP_DICT_COLS record, extracts necessary
831 information from the record and returns to caller.
832 @return error message, or NULL on success */
833 UNIV_INTERN
834 const char*
dict_process_sys_zip_dict_cols(mem_heap_t * heap,const rec_t * rec,ulint * table_id,ulint * column_pos,ulint * dict_id)835 dict_process_sys_zip_dict_cols(
836 	mem_heap_t*	heap,		/*!< in/out: heap memory */
837 	const rec_t*	rec,		/*!< in: current SYS_ZIP_DICT rec */
838 	ulint*		table_id,	/*!< out: table id */
839 	ulint*		column_pos,	/*!< out: column position */
840 	ulint*		dict_id)	/*!< out: dict id */
841 {
842 	ulint		len;
843 	const byte*	field;
844 
845 	/* Initialize the output values */
846 	*table_id = ULINT_UNDEFINED;
847 	*column_pos = ULINT_UNDEFINED;
848 	*dict_id = ULINT_UNDEFINED;
849 
850 	if (UNIV_UNLIKELY(rec_get_deleted_flag(rec, 0))) {
851 		return("delete-marked record in SYS_ZIP_DICT_COLS");
852 	}
853 
854 	if (UNIV_UNLIKELY(rec_get_n_fields_old(rec) !=
855 		DICT_NUM_FIELDS__SYS_ZIP_DICT_COLS)) {
856 		return("wrong number of columns in SYS_ZIP_DICT_COLS"
857 			" record");
858 	}
859 
860 	field = rec_get_nth_field_old(
861 		rec, DICT_FLD__SYS_ZIP_DICT_COLS__TABLE_ID, &len);
862 	if (UNIV_UNLIKELY(len != DICT_FLD_LEN_SPACE)) {
863 err_len:
864 		return("incorrect column length in SYS_ZIP_DICT_COLS");
865 	}
866 	*table_id = mach_read_from_4(field);
867 
868 	field = rec_get_nth_field_old(
869 		rec, DICT_FLD__SYS_ZIP_DICT_COLS__COLUMN_POS, &len);
870 	if (UNIV_UNLIKELY(len != DICT_FLD_LEN_SPACE)) {
871 		goto err_len;
872 	}
873 	*column_pos = mach_read_from_4(field);
874 
875 	rec_get_nth_field_offs_old(
876 		rec, DICT_FLD__SYS_ZIP_DICT_COLS__DB_TRX_ID, &len);
877 	if (UNIV_UNLIKELY(len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL)) {
878 		goto err_len;
879 	}
880 
881 	rec_get_nth_field_offs_old(
882 		rec, DICT_FLD__SYS_ZIP_DICT_COLS__DB_ROLL_PTR, &len);
883 	if (UNIV_UNLIKELY(len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL)) {
884 		goto err_len;
885 	}
886 
887 	field = rec_get_nth_field_old(
888 		rec, DICT_FLD__SYS_ZIP_DICT_COLS__DICT_ID, &len);
889 	if (UNIV_UNLIKELY(len != DICT_FLD_LEN_SPACE)) {
890 		goto err_len;
891 	}
892 	*dict_id = mach_read_from_4(field);
893 
894 	return(NULL);
895 }
896 /********************************************************************//**
897 Determine the flags of a table as stored in SYS_TABLES.TYPE and N_COLS.
898 @return  ULINT_UNDEFINED if error, else a valid dict_table_t::flags. */
899 static
900 ulint
dict_sys_tables_get_flags(const rec_t * rec)901 dict_sys_tables_get_flags(
902 /*======================*/
903 	const rec_t*	rec)	/*!< in: a record of SYS_TABLES */
904 {
905 	const byte*	field;
906 	ulint		len;
907 	ulint		type;
908 	ulint		n_cols;
909 
910 	/* read the 4 byte flags from the TYPE field */
911 	field = rec_get_nth_field_old(
912 		rec, DICT_FLD__SYS_TABLES__TYPE, &len);
913 	ut_a(len == 4);
914 	type = mach_read_from_4(field);
915 
916 	/* The low order bit of SYS_TABLES.TYPE is always set to 1. But in
917 	dict_table_t::flags the low order bit is used to determine if the
918 	row format is Redundant or Compact when the format is Antelope.
919 	Read the 4 byte N_COLS field and look at the high order bit.  It
920 	should be set for COMPACT and later.  It should not be set for
921 	REDUNDANT. */
922 	field = rec_get_nth_field_old(
923 		rec, DICT_FLD__SYS_TABLES__N_COLS, &len);
924 	ut_a(len == 4);
925 	n_cols = mach_read_from_4(field);
926 
927 	/* This validation function also combines the DICT_N_COLS_COMPACT
928 	flag in n_cols into the type field to effectively make it a
929 	dict_table_t::flags. */
930 
931 	if (ULINT_UNDEFINED == dict_sys_tables_type_validate(type, n_cols)) {
932 		return(ULINT_UNDEFINED);
933 	}
934 
935 	return(dict_sys_tables_type_to_tf(type, n_cols));
936 }
937 
938 /********************************************************************//**
939 Gets the filepath for a spaceid from SYS_DATAFILES and checks it against
940 the contents of a link file. This function is called when there is no
941 fil_node_t entry for this space ID so both durable locations on  disk
942 must be checked and compared.
943 We use a temporary heap here for the table lookup, but not for the path
944 returned which the caller must free.
945 This function can return NULL if the space ID is not found in SYS_DATAFILES,
946 then the caller will assume that the ibd file is in the normal datadir.
947 @return	own: A copy of the first datafile found in SYS_DATAFILES.PATH for
948 the given space ID. NULL if space ID is zero or not found. */
949 UNIV_INTERN
950 char*
dict_get_first_path(ulint space,const char * name)951 dict_get_first_path(
952 /*================*/
953 	ulint		space,	/*!< in: space id */
954 	const char*	name)	/*!< in: tablespace name */
955 {
956 	mtr_t		mtr;
957 	dict_table_t*	sys_datafiles;
958 	dict_index_t*	sys_index;
959 	dtuple_t*	tuple;
960 	dfield_t*	dfield;
961 	byte*		buf;
962 	btr_pcur_t	pcur;
963 	const rec_t*	rec;
964 	const byte*	field;
965 	ulint		len;
966 	char*		dict_filepath = NULL;
967 	mem_heap_t*	heap = mem_heap_create(1024);
968 
969 	ut_ad(mutex_own(&(dict_sys->mutex)));
970 
971 	mtr_start(&mtr);
972 
973 	sys_datafiles = dict_table_get_low("SYS_DATAFILES");
974 	sys_index = UT_LIST_GET_FIRST(sys_datafiles->indexes);
975 	ut_ad(!dict_table_is_comp(sys_datafiles));
976 	ut_ad(name_of_col_is(sys_datafiles, sys_index,
977 			     DICT_FLD__SYS_DATAFILES__SPACE, "SPACE"));
978 	ut_ad(name_of_col_is(sys_datafiles, sys_index,
979 			     DICT_FLD__SYS_DATAFILES__PATH, "PATH"));
980 
981 	tuple = dtuple_create(heap, 1);
982 	dfield = dtuple_get_nth_field(tuple, DICT_FLD__SYS_DATAFILES__SPACE);
983 
984 	buf = static_cast<byte*>(mem_heap_alloc(heap, 4));
985 	mach_write_to_4(buf, space);
986 
987 	dfield_set_data(dfield, buf, 4);
988 	dict_index_copy_types(tuple, sys_index, 1);
989 
990 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
991 				  BTR_SEARCH_LEAF, &pcur, &mtr);
992 
993 	rec = btr_pcur_get_rec(&pcur);
994 
995 	/* If the file-per-table tablespace was created with
996 	an earlier version of InnoDB, then this record is not
997 	in SYS_DATAFILES.  But a link file still might exist. */
998 
999 	if (btr_pcur_is_on_user_rec(&pcur)) {
1000 		/* A record for this space ID was found. */
1001 		field = rec_get_nth_field_old(
1002 			rec, DICT_FLD__SYS_DATAFILES__PATH, &len);
1003 		ut_a(len > 0 || len == UNIV_SQL_NULL);
1004 		ut_a(len < OS_FILE_MAX_PATH);
1005 		dict_filepath = mem_strdupl((char*) field, len);
1006 		ut_a(dict_filepath);
1007 	}
1008 
1009 	btr_pcur_close(&pcur);
1010 	mtr_commit(&mtr);
1011 	mem_heap_free(heap);
1012 
1013 	return(dict_filepath);
1014 }
1015 
1016 /********************************************************************//**
1017 Update the record for space_id in SYS_TABLESPACES to this filepath.
1018 @return	DB_SUCCESS if OK, dberr_t if the insert failed */
1019 UNIV_INTERN
1020 dberr_t
dict_update_filepath(ulint space_id,const char * filepath)1021 dict_update_filepath(
1022 /*=================*/
1023 	ulint		space_id,	/*!< in: space id */
1024 	const char*	filepath)	/*!< in: filepath */
1025 {
1026 	dberr_t		err = DB_SUCCESS;
1027 	trx_t*		trx;
1028 
1029 #ifdef UNIV_SYNC_DEBUG
1030 	ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_EX));
1031 #endif /* UNIV_SYNC_DEBUG */
1032 	ut_ad(mutex_own(&(dict_sys->mutex)));
1033 
1034 	trx = trx_allocate_for_background();
1035 	trx->op_info = "update filepath";
1036 	trx->dict_operation_lock_mode = RW_X_LATCH;
1037 	trx_start_for_ddl(trx, TRX_DICT_OP_INDEX);
1038 
1039 	pars_info_t*	info = pars_info_create();
1040 
1041 	pars_info_add_int4_literal(info, "space", space_id);
1042 	pars_info_add_str_literal(info, "path", filepath);
1043 
1044 	err = que_eval_sql(info,
1045 			   "PROCEDURE UPDATE_FILEPATH () IS\n"
1046 			   "BEGIN\n"
1047 			   "UPDATE SYS_DATAFILES"
1048 			   " SET PATH = :path\n"
1049 			   " WHERE SPACE = :space;\n"
1050 			   "END;\n", FALSE, trx);
1051 
1052 	trx_commit_for_mysql(trx);
1053 	trx->dict_operation_lock_mode = 0;
1054 	trx_free_for_background(trx);
1055 
1056 	if (err == DB_SUCCESS) {
1057 		/* We just updated SYS_DATAFILES due to the contents in
1058 		a link file.  Make a note that we did this. */
1059 		ib_logf(IB_LOG_LEVEL_INFO,
1060 			"The InnoDB data dictionary table SYS_DATAFILES "
1061 			"for tablespace ID %lu was updated to use file %s.",
1062 			(ulong) space_id, filepath);
1063 	} else {
1064 		ib_logf(IB_LOG_LEVEL_WARN,
1065 			"Problem updating InnoDB data dictionary table "
1066 			"SYS_DATAFILES for tablespace ID %lu to file %s.",
1067 			(ulong) space_id, filepath);
1068 	}
1069 
1070 	return(err);
1071 }
1072 
1073 /********************************************************************//**
1074 Insert records into SYS_TABLESPACES and SYS_DATAFILES.
1075 @return	DB_SUCCESS if OK, dberr_t if the insert failed */
1076 UNIV_INTERN
1077 dberr_t
dict_insert_tablespace_and_filepath(ulint space,const char * name,const char * filepath,ulint fsp_flags)1078 dict_insert_tablespace_and_filepath(
1079 /*================================*/
1080 	ulint		space,		/*!< in: space id */
1081 	const char*	name,		/*!< in: talespace name */
1082 	const char*	filepath,	/*!< in: filepath */
1083 	ulint		fsp_flags)	/*!< in: tablespace flags */
1084 {
1085 	dberr_t		err = DB_SUCCESS;
1086 	trx_t*		trx;
1087 
1088 #ifdef UNIV_SYNC_DEBUG
1089 	ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_EX));
1090 #endif /* UNIV_SYNC_DEBUG */
1091 	ut_ad(mutex_own(&(dict_sys->mutex)));
1092 	ut_ad(filepath);
1093 
1094 	trx = trx_allocate_for_background();
1095 	trx->op_info = "insert tablespace and filepath";
1096 	trx->dict_operation_lock_mode = RW_X_LATCH;
1097 	trx_start_for_ddl(trx, TRX_DICT_OP_INDEX);
1098 
1099 	/* A record for this space ID was not found in
1100 	SYS_DATAFILES. Assume the record is also missing in
1101 	SYS_TABLESPACES.  Insert records onto them both. */
1102 	err = dict_create_add_tablespace_to_dictionary(
1103 		space, name, fsp_flags, filepath, trx, false);
1104 
1105 	trx_commit_for_mysql(trx);
1106 	trx->dict_operation_lock_mode = 0;
1107 	trx_free_for_background(trx);
1108 
1109 	return(err);
1110 }
1111 
1112 /********************************************************************//**
1113 This function looks at each table defined in SYS_TABLES.  It checks the
1114 tablespace for any table with a space_id > 0.  It looks up the tablespace
1115 in SYS_DATAFILES to ensure the correct path.
1116 
1117 In a crash recovery we already have all the tablespace objects created.
1118 This function compares the space id information in the InnoDB data dictionary
1119 to what we already read with fil_load_single_table_tablespaces().
1120 
1121 In a normal startup, we create the tablespace objects for every table in
1122 InnoDB's data dictionary, if the corresponding .ibd file exists.
1123 We also scan the biggest space id, and store it to fil_system. */
1124 UNIV_INTERN
1125 void
dict_check_tablespaces_and_store_max_id(dict_check_t dict_check)1126 dict_check_tablespaces_and_store_max_id(
1127 /*====================================*/
1128 	dict_check_t	dict_check)	/*!< in: how to check */
1129 {
1130 	dict_table_t*	sys_tables;
1131 	dict_index_t*	sys_index;
1132 	btr_pcur_t	pcur;
1133 	const rec_t*	rec;
1134 	ulint		max_space_id;
1135 	mtr_t		mtr;
1136 
1137 	rw_lock_x_lock(&dict_operation_lock);
1138 	mutex_enter(&(dict_sys->mutex));
1139 
1140 	mtr_start(&mtr);
1141 
1142 	sys_tables = dict_table_get_low("SYS_TABLES");
1143 	sys_index = UT_LIST_GET_FIRST(sys_tables->indexes);
1144 	ut_ad(!dict_table_is_comp(sys_tables));
1145 
1146 	max_space_id = mtr_read_ulint(dict_hdr_get(&mtr)
1147 				      + DICT_HDR_MAX_SPACE_ID,
1148 				      MLOG_4BYTES, &mtr);
1149 	fil_set_max_space_id_if_bigger(max_space_id);
1150 
1151 	btr_pcur_open_at_index_side(true, sys_index, BTR_SEARCH_LEAF, &pcur,
1152 				    true, 0, &mtr);
1153 loop:
1154 	btr_pcur_move_to_next_user_rec(&pcur, &mtr);
1155 
1156 	rec = btr_pcur_get_rec(&pcur);
1157 
1158 	if (!btr_pcur_is_on_user_rec(&pcur)) {
1159 		/* end of index */
1160 
1161 		btr_pcur_close(&pcur);
1162 		mtr_commit(&mtr);
1163 
1164 		/* We must make the tablespace cache aware of the biggest
1165 		known space id */
1166 
1167 		/* printf("Biggest space id in data dictionary %lu\n",
1168 		max_space_id); */
1169 		fil_set_max_space_id_if_bigger(max_space_id);
1170 
1171 		mutex_exit(&(dict_sys->mutex));
1172 		rw_lock_x_unlock(&dict_operation_lock);
1173 
1174 		return;
1175 	}
1176 
1177 	if (!rec_get_deleted_flag(rec, 0)) {
1178 
1179 		/* We found one */
1180 		const byte*	field;
1181 		ulint		len;
1182 		ulint		space_id;
1183 		ulint		flags;
1184 		char*		name;
1185 
1186 		field = rec_get_nth_field_old(
1187 			rec, DICT_FLD__SYS_TABLES__NAME, &len);
1188 
1189 		name = mem_strdupl((char*) field, len);
1190 
1191 		char	table_name[MAX_FULL_NAME_LEN + 1];
1192 
1193 		innobase_format_name(
1194 			table_name, sizeof(table_name), name, FALSE);
1195 
1196 		flags = dict_sys_tables_get_flags(rec);
1197 		if (UNIV_UNLIKELY(flags == ULINT_UNDEFINED)) {
1198 			/* Read again the 4 bytes from rec. */
1199 			field = rec_get_nth_field_old(
1200 				rec, DICT_FLD__SYS_TABLES__TYPE, &len);
1201 			ut_ad(len == 4); /* this was checked earlier */
1202 			flags = mach_read_from_4(field);
1203 
1204 			ib_logf(IB_LOG_LEVEL_ERROR,
1205 				"Table '%s' in InnoDB data dictionary"
1206 				" has unknown type %lx", table_name, flags);
1207 			mem_free(name);
1208 			goto loop;
1209 		}
1210 
1211 		field = rec_get_nth_field_old(
1212 			rec, DICT_FLD__SYS_TABLES__SPACE, &len);
1213 		ut_a(len == 4);
1214 
1215 		space_id = mach_read_from_4(field);
1216 
1217 		btr_pcur_store_position(&pcur, &mtr);
1218 
1219 		mtr_commit(&mtr);
1220 
1221 		/* For tables created with old versions of InnoDB,
1222 		SYS_TABLES.MIX_LEN may contain garbage.  Such tables
1223 		would always be in ROW_FORMAT=REDUNDANT. Pretend that
1224 		all such tables are non-temporary. That is, do not
1225 		suppress error printouts about temporary or discarded
1226 		tablespaces not being found. */
1227 
1228 		field = rec_get_nth_field_old(
1229 			rec, DICT_FLD__SYS_TABLES__MIX_LEN, &len);
1230 
1231 		bool		is_temp = false;
1232 		bool		discarded = false;
1233 		ib_uint32_t	flags2 = static_cast<ib_uint32_t>(
1234 			mach_read_from_4(field));
1235 
1236 		/* Check that the tablespace (the .ibd file) really
1237 		exists; print a warning to the .err log if not.
1238 		Do not print warnings for temporary tables or for
1239 		tablespaces that have been discarded. */
1240 
1241 		field = rec_get_nth_field_old(
1242 			rec, DICT_FLD__SYS_TABLES__N_COLS, &len);
1243 
1244 		/* MIX_LEN valid only for ROW_FORMAT > REDUNDANT. */
1245 		if (mach_read_from_4(field) & DICT_N_COLS_COMPACT) {
1246 
1247 			is_temp = !!(flags2 & DICT_TF2_TEMPORARY);
1248 			discarded = !!(flags2 & DICT_TF2_DISCARDED);
1249 		}
1250 
1251 		if (space_id == 0) {
1252 			/* The system tablespace always exists. */
1253 			ut_ad(!discarded);
1254 			goto next_tablespace;
1255 		}
1256 
1257 		switch (dict_check) {
1258 		case DICT_CHECK_ALL_LOADED:
1259 			/* All tablespaces should have been found in
1260 			fil_load_single_table_tablespaces(). */
1261 			if (fil_space_for_table_exists_in_mem(
1262 				space_id, name, TRUE, !(is_temp || discarded),
1263 				false, NULL, 0)
1264 			    && !(is_temp || discarded)) {
1265 				/* If user changes the path of .ibd files in
1266 				   *.isl files before doing crash recovery ,
1267 				   then this leads to inconsistency in
1268 				   SYS_DATAFILES system table because the
1269 				   tables are loaded from the updated path
1270 				   but the SYS_DATAFILES still points to the
1271 				   old path.Therefore after crash recovery
1272 				   update SYS_DATAFILES with the updated path.*/
1273 				ut_ad(space_id);
1274 				ut_ad(recv_needed_recovery);
1275 				char *dict_path = dict_get_first_path(space_id,
1276 								      name);
1277 				char *remote_path = fil_read_link_file(name);
1278 				if(dict_path && remote_path) {
1279 					if(strcmp(dict_path,remote_path)) {
1280 						dict_update_filepath(space_id,
1281 								     remote_path);
1282 						}
1283 				}
1284 				if(dict_path)
1285 					mem_free(dict_path);
1286 				if(remote_path)
1287 					mem_free(remote_path);
1288 			}
1289 			break;
1290 
1291 		case DICT_CHECK_SOME_LOADED:
1292 			/* Some tablespaces may have been opened in
1293 			trx_resurrect_table_locks(). */
1294 			if (fil_space_for_table_exists_in_mem(
1295 				    space_id, name, FALSE, FALSE,
1296 				    false, NULL, 0)) {
1297 				break;
1298 			}
1299 			/* fall through */
1300 		case DICT_CHECK_NONE_LOADED:
1301 			if (discarded) {
1302 				ib_logf(IB_LOG_LEVEL_INFO,
1303 					"DISCARD flag set for table '%s',"
1304 					" ignored.",
1305 					table_name);
1306 				break;
1307 			}
1308 
1309 			/* It is a normal database startup: create the
1310 			space object and check that the .ibd file exists.
1311 			If the table uses a remote tablespace, look for the
1312 			space_id in SYS_DATAFILES to find the filepath */
1313 
1314 			/* Use the remote filepath if known. */
1315 			char*	filepath = NULL;
1316 			if (DICT_TF_HAS_DATA_DIR(flags)) {
1317 				filepath = dict_get_first_path(
1318 					space_id, name);
1319 			}
1320 
1321 			/* We set the 2nd param (fix_dict = true)
1322 			here because we already have an x-lock on
1323 			dict_operation_lock and dict_sys->mutex. Besides,
1324 			this is at startup and we are now single threaded.
1325 			If the filepath is not known, it will need to
1326 			be discovered. */
1327 			dberr_t	err = fil_open_single_table_tablespace(
1328 				false, srv_read_only_mode ? false : true,
1329 				space_id, dict_tf_to_fsp_flags(flags),
1330 				name, filepath);
1331 
1332 			if (err != DB_SUCCESS) {
1333 				ib_logf(IB_LOG_LEVEL_ERROR,
1334 					"Tablespace open failed for '%s', "
1335 					"ignored.", table_name);
1336 			}
1337 
1338 			if (filepath) {
1339 				mem_free(filepath);
1340 			}
1341 
1342 			break;
1343 		}
1344 
1345 		if (space_id > max_space_id) {
1346 			max_space_id = space_id;
1347 		}
1348 
1349 next_tablespace:
1350 		mem_free(name);
1351 		mtr_start(&mtr);
1352 
1353 		btr_pcur_restore_position(BTR_SEARCH_LEAF, &pcur, &mtr);
1354 	}
1355 
1356 	goto loop;
1357 }
1358 
1359 /********************************************************************//**
1360 Loads a table column definition from a SYS_COLUMNS record to
1361 dict_table_t.
1362 @return error message, or NULL on success */
1363 UNIV_INTERN
1364 const char*
dict_load_column_low(dict_table_t * table,mem_heap_t * heap,dict_col_t * column,table_id_t * table_id,const char ** col_name,const rec_t * rec)1365 dict_load_column_low(
1366 /*=================*/
1367 	dict_table_t*	table,		/*!< in/out: table, could be NULL
1368 					if we just populate a dict_column_t
1369 					struct with information from
1370 					a SYS_COLUMNS record */
1371 	mem_heap_t*	heap,		/*!< in/out: memory heap
1372 					for temporary storage */
1373 	dict_col_t*	column,		/*!< out: dict_column_t to fill,
1374 					or NULL if table != NULL */
1375 	table_id_t*	table_id,	/*!< out: table id */
1376 	const char**	col_name,	/*!< out: column name */
1377 	const rec_t*	rec)		/*!< in: SYS_COLUMNS record */
1378 {
1379 	char*		name;
1380 	const byte*	field;
1381 	ulint		len;
1382 	ulint		mtype;
1383 	ulint		prtype;
1384 	ulint		col_len;
1385 	ulint		pos;
1386 
1387 	ut_ad(table || column);
1388 
1389 	if (rec_get_deleted_flag(rec, 0)) {
1390 		return("delete-marked record in SYS_COLUMNS");
1391 	}
1392 
1393 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_COLUMNS) {
1394 		return("wrong number of columns in SYS_COLUMNS record");
1395 	}
1396 
1397 	field = rec_get_nth_field_old(
1398 		rec, DICT_FLD__SYS_COLUMNS__TABLE_ID, &len);
1399 	if (len != 8) {
1400 err_len:
1401 		return("incorrect column length in SYS_COLUMNS");
1402 	}
1403 
1404 	if (table_id) {
1405 		*table_id = mach_read_from_8(field);
1406 	} else if (table->id != mach_read_from_8(field)) {
1407 		return("SYS_COLUMNS.TABLE_ID mismatch");
1408 	}
1409 
1410 	field = rec_get_nth_field_old(
1411 		rec, DICT_FLD__SYS_COLUMNS__POS, &len);
1412 	if (len != 4) {
1413 
1414 		goto err_len;
1415 	}
1416 
1417 	pos = mach_read_from_4(field);
1418 
1419 	if (table && table->n_def != pos) {
1420 		return("SYS_COLUMNS.POS mismatch");
1421 	}
1422 
1423 	rec_get_nth_field_offs_old(
1424 		rec, DICT_FLD__SYS_COLUMNS__DB_TRX_ID, &len);
1425 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
1426 		goto err_len;
1427 	}
1428 	rec_get_nth_field_offs_old(
1429 		rec, DICT_FLD__SYS_COLUMNS__DB_ROLL_PTR, &len);
1430 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
1431 		goto err_len;
1432 	}
1433 
1434 	field = rec_get_nth_field_old(
1435 		rec, DICT_FLD__SYS_COLUMNS__NAME, &len);
1436 	if (len == 0 || len == UNIV_SQL_NULL) {
1437 		goto err_len;
1438 	}
1439 
1440 	name = mem_heap_strdupl(heap, (const char*) field, len);
1441 
1442 	if (col_name) {
1443 		*col_name = name;
1444 	}
1445 
1446 	field = rec_get_nth_field_old(
1447 		rec, DICT_FLD__SYS_COLUMNS__MTYPE, &len);
1448 	if (len != 4) {
1449 		goto err_len;
1450 	}
1451 
1452 	mtype = mach_read_from_4(field);
1453 
1454 	field = rec_get_nth_field_old(
1455 		rec, DICT_FLD__SYS_COLUMNS__PRTYPE, &len);
1456 	if (len != 4) {
1457 		goto err_len;
1458 	}
1459 	prtype = mach_read_from_4(field);
1460 
1461 	if (dtype_get_charset_coll(prtype) == 0
1462 	    && dtype_is_string_type(mtype)) {
1463 		/* The table was created with < 4.1.2. */
1464 
1465 		if (dtype_is_binary_string_type(mtype, prtype)) {
1466 			/* Use the binary collation for
1467 			string columns of binary type. */
1468 
1469 			prtype = dtype_form_prtype(
1470 				prtype,
1471 				DATA_MYSQL_BINARY_CHARSET_COLL);
1472 		} else {
1473 			/* Use the default charset for
1474 			other than binary columns. */
1475 
1476 			prtype = dtype_form_prtype(
1477 				prtype,
1478 				data_mysql_default_charset_coll);
1479 		}
1480 	}
1481 
1482 	field = rec_get_nth_field_old(
1483 		rec, DICT_FLD__SYS_COLUMNS__LEN, &len);
1484 	if (len != 4) {
1485 		goto err_len;
1486 	}
1487 	col_len = mach_read_from_4(field);
1488 	field = rec_get_nth_field_old(
1489 		rec, DICT_FLD__SYS_COLUMNS__PREC, &len);
1490 	if (len != 4) {
1491 		goto err_len;
1492 	}
1493 
1494 	if (!column) {
1495 		dict_mem_table_add_col(table, heap, name, mtype,
1496 				       prtype, col_len);
1497 	} else {
1498 		dict_mem_fill_column_struct(column, pos, mtype,
1499 					    prtype, col_len);
1500 	}
1501 
1502 	return(NULL);
1503 }
1504 
1505 /********************************************************************//**
1506 Loads definitions for table columns. */
1507 static
1508 void
dict_load_columns(dict_table_t * table,mem_heap_t * heap)1509 dict_load_columns(
1510 /*==============*/
1511 	dict_table_t*	table,	/*!< in/out: table */
1512 	mem_heap_t*	heap)	/*!< in/out: memory heap
1513 				for temporary storage */
1514 {
1515 	dict_table_t*	sys_columns;
1516 	dict_index_t*	sys_index;
1517 	btr_pcur_t	pcur;
1518 	dtuple_t*	tuple;
1519 	dfield_t*	dfield;
1520 	const rec_t*	rec;
1521 	byte*		buf;
1522 	ulint		i;
1523 	mtr_t		mtr;
1524 
1525 	ut_ad(mutex_own(&(dict_sys->mutex)));
1526 
1527 	mtr_start(&mtr);
1528 
1529 	sys_columns = dict_table_get_low("SYS_COLUMNS");
1530 	sys_index = UT_LIST_GET_FIRST(sys_columns->indexes);
1531 	ut_ad(!dict_table_is_comp(sys_columns));
1532 
1533 	ut_ad(name_of_col_is(sys_columns, sys_index,
1534 			     DICT_FLD__SYS_COLUMNS__NAME, "NAME"));
1535 	ut_ad(name_of_col_is(sys_columns, sys_index,
1536 			     DICT_FLD__SYS_COLUMNS__PREC, "PREC"));
1537 
1538 	tuple = dtuple_create(heap, 1);
1539 	dfield = dtuple_get_nth_field(tuple, 0);
1540 
1541 	buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
1542 	mach_write_to_8(buf, table->id);
1543 
1544 	dfield_set_data(dfield, buf, 8);
1545 	dict_index_copy_types(tuple, sys_index, 1);
1546 
1547 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
1548 				  BTR_SEARCH_LEAF, &pcur, &mtr);
1549 	for (i = 0; i + DATA_N_SYS_COLS < (ulint) table->n_cols; i++) {
1550 		const char*	err_msg;
1551 		const char*	name = NULL;
1552 
1553 		rec = btr_pcur_get_rec(&pcur);
1554 
1555 		ut_a(btr_pcur_is_on_user_rec(&pcur));
1556 
1557 		err_msg = dict_load_column_low(table, heap, NULL, NULL,
1558 					       &name, rec);
1559 
1560 		if (err_msg) {
1561 			fprintf(stderr, "InnoDB: %s\n", err_msg);
1562 			ut_error;
1563 		}
1564 
1565 		/* Note: Currently we have one DOC_ID column that is
1566 		shared by all FTS indexes on a table. */
1567 		if (innobase_strcasecmp(name,
1568 					FTS_DOC_ID_COL_NAME) == 0) {
1569 			dict_col_t*	col;
1570 			/* As part of normal loading of tables the
1571 			table->flag is not set for tables with FTS
1572 			till after the FTS indexes are loaded. So we
1573 			create the fts_t instance here if there isn't
1574 			one already created.
1575 
1576 			This case does not arise for table create as
1577 			the flag is set before the table is created. */
1578 			if (table->fts == NULL) {
1579 				table->fts = fts_create(table);
1580 				fts_optimize_add_table(table);
1581 			}
1582 
1583 			ut_a(table->fts->doc_col == ULINT_UNDEFINED);
1584 
1585 			col = dict_table_get_nth_col(table, i);
1586 
1587 			ut_ad(col->len == sizeof(doc_id_t));
1588 
1589 			if (col->prtype & DATA_FTS_DOC_ID) {
1590 				DICT_TF2_FLAG_SET(
1591 					table, DICT_TF2_FTS_HAS_DOC_ID);
1592 				DICT_TF2_FLAG_UNSET(
1593 					table, DICT_TF2_FTS_ADD_DOC_ID);
1594 			}
1595 
1596 			table->fts->doc_col = i;
1597 		}
1598 
1599 		btr_pcur_move_to_next_user_rec(&pcur, &mtr);
1600 	}
1601 
1602 	btr_pcur_close(&pcur);
1603 	mtr_commit(&mtr);
1604 }
1605 
1606 /** Error message for a delete-marked record in dict_load_field_low() */
1607 static const char* dict_load_field_del = "delete-marked record in SYS_FIELDS";
1608 
1609 /********************************************************************//**
1610 Loads an index field definition from a SYS_FIELDS record to
1611 dict_index_t.
1612 @return error message, or NULL on success */
1613 UNIV_INTERN
1614 const char*
dict_load_field_low(byte * index_id,dict_index_t * index,dict_field_t * sys_field,ulint * pos,byte * last_index_id,mem_heap_t * heap,const rec_t * rec)1615 dict_load_field_low(
1616 /*================*/
1617 	byte*		index_id,	/*!< in/out: index id (8 bytes)
1618 					an "in" value if index != NULL
1619 					and "out" if index == NULL */
1620 	dict_index_t*	index,		/*!< in/out: index, could be NULL
1621 					if we just populate a dict_field_t
1622 					struct with information from
1623 					a SYS_FIELDS record */
1624 	dict_field_t*	sys_field,	/*!< out: dict_field_t to be
1625 					filled */
1626 	ulint*		pos,		/*!< out: Field position */
1627 	byte*		last_index_id,	/*!< in: last index id */
1628 	mem_heap_t*	heap,		/*!< in/out: memory heap
1629 					for temporary storage */
1630 	const rec_t*	rec)		/*!< in: SYS_FIELDS record */
1631 {
1632 	const byte*	field;
1633 	ulint		len;
1634 	ulint		pos_and_prefix_len;
1635 	ulint		prefix_len;
1636 	ibool		first_field;
1637 	ulint		position;
1638 
1639 	/* Either index or sys_field is supplied, not both */
1640 	ut_a((!index) || (!sys_field));
1641 
1642 	if (rec_get_deleted_flag(rec, 0)) {
1643 		return(dict_load_field_del);
1644 	}
1645 
1646 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_FIELDS) {
1647 		return("wrong number of columns in SYS_FIELDS record");
1648 	}
1649 
1650 	field = rec_get_nth_field_old(
1651 		rec, DICT_FLD__SYS_FIELDS__INDEX_ID, &len);
1652 	if (len != 8) {
1653 err_len:
1654 		return("incorrect column length in SYS_FIELDS");
1655 	}
1656 
1657 	if (!index) {
1658 		ut_a(last_index_id);
1659 		memcpy(index_id, (const char*) field, 8);
1660 		first_field = memcmp(index_id, last_index_id, 8);
1661 	} else {
1662 		first_field = (index->n_def == 0);
1663 		if (memcmp(field, index_id, 8)) {
1664 			return("SYS_FIELDS.INDEX_ID mismatch");
1665 		}
1666 	}
1667 
1668 	/* The next field stores the field position in the index and a
1669 	possible column prefix length if the index field does not
1670 	contain the whole column. The storage format is like this: if
1671 	there is at least one prefix field in the index, then the HIGH
1672 	2 bytes contain the field number (index->n_def) and the low 2
1673 	bytes the prefix length for the field. Otherwise the field
1674 	number (index->n_def) is contained in the 2 LOW bytes. */
1675 
1676 	field = rec_get_nth_field_old(
1677 		rec, DICT_FLD__SYS_FIELDS__POS, &len);
1678 	if (len != 4) {
1679 		goto err_len;
1680 	}
1681 
1682 	pos_and_prefix_len = mach_read_from_4(field);
1683 
1684 	if (index && UNIV_UNLIKELY
1685 	    ((pos_and_prefix_len & 0xFFFFUL) != index->n_def
1686 	     && (pos_and_prefix_len >> 16 & 0xFFFF) != index->n_def)) {
1687 		return("SYS_FIELDS.POS mismatch");
1688 	}
1689 
1690 	if (first_field || pos_and_prefix_len > 0xFFFFUL) {
1691 		prefix_len = pos_and_prefix_len & 0xFFFFUL;
1692 		position = (pos_and_prefix_len & 0xFFFF0000UL)  >> 16;
1693 	} else {
1694 		prefix_len = 0;
1695 		position = pos_and_prefix_len & 0xFFFFUL;
1696 	}
1697 
1698 	rec_get_nth_field_offs_old(
1699 		rec, DICT_FLD__SYS_FIELDS__DB_TRX_ID, &len);
1700 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
1701 		goto err_len;
1702 	}
1703 	rec_get_nth_field_offs_old(
1704 		rec, DICT_FLD__SYS_FIELDS__DB_ROLL_PTR, &len);
1705 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
1706 		goto err_len;
1707 	}
1708 
1709 	field = rec_get_nth_field_old(
1710 		rec, DICT_FLD__SYS_FIELDS__COL_NAME, &len);
1711 	if (len == 0 || len == UNIV_SQL_NULL) {
1712 		goto err_len;
1713 	}
1714 
1715 	if (index) {
1716 		dict_mem_index_add_field(
1717 			index, mem_heap_strdupl(heap, (const char*) field, len),
1718 			prefix_len);
1719 	} else {
1720 		ut_a(sys_field);
1721 		ut_a(pos);
1722 
1723 		sys_field->name = mem_heap_strdupl(
1724 			heap, (const char*) field, len);
1725 		sys_field->prefix_len = prefix_len;
1726 		*pos = position;
1727 	}
1728 
1729 	return(NULL);
1730 }
1731 
1732 /********************************************************************//**
1733 Loads definitions for index fields.
1734 @return DB_SUCCESS if ok, DB_CORRUPTION if corruption */
1735 static
1736 ulint
dict_load_fields(dict_index_t * index,mem_heap_t * heap)1737 dict_load_fields(
1738 /*=============*/
1739 	dict_index_t*	index,	/*!< in/out: index whose fields to load */
1740 	mem_heap_t*	heap)	/*!< in: memory heap for temporary storage */
1741 {
1742 	dict_table_t*	sys_fields;
1743 	dict_index_t*	sys_index;
1744 	btr_pcur_t	pcur;
1745 	dtuple_t*	tuple;
1746 	dfield_t*	dfield;
1747 	const rec_t*	rec;
1748 	byte*		buf;
1749 	ulint		i;
1750 	mtr_t		mtr;
1751 	dberr_t		error;
1752 
1753 	ut_ad(mutex_own(&(dict_sys->mutex)));
1754 
1755 	mtr_start(&mtr);
1756 
1757 	sys_fields = dict_table_get_low("SYS_FIELDS");
1758 	sys_index = UT_LIST_GET_FIRST(sys_fields->indexes);
1759 	ut_ad(!dict_table_is_comp(sys_fields));
1760 	ut_ad(name_of_col_is(sys_fields, sys_index,
1761 			     DICT_FLD__SYS_FIELDS__COL_NAME, "COL_NAME"));
1762 
1763 	tuple = dtuple_create(heap, 1);
1764 	dfield = dtuple_get_nth_field(tuple, 0);
1765 
1766 	buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
1767 	mach_write_to_8(buf, index->id);
1768 
1769 	dfield_set_data(dfield, buf, 8);
1770 	dict_index_copy_types(tuple, sys_index, 1);
1771 
1772 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
1773 				  BTR_SEARCH_LEAF, &pcur, &mtr);
1774 	for (i = 0; i < index->n_fields; i++) {
1775 		const char* err_msg;
1776 
1777 		rec = btr_pcur_get_rec(&pcur);
1778 
1779 		ut_a(btr_pcur_is_on_user_rec(&pcur));
1780 
1781 		err_msg = dict_load_field_low(buf, index, NULL, NULL, NULL,
1782 					      heap, rec);
1783 
1784 		if (err_msg == dict_load_field_del) {
1785 			/* There could be delete marked records in
1786 			SYS_FIELDS because SYS_FIELDS.INDEX_ID can be
1787 			updated by ALTER TABLE ADD INDEX. */
1788 
1789 			goto next_rec;
1790 		} else if (err_msg) {
1791 			fprintf(stderr, "InnoDB: %s\n", err_msg);
1792 			error = DB_CORRUPTION;
1793 			goto func_exit;
1794 		}
1795 next_rec:
1796 		btr_pcur_move_to_next_user_rec(&pcur, &mtr);
1797 	}
1798 
1799 	error = DB_SUCCESS;
1800 func_exit:
1801 	btr_pcur_close(&pcur);
1802 	mtr_commit(&mtr);
1803 	return(error);
1804 }
1805 
1806 /** Error message for a delete-marked record in dict_load_index_low() */
1807 static const char* dict_load_index_del = "delete-marked record in SYS_INDEXES";
1808 /** Error message for table->id mismatch in dict_load_index_low() */
1809 static const char* dict_load_index_id_err = "SYS_INDEXES.TABLE_ID mismatch";
1810 
1811 /********************************************************************//**
1812 Loads an index definition from a SYS_INDEXES record to dict_index_t.
1813 If allocate=TRUE, we will create a dict_index_t structure and fill it
1814 accordingly. If allocated=FALSE, the dict_index_t will be supplied by
1815 the caller and filled with information read from the record.  @return
1816 error message, or NULL on success */
1817 UNIV_INTERN
1818 const char*
dict_load_index_low(byte * table_id,const char * table_name,mem_heap_t * heap,const rec_t * rec,ibool allocate,dict_index_t ** index)1819 dict_load_index_low(
1820 /*================*/
1821 	byte*		table_id,	/*!< in/out: table id (8 bytes),
1822 					an "in" value if allocate=TRUE
1823 					and "out" when allocate=FALSE */
1824 	const char*	table_name,	/*!< in: table name */
1825 	mem_heap_t*	heap,		/*!< in/out: temporary memory heap */
1826 	const rec_t*	rec,		/*!< in: SYS_INDEXES record */
1827 	ibool		allocate,	/*!< in: TRUE=allocate *index,
1828 					FALSE=fill in a pre-allocated
1829 					*index */
1830 	dict_index_t**	index)		/*!< out,own: index, or NULL */
1831 {
1832 	const byte*	field;
1833 	ulint		len;
1834 	ulint		name_len;
1835 	char*		name_buf;
1836 	index_id_t	id;
1837 	ulint		n_fields;
1838 	ulint		type;
1839 	ulint		space;
1840 
1841 	if (allocate) {
1842 		/* If allocate=TRUE, no dict_index_t will
1843 		be supplied. Initialize "*index" to NULL */
1844 		*index = NULL;
1845 	}
1846 
1847 	if (rec_get_deleted_flag(rec, 0)) {
1848 		return(dict_load_index_del);
1849 	}
1850 
1851 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_INDEXES) {
1852 		return("wrong number of columns in SYS_INDEXES record");
1853 	}
1854 
1855 	field = rec_get_nth_field_old(
1856 		rec, DICT_FLD__SYS_INDEXES__TABLE_ID, &len);
1857 	if (len != 8) {
1858 err_len:
1859 		return("incorrect column length in SYS_INDEXES");
1860 	}
1861 
1862 	if (!allocate) {
1863 		/* We are reading a SYS_INDEXES record. Copy the table_id */
1864 		memcpy(table_id, (const char*) field, 8);
1865 	} else if (memcmp(field, table_id, 8)) {
1866 		/* Caller supplied table_id, verify it is the same
1867 		id as on the index record */
1868 		return(dict_load_index_id_err);
1869 	}
1870 
1871 	field = rec_get_nth_field_old(
1872 		rec, DICT_FLD__SYS_INDEXES__ID, &len);
1873 	if (len != 8) {
1874 		goto err_len;
1875 	}
1876 
1877 	id = mach_read_from_8(field);
1878 
1879 	rec_get_nth_field_offs_old(
1880 		rec, DICT_FLD__SYS_INDEXES__DB_TRX_ID, &len);
1881 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
1882 		goto err_len;
1883 	}
1884 	rec_get_nth_field_offs_old(
1885 		rec, DICT_FLD__SYS_INDEXES__DB_ROLL_PTR, &len);
1886 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
1887 		goto err_len;
1888 	}
1889 
1890 	field = rec_get_nth_field_old(
1891 		rec, DICT_FLD__SYS_INDEXES__NAME, &name_len);
1892 	if (name_len == UNIV_SQL_NULL) {
1893 		goto err_len;
1894 	}
1895 
1896 	name_buf = mem_heap_strdupl(heap, (const char*) field,
1897 				    name_len);
1898 
1899 	field = rec_get_nth_field_old(
1900 		rec, DICT_FLD__SYS_INDEXES__N_FIELDS, &len);
1901 	if (len != 4) {
1902 		goto err_len;
1903 	}
1904 	n_fields = mach_read_from_4(field);
1905 
1906 	field = rec_get_nth_field_old(
1907 		rec, DICT_FLD__SYS_INDEXES__TYPE, &len);
1908 	if (len != 4) {
1909 		goto err_len;
1910 	}
1911 	type = mach_read_from_4(field);
1912 	if (type & (~0U << DICT_IT_BITS)) {
1913 		return("unknown SYS_INDEXES.TYPE bits");
1914 	}
1915 
1916 	field = rec_get_nth_field_old(
1917 		rec, DICT_FLD__SYS_INDEXES__SPACE, &len);
1918 	if (len != 4) {
1919 		goto err_len;
1920 	}
1921 	space = mach_read_from_4(field);
1922 
1923 	field = rec_get_nth_field_old(
1924 		rec, DICT_FLD__SYS_INDEXES__PAGE_NO, &len);
1925 	if (len != 4) {
1926 		goto err_len;
1927 	}
1928 
1929 	if (allocate) {
1930 		*index = dict_mem_index_create(table_name, name_buf,
1931 					       space, type, n_fields);
1932 	} else {
1933 		ut_a(*index);
1934 
1935 		dict_mem_fill_index_struct(*index, NULL, NULL, name_buf,
1936 					   space, type, n_fields);
1937 	}
1938 
1939 	(*index)->id = id;
1940 	(*index)->page = mach_read_from_4(field);
1941 	btr_search_index_init(*index);
1942 	ut_ad((*index)->page);
1943 
1944 	return(NULL);
1945 }
1946 
1947 /********************************************************************//**
1948 Loads definitions for table indexes. Adds them to the data dictionary
1949 cache.
1950 @return DB_SUCCESS if ok, DB_CORRUPTION if corruption of dictionary
1951 table or DB_UNSUPPORTED if table has unknown index type */
1952 static MY_ATTRIBUTE((nonnull))
1953 dberr_t
dict_load_indexes(dict_table_t * table,mem_heap_t * heap,dict_err_ignore_t ignore_err)1954 dict_load_indexes(
1955 /*==============*/
1956 	dict_table_t*	table,	/*!< in/out: table */
1957 	mem_heap_t*	heap,	/*!< in: memory heap for temporary storage */
1958 	dict_err_ignore_t ignore_err)
1959 				/*!< in: error to be ignored when
1960 				loading the index definition */
1961 {
1962 	dict_table_t*	sys_indexes;
1963 	dict_index_t*	sys_index;
1964 	btr_pcur_t	pcur;
1965 	dtuple_t*	tuple;
1966 	dfield_t*	dfield;
1967 	const rec_t*	rec;
1968 	byte*		buf;
1969 	mtr_t		mtr;
1970 	dberr_t		error = DB_SUCCESS;
1971 
1972 	ut_ad(mutex_own(&(dict_sys->mutex)));
1973 
1974 	mtr_start(&mtr);
1975 
1976 	sys_indexes = dict_table_get_low("SYS_INDEXES");
1977 	sys_index = UT_LIST_GET_FIRST(sys_indexes->indexes);
1978 	ut_ad(!dict_table_is_comp(sys_indexes));
1979 	ut_ad(name_of_col_is(sys_indexes, sys_index,
1980 			     DICT_FLD__SYS_INDEXES__NAME, "NAME"));
1981 	ut_ad(name_of_col_is(sys_indexes, sys_index,
1982 			     DICT_FLD__SYS_INDEXES__PAGE_NO, "PAGE_NO"));
1983 
1984 	tuple = dtuple_create(heap, 1);
1985 	dfield = dtuple_get_nth_field(tuple, 0);
1986 
1987 	buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
1988 	mach_write_to_8(buf, table->id);
1989 
1990 	dfield_set_data(dfield, buf, 8);
1991 	dict_index_copy_types(tuple, sys_index, 1);
1992 
1993 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
1994 				  BTR_SEARCH_LEAF, &pcur, &mtr);
1995 	for (;;) {
1996 		dict_index_t*	index = NULL;
1997 		const char*	err_msg;
1998 
1999 		if (!btr_pcur_is_on_user_rec(&pcur)) {
2000 
2001 			/* We should allow the table to open even
2002 			without index when DICT_ERR_IGNORE_CORRUPT is set.
2003 			DICT_ERR_IGNORE_CORRUPT is currently only set
2004 			for drop table */
2005 			if (dict_table_get_first_index(table) == NULL
2006 			    && !(ignore_err & DICT_ERR_IGNORE_CORRUPT)) {
2007 				ib_logf(IB_LOG_LEVEL_WARN,
2008 					"Cannot load table %s "
2009 					"because it has no indexes in "
2010 					"InnoDB internal data dictionary.",
2011 					table->name);
2012 				error = DB_CORRUPTION;
2013 				goto func_exit;
2014 			}
2015 
2016 			break;
2017 		}
2018 
2019 		rec = btr_pcur_get_rec(&pcur);
2020 
2021 		if ((ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK)
2022 		    && rec_get_n_fields_old(rec)
2023 		    == DICT_NUM_FIELDS__SYS_INDEXES) {
2024 			const byte*	field;
2025 			ulint		len;
2026 			field = rec_get_nth_field_old(
2027 				rec, DICT_FLD__SYS_INDEXES__NAME, &len);
2028 
2029 			if (len != UNIV_SQL_NULL
2030 			    && char(*field) == char(TEMP_INDEX_PREFIX)) {
2031 				/* Skip indexes whose name starts with
2032 				TEMP_INDEX_PREFIX, because they will
2033 				be dropped during crash recovery. */
2034 				goto next_rec;
2035 			}
2036 		}
2037 
2038 		err_msg = dict_load_index_low(buf, table->name, heap, rec,
2039 					      TRUE, &index);
2040 		ut_ad((index == NULL && err_msg != NULL)
2041 		      || (index != NULL && err_msg == NULL));
2042 
2043 		if (err_msg == dict_load_index_id_err) {
2044 			/* TABLE_ID mismatch means that we have
2045 			run out of index definitions for the table. */
2046 
2047 			if (dict_table_get_first_index(table) == NULL
2048 			    && !(ignore_err & DICT_ERR_IGNORE_CORRUPT)) {
2049 				ib_logf(IB_LOG_LEVEL_WARN,
2050 					"Failed to load the "
2051 					"clustered index for table %s "
2052 					"because of the following error: %s. "
2053 					"Refusing to load the rest of the "
2054 					"indexes (if any) and the whole table "
2055 					"altogether.", table->name, err_msg);
2056 				error = DB_CORRUPTION;
2057 				goto func_exit;
2058 			}
2059 
2060 			break;
2061 		} else if (err_msg == dict_load_index_del) {
2062 			/* Skip delete-marked records. */
2063 			goto next_rec;
2064 		} else if (err_msg) {
2065 			fprintf(stderr, "InnoDB: %s\n", err_msg);
2066 			if (ignore_err & DICT_ERR_IGNORE_CORRUPT) {
2067 				goto next_rec;
2068 			}
2069 			error = DB_CORRUPTION;
2070 			goto func_exit;
2071 		}
2072 
2073 		ut_ad(index);
2074 
2075 		/* Check whether the index is corrupted */
2076 		if (dict_index_is_corrupted(index)) {
2077 			ut_print_timestamp(stderr);
2078 			fputs("  InnoDB: ", stderr);
2079 			dict_index_name_print(stderr, NULL, index);
2080 			fputs(" is corrupted\n", stderr);
2081 
2082 			if (!srv_load_corrupted
2083 			    && !(ignore_err & DICT_ERR_IGNORE_CORRUPT)
2084 			    && dict_index_is_clust(index)) {
2085 				dict_mem_index_free(index);
2086 
2087 				error = DB_INDEX_CORRUPT;
2088 				goto func_exit;
2089 			} else {
2090 				/* We will load the index if
2091 				1) srv_load_corrupted is TRUE
2092 				2) ignore_err is set with
2093 				DICT_ERR_IGNORE_CORRUPT
2094 				3) if the index corrupted is a secondary
2095 				index */
2096 				ut_print_timestamp(stderr);
2097 				fputs("  InnoDB: load corrupted index ", stderr);
2098 				dict_index_name_print(stderr, NULL, index);
2099 				putc('\n', stderr);
2100 			}
2101 		}
2102 
2103 		if (index->type & DICT_FTS
2104 		    && !DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS)) {
2105 			/* This should have been created by now. */
2106 			ut_a(table->fts != NULL);
2107 			DICT_TF2_FLAG_SET(table, DICT_TF2_FTS);
2108 		}
2109 
2110 		/* We check for unsupported types first, so that the
2111 		subsequent checks are relevant for the supported types. */
2112 		if (index->type & ~(DICT_CLUSTERED | DICT_UNIQUE
2113 				    | DICT_CORRUPT | DICT_FTS)) {
2114 			ib_logf(IB_LOG_LEVEL_ERROR,
2115 				"Unknown type %lu of index %s of table %s",
2116 				(ulong) index->type, index->name, table->name);
2117 
2118 			error = DB_UNSUPPORTED;
2119 			dict_mem_index_free(index);
2120 			goto func_exit;
2121 		} else if (index->page == FIL_NULL
2122 			   && !table->ibd_file_missing
2123 			   && (!(index->type & DICT_FTS))) {
2124 
2125 			fprintf(stderr,
2126 				"InnoDB: Error: trying to load index %s"
2127 				" for table %s\n"
2128 				"InnoDB: but the index tree has been freed!\n",
2129 				index->name, table->name);
2130 
2131 			if (ignore_err & DICT_ERR_IGNORE_INDEX_ROOT) {
2132 				/* If caller can tolerate this error,
2133 				we will continue to load the index and
2134 				let caller deal with this error. However
2135 				mark the index and table corrupted. We
2136 				only need to mark such in the index
2137 				dictionary cache for such metadata corruption,
2138 				since we would always be able to set it
2139 				when loading the dictionary cache */
2140 				dict_set_corrupted_index_cache_only(
2141 					index, table);
2142 
2143 				fprintf(stderr,
2144 					"InnoDB: Index is corrupt but forcing"
2145 					" load into data dictionary\n");
2146 			} else {
2147 corrupted:
2148 				dict_mem_index_free(index);
2149 				error = DB_CORRUPTION;
2150 				goto func_exit;
2151 			}
2152 		} else if (!dict_index_is_clust(index)
2153 			   && NULL == dict_table_get_first_index(table)) {
2154 
2155 			fputs("InnoDB: Error: trying to load index ",
2156 			      stderr);
2157 			ut_print_name(stderr, NULL, FALSE, index->name);
2158 			fputs(" for table ", stderr);
2159 			ut_print_name(stderr, NULL, TRUE, table->name);
2160 			fputs("\nInnoDB: but the first index"
2161 			      " is not clustered!\n", stderr);
2162 
2163 			goto corrupted;
2164 		} else if (dict_is_sys_table(table->id)
2165 			   && (dict_index_is_clust(index)
2166 			       || ((table == dict_sys->sys_tables)
2167 				   && !strcmp("ID_IND", index->name)))) {
2168 
2169 			/* The index was created in memory already at booting
2170 			of the database server */
2171 			dict_mem_index_free(index);
2172 		} else {
2173 			dict_load_fields(index, heap);
2174 
2175 			error = dict_index_add_to_cache(
2176 				table, index, index->page, FALSE);
2177 
2178 			/* The data dictionary tables should never contain
2179 			invalid index definitions.  If we ignored this error
2180 			and simply did not load this index definition, the
2181 			.frm file would disagree with the index definitions
2182 			inside InnoDB. */
2183 			if (UNIV_UNLIKELY(error != DB_SUCCESS)) {
2184 
2185 				goto func_exit;
2186 			}
2187 		}
2188 next_rec:
2189 		btr_pcur_move_to_next_user_rec(&pcur, &mtr);
2190 	}
2191 
2192 	/* If the table contains FTS indexes, populate table->fts->indexes */
2193 	if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS)) {
2194 		/* table->fts->indexes should have been created. */
2195 		ut_a(table->fts->indexes != NULL);
2196 		dict_table_get_all_fts_indexes(table, table->fts->indexes);
2197 	}
2198 
2199 func_exit:
2200 	btr_pcur_close(&pcur);
2201 	mtr_commit(&mtr);
2202 
2203 	return(error);
2204 }
2205 
2206 /********************************************************************//**
2207 Loads a table definition from a SYS_TABLES record to dict_table_t.
2208 Does not load any columns or indexes.
2209 @return error message, or NULL on success */
2210 UNIV_INTERN
2211 const char*
dict_load_table_low(const char * name,const rec_t * rec,dict_table_t ** table)2212 dict_load_table_low(
2213 /*================*/
2214 	const char*	name,		/*!< in: table name */
2215 	const rec_t*	rec,		/*!< in: SYS_TABLES record */
2216 	dict_table_t**	table)		/*!< out,own: table, or NULL */
2217 {
2218 	const byte*	field;
2219 	ulint		len;
2220 	ulint		space;
2221 	ulint		n_cols;
2222 	ulint		flags = 0;
2223 	ulint		flags2;
2224 
2225 	if (rec_get_deleted_flag(rec, 0)) {
2226 		return("delete-marked record in SYS_TABLES");
2227 	}
2228 
2229 	if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_TABLES) {
2230 		return("wrong number of columns in SYS_TABLES record");
2231 	}
2232 
2233 	rec_get_nth_field_offs_old(
2234 		rec, DICT_FLD__SYS_TABLES__NAME, &len);
2235 	if (len == 0 || len == UNIV_SQL_NULL) {
2236 err_len:
2237 		return("incorrect column length in SYS_TABLES");
2238 	}
2239 	rec_get_nth_field_offs_old(
2240 		rec, DICT_FLD__SYS_TABLES__DB_TRX_ID, &len);
2241 	if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
2242 		goto err_len;
2243 	}
2244 	rec_get_nth_field_offs_old(
2245 		rec, DICT_FLD__SYS_TABLES__DB_ROLL_PTR, &len);
2246 	if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
2247 		goto err_len;
2248 	}
2249 
2250 	rec_get_nth_field_offs_old(rec, DICT_FLD__SYS_TABLES__ID, &len);
2251 	if (len != 8) {
2252 		goto err_len;
2253 	}
2254 
2255 	field = rec_get_nth_field_old(
2256 		rec, DICT_FLD__SYS_TABLES__N_COLS, &len);
2257 	if (len != 4) {
2258 		goto err_len;
2259 	}
2260 
2261 	n_cols = mach_read_from_4(field);
2262 
2263 	rec_get_nth_field_offs_old(rec, DICT_FLD__SYS_TABLES__TYPE, &len);
2264 	if (len != 4) {
2265 		goto err_len;
2266 	}
2267 
2268 	rec_get_nth_field_offs_old(
2269 		rec, DICT_FLD__SYS_TABLES__MIX_ID, &len);
2270 	if (len != 8) {
2271 		goto err_len;
2272 	}
2273 
2274 	field = rec_get_nth_field_old(
2275 		rec, DICT_FLD__SYS_TABLES__MIX_LEN, &len);
2276 	if (len != 4) {
2277 		goto err_len;
2278 	}
2279 
2280 	/* MIX_LEN may hold additional flags in post-antelope file formats. */
2281 	flags2 = mach_read_from_4(field);
2282 
2283 	/* DICT_TF2_FTS will be set when indexes is being loaded */
2284 	flags2 &= ~DICT_TF2_FTS;
2285 
2286 	rec_get_nth_field_offs_old(
2287 		rec, DICT_FLD__SYS_TABLES__CLUSTER_ID, &len);
2288 	if (len != UNIV_SQL_NULL) {
2289 		goto err_len;
2290 	}
2291 
2292 	field = rec_get_nth_field_old(
2293 		rec, DICT_FLD__SYS_TABLES__SPACE, &len);
2294 	if (len != 4) {
2295 		goto err_len;
2296 	}
2297 
2298 	space = mach_read_from_4(field);
2299 
2300 	/* Check if the tablespace exists and has the right name */
2301 	flags = dict_sys_tables_get_flags(rec);
2302 
2303 	if (UNIV_UNLIKELY(flags == ULINT_UNDEFINED)) {
2304 		field = rec_get_nth_field_old(
2305 			rec, DICT_FLD__SYS_TABLES__TYPE, &len);
2306 		ut_ad(len == 4); /* this was checked earlier */
2307 		flags = mach_read_from_4(field);
2308 
2309 		ut_print_timestamp(stderr);
2310 		fputs("  InnoDB: Error: table ", stderr);
2311 		ut_print_filename(stderr, name);
2312 		fprintf(stderr, "\n"
2313 			"InnoDB: in InnoDB data dictionary"
2314 			" has unknown type %lx.\n",
2315 			(ulong) flags);
2316 		return("incorrect flags in SYS_TABLES");
2317 	}
2318 
2319 	/* The high-order bit of N_COLS is the "compact format" flag.
2320 	For tables in that format, MIX_LEN may hold additional flags. */
2321 	if (n_cols & DICT_N_COLS_COMPACT) {
2322 		ut_ad(flags & DICT_TF_COMPACT);
2323 
2324 		if (flags2 & ~DICT_TF2_BIT_MASK) {
2325 			ut_print_timestamp(stderr);
2326 			fputs("  InnoDB: Warning: table ", stderr);
2327 			ut_print_filename(stderr, name);
2328 			fprintf(stderr, "\n"
2329 				"InnoDB: in InnoDB data dictionary"
2330 				" has unknown flags %lx.\n",
2331 				(ulong) flags2);
2332 
2333 			/* Clean it up and keep going */
2334 			flags2 &= DICT_TF2_BIT_MASK;
2335 		}
2336 	} else {
2337 		/* Do not trust the MIX_LEN field when the
2338 		row format is Redundant. */
2339 		flags2 = 0;
2340 	}
2341 
2342 	/* See if the tablespace is available. */
2343 	*table = dict_mem_table_create(
2344 		name, space, n_cols & ~DICT_N_COLS_COMPACT, flags, flags2);
2345 
2346 	field = rec_get_nth_field_old(rec, DICT_FLD__SYS_TABLES__ID, &len);
2347 	ut_ad(len == 8); /* this was checked earlier */
2348 
2349 	(*table)->id = mach_read_from_8(field);
2350 
2351 	(*table)->ibd_file_missing = FALSE;
2352 
2353 	return(NULL);
2354 }
2355 
2356 /********************************************************************//**
2357 Using the table->heap, copy the null-terminated filepath into
2358 table->data_dir_path and replace the 'databasename/tablename.ibd'
2359 portion with 'tablename'.
2360 This allows SHOW CREATE TABLE to return the correct DATA DIRECTORY path.
2361 Make this data directory path only if it has not yet been saved. */
2362 UNIV_INTERN
2363 void
dict_save_data_dir_path(dict_table_t * table,char * filepath)2364 dict_save_data_dir_path(
2365 /*====================*/
2366 	dict_table_t*	table,		/*!< in/out: table */
2367 	char*		filepath)	/*!< in: filepath of tablespace */
2368 {
2369 	ut_ad(mutex_own(&(dict_sys->mutex)));
2370 	ut_a(DICT_TF_HAS_DATA_DIR(table->flags));
2371 
2372 	ut_a(!table->data_dir_path);
2373 	ut_a(filepath);
2374 
2375 	/* Be sure this filepath is not the default filepath. */
2376 	char*	default_filepath = fil_make_ibd_name(table->name, false);
2377 	if (strcmp(filepath, default_filepath)) {
2378 		ulint pathlen = strlen(filepath);
2379 		ut_a(pathlen < OS_FILE_MAX_PATH);
2380 		ut_a(0 == strcmp(filepath + pathlen - 4, ".ibd"));
2381 
2382 		table->data_dir_path = mem_heap_strdup(table->heap, filepath);
2383 		os_file_make_data_dir_path(table->data_dir_path);
2384 	} else {
2385 		/* This does not change SYS_DATAFILES or SYS_TABLES
2386 		or FSP_FLAGS on the header page of the tablespace,
2387 		but it makes dict_table_t consistent */
2388 		table->flags &= ~DICT_TF_MASK_DATA_DIR;
2389 	}
2390 	mem_free(default_filepath);
2391 }
2392 
2393 /*****************************************************************//**
2394 Make sure the data_file_name is saved in dict_table_t if needed. Try to
2395 read it from the file dictionary first, then from SYS_DATAFILES. */
2396 UNIV_INTERN
2397 void
dict_get_and_save_data_dir_path(dict_table_t * table,bool dict_mutex_own)2398 dict_get_and_save_data_dir_path(
2399 /*============================*/
2400 	dict_table_t*	table,		/*!< in/out: table */
2401 	bool		dict_mutex_own)	/*!< in: true if dict_sys->mutex
2402 					is owned already */
2403 {
2404 	if (DICT_TF_HAS_DATA_DIR(table->flags)
2405 	    && (!table->data_dir_path)) {
2406 		char*	path = fil_space_get_first_path(table->space);
2407 
2408 		if (!dict_mutex_own) {
2409 			dict_mutex_enter_for_mysql();
2410 		}
2411 		if (!path) {
2412 			path = dict_get_first_path(
2413 				table->space, table->name);
2414 		}
2415 
2416 		if (path) {
2417 			dict_save_data_dir_path(table, path);
2418 			mem_free(path);
2419 		}
2420 
2421 		if (!dict_mutex_own) {
2422 			dict_mutex_exit_for_mysql();
2423 		}
2424 	}
2425 }
2426 
2427 /********************************************************************//**
2428 Loads a table definition and also all its index definitions, and also
2429 the cluster definition if the table is a member in a cluster. Also loads
2430 all foreign key constraints where the foreign key is in the table or where
2431 a foreign key references columns in this table. Adds all these to the data
2432 dictionary cache.
2433 @return table, NULL if does not exist; if the table is stored in an
2434 .ibd file, but the file does not exist, then we set the
2435 ibd_file_missing flag TRUE in the table object we return */
2436 UNIV_INTERN
2437 dict_table_t*
dict_load_table(const char * name,ibool cached,dict_err_ignore_t ignore_err)2438 dict_load_table(
2439 /*============*/
2440 	const char*	name,	/*!< in: table name in the
2441 				databasename/tablename format */
2442 	ibool		cached,	/*!< in: TRUE=add to cache, FALSE=do not */
2443 	dict_err_ignore_t ignore_err)
2444 				/*!< in: error to be ignored when loading
2445 				table and its indexes' definition */
2446 {
2447 	dberr_t		err;
2448 	dict_table_t*	table;
2449 	dict_table_t*	sys_tables;
2450 	btr_pcur_t	pcur;
2451 	dict_index_t*	sys_index;
2452 	dtuple_t*	tuple;
2453 	mem_heap_t*	heap;
2454 	dfield_t*	dfield;
2455 	const rec_t*	rec;
2456 	const byte*	field;
2457 	ulint		len;
2458 	char*		filepath = NULL;
2459 	const char*	err_msg;
2460 	mtr_t		mtr;
2461 
2462 	ut_ad(mutex_own(&(dict_sys->mutex)));
2463 
2464 	heap = mem_heap_create(32000);
2465 
2466 	mtr_start(&mtr);
2467 
2468 	sys_tables = dict_table_get_low("SYS_TABLES");
2469 	sys_index = UT_LIST_GET_FIRST(sys_tables->indexes);
2470 	ut_ad(!dict_table_is_comp(sys_tables));
2471 	ut_ad(name_of_col_is(sys_tables, sys_index,
2472 			     DICT_FLD__SYS_TABLES__ID, "ID"));
2473 	ut_ad(name_of_col_is(sys_tables, sys_index,
2474 			     DICT_FLD__SYS_TABLES__N_COLS, "N_COLS"));
2475 	ut_ad(name_of_col_is(sys_tables, sys_index,
2476 			     DICT_FLD__SYS_TABLES__TYPE, "TYPE"));
2477 	ut_ad(name_of_col_is(sys_tables, sys_index,
2478 			     DICT_FLD__SYS_TABLES__MIX_LEN, "MIX_LEN"));
2479 	ut_ad(name_of_col_is(sys_tables, sys_index,
2480 			     DICT_FLD__SYS_TABLES__SPACE, "SPACE"));
2481 
2482 	tuple = dtuple_create(heap, 1);
2483 	dfield = dtuple_get_nth_field(tuple, 0);
2484 
2485 	dfield_set_data(dfield, name, ut_strlen(name));
2486 	dict_index_copy_types(tuple, sys_index, 1);
2487 
2488 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
2489 				  BTR_SEARCH_LEAF, &pcur, &mtr);
2490 	rec = btr_pcur_get_rec(&pcur);
2491 
2492 	if (!btr_pcur_is_on_user_rec(&pcur)
2493 	    || rec_get_deleted_flag(rec, 0)) {
2494 		/* Not found */
2495 err_exit:
2496 		btr_pcur_close(&pcur);
2497 		mtr_commit(&mtr);
2498 		mem_heap_free(heap);
2499 
2500 		return(NULL);
2501 	}
2502 
2503 	field = rec_get_nth_field_old(
2504 		rec, DICT_FLD__SYS_TABLES__NAME, &len);
2505 
2506 	/* Check if the table name in record is the searched one */
2507 	if (len != ut_strlen(name) || ut_memcmp(name, field, len) != 0) {
2508 
2509 		goto err_exit;
2510 	}
2511 
2512 	err_msg = dict_load_table_low(name, rec, &table);
2513 
2514 	if (err_msg) {
2515 
2516 		ut_print_timestamp(stderr);
2517 		fprintf(stderr, "  InnoDB: %s\n", err_msg);
2518 		goto err_exit;
2519 	}
2520 
2521 	char	table_name[MAX_FULL_NAME_LEN + 1];
2522 
2523 	innobase_format_name(table_name, sizeof(table_name), name, FALSE);
2524 
2525 	btr_pcur_close(&pcur);
2526 	mtr_commit(&mtr);
2527 
2528 	if (table->space == 0) {
2529 		/* The system tablespace is always available. */
2530 	} else if (table->flags2 & DICT_TF2_DISCARDED) {
2531 
2532 		ib_logf(IB_LOG_LEVEL_WARN,
2533 			"Table '%s' tablespace is set as discarded.",
2534 			table_name);
2535 
2536 		table->ibd_file_missing = TRUE;
2537 
2538 	} else if (!fil_space_for_table_exists_in_mem(
2539 			table->space, name, FALSE, FALSE, true, heap,
2540 			table->id)) {
2541 
2542 		if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_TEMPORARY)) {
2543 			/* Do not bother to retry opening temporary tables. */
2544 			table->ibd_file_missing = TRUE;
2545 
2546 		} else {
2547 			if (!(ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK)) {
2548 				ib_logf(IB_LOG_LEVEL_ERROR,
2549 					"Failed to find tablespace for "
2550 					"table '%s' in the cache. "
2551 					"Attempting to load the tablespace "
2552 					"with space id %lu.",
2553 					table_name, (ulong) table->space);
2554 			}
2555 
2556 			/* Use the remote filepath if needed. */
2557 			if (DICT_TF_HAS_DATA_DIR(table->flags)) {
2558 				/* This needs to be added to the table
2559 				from SYS_DATAFILES */
2560 				dict_get_and_save_data_dir_path(table, true);
2561 
2562 				if (table->data_dir_path) {
2563 					filepath = os_file_make_remote_pathname(
2564 						table->data_dir_path,
2565 						table->name, "ibd");
2566 				}
2567 			}
2568 
2569 			/* Try to open the tablespace.  We set the
2570 			2nd param (fix_dict = false) here because we
2571 			do not have an x-lock on dict_operation_lock */
2572 			err = fil_open_single_table_tablespace(
2573 				true, false, table->space,
2574 				dict_tf_to_fsp_flags(table->flags),
2575 				name, filepath);
2576 
2577 			if (err != DB_SUCCESS) {
2578 				/* We failed to find a sensible
2579 				tablespace file */
2580 
2581 				table->ibd_file_missing = TRUE;
2582 			}
2583 			if (filepath) {
2584 				mem_free(filepath);
2585 			}
2586 		}
2587 	}
2588 
2589 	dict_load_columns(table, heap);
2590 
2591 	if (cached) {
2592 		dict_table_add_to_cache(table, TRUE, heap);
2593 	} else {
2594 		dict_table_add_system_columns(table, heap);
2595 	}
2596 
2597 	mem_heap_empty(heap);
2598 
2599 	/* If there is no tablespace for the table then we only need to
2600 	load the index definitions. So that we can IMPORT the tablespace
2601 	later. When recovering table locks for resurrected incomplete
2602 	transactions, the tablespace should exist, because DDL operations
2603 	were not allowed while the table is being locked by a transaction. */
2604 	dict_err_ignore_t index_load_err =
2605 		!(ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK)
2606 		&& table->ibd_file_missing
2607 		? DICT_ERR_IGNORE_ALL
2608 		: ignore_err;
2609 	err = dict_load_indexes(table, heap, index_load_err);
2610 
2611 	if (err == DB_INDEX_CORRUPT) {
2612 		/* Refuse to load the table if the table has a corrupted
2613 		cluster index */
2614 		if (!srv_load_corrupted) {
2615 			fprintf(stderr, "InnoDB: Error: Load table ");
2616 			ut_print_name(stderr, NULL, TRUE, table->name);
2617 			fprintf(stderr, " failed, the table has corrupted"
2618 					" clustered indexes. Turn on"
2619 					" 'innodb_force_load_corrupted'"
2620 					" to drop it\n");
2621 
2622 			dict_table_remove_from_cache(table);
2623 			table = NULL;
2624 			goto func_exit;
2625 		} else {
2626 			dict_index_t*	clust_index;
2627 			clust_index = dict_table_get_first_index(table);
2628 
2629 			if (dict_index_is_corrupted(clust_index)) {
2630 				table->corrupted = TRUE;
2631 			}
2632 		}
2633 	}
2634 
2635 	/* Initialize table foreign_child value. Its value could be
2636 	changed when dict_load_foreigns() is called below */
2637 	table->fk_max_recusive_level = 0;
2638 
2639 	/* If the force recovery flag is set, we open the table irrespective
2640 	of the error condition, since the user may want to dump data from the
2641 	clustered index. However we load the foreign key information only if
2642 	all indexes were loaded. */
2643 	if (!cached || table->ibd_file_missing) {
2644 		/* Don't attempt to load the indexes from disk. */
2645 	} else if (err == DB_SUCCESS) {
2646 		err = dict_load_foreigns(table->name, NULL, true, true,
2647 					 ignore_err);
2648 
2649 		if (err != DB_SUCCESS) {
2650 			ib_logf(IB_LOG_LEVEL_WARN,
2651 				"Load table '%s' failed, the table has missing "
2652 				"foreign key indexes. Turn off "
2653 				"'foreign_key_checks' and try again.",
2654 				table->name);
2655 
2656 			dict_table_remove_from_cache(table);
2657 			table = NULL;
2658 		} else {
2659 			table->fk_max_recusive_level = 0;
2660 		}
2661 	} else {
2662 		dict_index_t*   index;
2663 
2664 		/* Make sure that at least the clustered index was loaded.
2665 		Otherwise refuse to load the table */
2666 		index = dict_table_get_first_index(table);
2667 
2668 		if (!srv_force_recovery
2669 		    || !index
2670 		    || !dict_index_is_clust(index)) {
2671 
2672 			dict_table_remove_from_cache(table);
2673 			table = NULL;
2674 
2675 		} else if (dict_index_is_corrupted(index)
2676 			   && !table->ibd_file_missing) {
2677 
2678 			/* It is possible we force to load a corrupted
2679 			clustered index if srv_load_corrupted is set.
2680 			Mark the table as corrupted in this case */
2681 			table->corrupted = TRUE;
2682 		}
2683 	}
2684 
2685 func_exit:
2686 	mem_heap_free(heap);
2687 
2688 	ut_ad(!table
2689 	      || ignore_err != DICT_ERR_IGNORE_NONE
2690 	      || table->ibd_file_missing
2691 	      || !table->corrupted);
2692 
2693 	if (table && table->fts) {
2694 		if (!(dict_table_has_fts_index(table)
2695 		      || DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)
2696 		      || DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_ADD_DOC_ID))) {
2697 			/* the table->fts could be created in dict_load_column
2698 			when a user defined FTS_DOC_ID is present, but no
2699 			FTS */
2700 			fts_optimize_remove_table(table);
2701 			fts_free(table);
2702 		} else {
2703 			fts_optimize_add_table(table);
2704 		}
2705 	}
2706 
2707 	ut_ad(err != DB_SUCCESS || dict_foreign_set_validate(*table));
2708 
2709 	return(table);
2710 }
2711 
2712 /***********************************************************************//**
2713 Loads a table object based on the table id.
2714 @return	table; NULL if table does not exist */
2715 UNIV_INTERN
2716 dict_table_t*
dict_load_table_on_id(table_id_t table_id,dict_err_ignore_t ignore_err)2717 dict_load_table_on_id(
2718 /*==================*/
2719 	table_id_t		table_id,	/*!< in: table id */
2720 	dict_err_ignore_t	ignore_err)	/*!< in: errors to ignore
2721 						when loading the table */
2722 {
2723 	byte		id_buf[8];
2724 	btr_pcur_t	pcur;
2725 	mem_heap_t*	heap;
2726 	dtuple_t*	tuple;
2727 	dfield_t*	dfield;
2728 	dict_index_t*	sys_table_ids;
2729 	dict_table_t*	sys_tables;
2730 	const rec_t*	rec;
2731 	const byte*	field;
2732 	ulint		len;
2733 	dict_table_t*	table;
2734 	mtr_t		mtr;
2735 
2736 	ut_ad(mutex_own(&(dict_sys->mutex)));
2737 
2738 	table = NULL;
2739 
2740 	/* NOTE that the operation of this function is protected by
2741 	the dictionary mutex, and therefore no deadlocks can occur
2742 	with other dictionary operations. */
2743 
2744 	mtr_start(&mtr);
2745 	/*---------------------------------------------------*/
2746 	/* Get the secondary index based on ID for table SYS_TABLES */
2747 	sys_tables = dict_sys->sys_tables;
2748 	sys_table_ids = dict_table_get_next_index(
2749 		dict_table_get_first_index(sys_tables));
2750 	ut_ad(!dict_table_is_comp(sys_tables));
2751 	ut_ad(!dict_index_is_clust(sys_table_ids));
2752 	heap = mem_heap_create(256);
2753 
2754 	tuple  = dtuple_create(heap, 1);
2755 	dfield = dtuple_get_nth_field(tuple, 0);
2756 
2757 	/* Write the table id in byte format to id_buf */
2758 	mach_write_to_8(id_buf, table_id);
2759 
2760 	dfield_set_data(dfield, id_buf, 8);
2761 	dict_index_copy_types(tuple, sys_table_ids, 1);
2762 
2763 	btr_pcur_open_on_user_rec(sys_table_ids, tuple, PAGE_CUR_GE,
2764 				  BTR_SEARCH_LEAF, &pcur, &mtr);
2765 
2766 	rec = btr_pcur_get_rec(&pcur);
2767 
2768 	if (page_rec_is_user_rec(rec)) {
2769 		/*---------------------------------------------------*/
2770 		/* Now we have the record in the secondary index
2771 		containing the table ID and NAME */
2772 check_rec:
2773 		field = rec_get_nth_field_old(
2774 			rec, DICT_FLD__SYS_TABLE_IDS__ID, &len);
2775 		ut_ad(len == 8);
2776 
2777 		/* Check if the table id in record is the one searched for */
2778 		if (table_id == mach_read_from_8(field)) {
2779 			if (rec_get_deleted_flag(rec, 0)) {
2780 				/* Until purge has completed, there
2781 				may be delete-marked duplicate records
2782 				for the same SYS_TABLES.ID, but different
2783 				SYS_TABLES.NAME. */
2784 				while (btr_pcur_move_to_next(&pcur, &mtr)) {
2785 					rec = btr_pcur_get_rec(&pcur);
2786 
2787 					if (page_rec_is_user_rec(rec)) {
2788 						goto check_rec;
2789 					}
2790 				}
2791 			} else {
2792 				/* Now we get the table name from the record */
2793 				field = rec_get_nth_field_old(rec,
2794 					DICT_FLD__SYS_TABLE_IDS__NAME, &len);
2795 				/* Load the table definition to memory */
2796 				table = dict_load_table(
2797 					mem_heap_strdupl(
2798 						heap, (char*) field, len),
2799 					TRUE, ignore_err);
2800 			}
2801 		}
2802 	}
2803 
2804 	btr_pcur_close(&pcur);
2805 	mtr_commit(&mtr);
2806 	mem_heap_free(heap);
2807 
2808 	return(table);
2809 }
2810 
2811 /********************************************************************//**
2812 This function is called when the database is booted. Loads system table
2813 index definitions except for the clustered index which is added to the
2814 dictionary cache at booting before calling this function. */
2815 UNIV_INTERN
2816 void
dict_load_sys_table(dict_table_t * table)2817 dict_load_sys_table(
2818 /*================*/
2819 	dict_table_t*	table)	/*!< in: system table */
2820 {
2821 	mem_heap_t*	heap;
2822 
2823 	ut_ad(mutex_own(&(dict_sys->mutex)));
2824 
2825 	heap = mem_heap_create(1000);
2826 
2827 	dict_load_indexes(table, heap, DICT_ERR_IGNORE_NONE);
2828 
2829 	mem_heap_free(heap);
2830 }
2831 
2832 /********************************************************************//**
2833 Loads foreign key constraint col names (also for the referenced table).
2834 Members that must be set (and valid) in foreign:
2835 foreign->heap
2836 foreign->n_fields
2837 foreign->id ('\0'-terminated)
2838 Members that will be created and set by this function:
2839 foreign->foreign_col_names[i]
2840 foreign->referenced_col_names[i]
2841 (for i=0..foreign->n_fields-1) */
2842 static
2843 void
dict_load_foreign_cols(dict_foreign_t * foreign)2844 dict_load_foreign_cols(
2845 /*===================*/
2846 	dict_foreign_t*	foreign)/*!< in/out: foreign constraint object */
2847 {
2848 	dict_table_t*	sys_foreign_cols;
2849 	dict_index_t*	sys_index;
2850 	btr_pcur_t	pcur;
2851 	dtuple_t*	tuple;
2852 	dfield_t*	dfield;
2853 	const rec_t*	rec;
2854 	const byte*	field;
2855 	ulint		len;
2856 	ulint		i;
2857 	mtr_t		mtr;
2858 	size_t		id_len;
2859 
2860 	ut_ad(mutex_own(&(dict_sys->mutex)));
2861 
2862 	id_len = strlen(foreign->id);
2863 
2864 	foreign->foreign_col_names = static_cast<const char**>(
2865 		mem_heap_alloc(foreign->heap,
2866 			       foreign->n_fields * sizeof(void*)));
2867 
2868 	foreign->referenced_col_names = static_cast<const char**>(
2869 		mem_heap_alloc(foreign->heap,
2870 			       foreign->n_fields * sizeof(void*)));
2871 
2872 	mtr_start(&mtr);
2873 
2874 	sys_foreign_cols = dict_table_get_low("SYS_FOREIGN_COLS");
2875 
2876 	sys_index = UT_LIST_GET_FIRST(sys_foreign_cols->indexes);
2877 	ut_ad(!dict_table_is_comp(sys_foreign_cols));
2878 
2879 	tuple = dtuple_create(foreign->heap, 1);
2880 	dfield = dtuple_get_nth_field(tuple, 0);
2881 
2882 	dfield_set_data(dfield, foreign->id, id_len);
2883 	dict_index_copy_types(tuple, sys_index, 1);
2884 
2885 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
2886 				  BTR_SEARCH_LEAF, &pcur, &mtr);
2887 	for (i = 0; i < foreign->n_fields; i++) {
2888 
2889 		rec = btr_pcur_get_rec(&pcur);
2890 
2891 		ut_a(btr_pcur_is_on_user_rec(&pcur));
2892 		ut_a(!rec_get_deleted_flag(rec, 0));
2893 
2894 		field = rec_get_nth_field_old(
2895 			rec, DICT_FLD__SYS_FOREIGN_COLS__ID, &len);
2896 
2897 		if (len != id_len || ut_memcmp(foreign->id, field, len) != 0) {
2898 			const rec_t*	pos;
2899 			ulint		pos_len;
2900 			const rec_t*	for_col_name;
2901 			ulint		for_col_name_len;
2902 			const rec_t*	ref_col_name;
2903 			ulint		ref_col_name_len;
2904 
2905 			pos = rec_get_nth_field_old(
2906 				rec, DICT_FLD__SYS_FOREIGN_COLS__POS,
2907 				&pos_len);
2908 
2909 			for_col_name = rec_get_nth_field_old(
2910 				rec, DICT_FLD__SYS_FOREIGN_COLS__FOR_COL_NAME,
2911 				&for_col_name_len);
2912 
2913 			ref_col_name = rec_get_nth_field_old(
2914 				rec, DICT_FLD__SYS_FOREIGN_COLS__REF_COL_NAME,
2915 				&ref_col_name_len);
2916 
2917 			ib_logf(IB_LOG_LEVEL_ERROR,
2918 				"Unable to load columns names for foreign "
2919 				"key '%s' because it was not found in "
2920 				"InnoDB internal table SYS_FOREIGN_COLS. The "
2921 				"closest entry we found is: "
2922 				"(ID='%.*s', POS=%lu, FOR_COL_NAME='%.*s', "
2923 				"REF_COL_NAME='%.*s')",
2924 				foreign->id,
2925 				(int) len, field,
2926 				mach_read_from_4(pos),
2927 				(int) for_col_name_len, for_col_name,
2928 				(int) ref_col_name_len, ref_col_name);
2929 
2930 			ut_error;
2931 		}
2932 
2933 		field = rec_get_nth_field_old(
2934 			rec, DICT_FLD__SYS_FOREIGN_COLS__POS, &len);
2935 		ut_a(len == 4);
2936 		ut_a(i == mach_read_from_4(field));
2937 
2938 		field = rec_get_nth_field_old(
2939 			rec, DICT_FLD__SYS_FOREIGN_COLS__FOR_COL_NAME, &len);
2940 		foreign->foreign_col_names[i] = mem_heap_strdupl(
2941 			foreign->heap, (char*) field, len);
2942 
2943 		field = rec_get_nth_field_old(
2944 			rec, DICT_FLD__SYS_FOREIGN_COLS__REF_COL_NAME, &len);
2945 		foreign->referenced_col_names[i] = mem_heap_strdupl(
2946 			foreign->heap, (char*) field, len);
2947 
2948 		btr_pcur_move_to_next_user_rec(&pcur, &mtr);
2949 	}
2950 
2951 	btr_pcur_close(&pcur);
2952 	mtr_commit(&mtr);
2953 }
2954 
2955 /***********************************************************************//**
2956 Loads a foreign key constraint to the dictionary cache.
2957 @return	DB_SUCCESS or error code */
2958 static MY_ATTRIBUTE((nonnull(1), warn_unused_result))
2959 dberr_t
dict_load_foreign(const char * id,const char ** col_names,bool check_recursive,bool check_charsets,dict_err_ignore_t ignore_err)2960 dict_load_foreign(
2961 /*==============*/
2962 	const char*		id,
2963 				/*!< in: foreign constraint id, must be
2964 				'\0'-terminated */
2965 	const char**		col_names,
2966 				/*!< in: column names, or NULL
2967 				to use foreign->foreign_table->col_names */
2968 	bool			check_recursive,
2969 				/*!< in: whether to record the foreign table
2970 				parent count to avoid unlimited recursive
2971 				load of chained foreign tables */
2972 	bool			check_charsets,
2973 				/*!< in: whether to check charset
2974 				compatibility */
2975 	dict_err_ignore_t	ignore_err)
2976 				/*!< in: error to be ignored */
2977 {
2978 	dict_foreign_t*	foreign;
2979 	dict_table_t*	sys_foreign;
2980 	btr_pcur_t	pcur;
2981 	dict_index_t*	sys_index;
2982 	dtuple_t*	tuple;
2983 	mem_heap_t*	heap2;
2984 	dfield_t*	dfield;
2985 	const rec_t*	rec;
2986 	const byte*	field;
2987 	ulint		len;
2988 	ulint		n_fields_and_type;
2989 	mtr_t		mtr;
2990 	dict_table_t*	for_table;
2991 	dict_table_t*	ref_table;
2992 	size_t		id_len;
2993 
2994 	ut_ad(mutex_own(&(dict_sys->mutex)));
2995 
2996 	id_len = strlen(id);
2997 
2998 	heap2 = mem_heap_create(1000);
2999 
3000 	mtr_start(&mtr);
3001 
3002 	sys_foreign = dict_table_get_low("SYS_FOREIGN");
3003 
3004 	sys_index = UT_LIST_GET_FIRST(sys_foreign->indexes);
3005 	ut_ad(!dict_table_is_comp(sys_foreign));
3006 
3007 	tuple = dtuple_create(heap2, 1);
3008 	dfield = dtuple_get_nth_field(tuple, 0);
3009 
3010 	dfield_set_data(dfield, id, id_len);
3011 	dict_index_copy_types(tuple, sys_index, 1);
3012 
3013 	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
3014 				  BTR_SEARCH_LEAF, &pcur, &mtr);
3015 	rec = btr_pcur_get_rec(&pcur);
3016 
3017 	if (!btr_pcur_is_on_user_rec(&pcur)
3018 	    || rec_get_deleted_flag(rec, 0)) {
3019 		/* Not found */
3020 
3021 		fprintf(stderr,
3022 			"InnoDB: Error: cannot load foreign constraint "
3023 			"%s: could not find the relevant record in "
3024 			"SYS_FOREIGN\n", id);
3025 
3026 		btr_pcur_close(&pcur);
3027 		mtr_commit(&mtr);
3028 		mem_heap_free(heap2);
3029 
3030 		return(DB_ERROR);
3031 	}
3032 
3033 	field = rec_get_nth_field_old(rec, DICT_FLD__SYS_FOREIGN__ID, &len);
3034 
3035 	/* Check if the id in record is the searched one */
3036 	if (len != id_len || ut_memcmp(id, field, len) != 0) {
3037 
3038 		fprintf(stderr,
3039 			"InnoDB: Error: cannot load foreign constraint "
3040 			"%s: found %.*s instead in SYS_FOREIGN\n",
3041 			id, (int) len, field);
3042 
3043 		btr_pcur_close(&pcur);
3044 		mtr_commit(&mtr);
3045 		mem_heap_free(heap2);
3046 
3047 		return(DB_ERROR);
3048 	}
3049 
3050 	/* Read the table names and the number of columns associated
3051 	with the constraint */
3052 
3053 	mem_heap_free(heap2);
3054 
3055 	foreign = dict_mem_foreign_create();
3056 
3057 	n_fields_and_type = mach_read_from_4(
3058 		rec_get_nth_field_old(
3059 			rec, DICT_FLD__SYS_FOREIGN__N_COLS, &len));
3060 
3061 	ut_a(len == 4);
3062 
3063 	/* We store the type in the bits 24..29 of n_fields_and_type. */
3064 
3065 	foreign->type = (unsigned int) (n_fields_and_type >> 24);
3066 	foreign->n_fields = (unsigned int) (n_fields_and_type & 0x3FFUL);
3067 
3068 	foreign->id = mem_heap_strdupl(foreign->heap, id, id_len);
3069 
3070 	field = rec_get_nth_field_old(
3071 		rec, DICT_FLD__SYS_FOREIGN__FOR_NAME, &len);
3072 
3073 	foreign->foreign_table_name = mem_heap_strdupl(
3074 		foreign->heap, (char*) field, len);
3075 	dict_mem_foreign_table_name_lookup_set(foreign, TRUE);
3076 
3077 	field = rec_get_nth_field_old(
3078 		rec, DICT_FLD__SYS_FOREIGN__REF_NAME, &len);
3079 	foreign->referenced_table_name = mem_heap_strdupl(
3080 		foreign->heap, (char*) field, len);
3081 	dict_mem_referenced_table_name_lookup_set(foreign, TRUE);
3082 
3083 	btr_pcur_close(&pcur);
3084 	mtr_commit(&mtr);
3085 
3086 	dict_load_foreign_cols(foreign);
3087 
3088 	ref_table = dict_table_check_if_in_cache_low(
3089 			foreign->referenced_table_name_lookup);
3090 
3091 	/* We could possibly wind up in a deep recursive calls if
3092 	we call dict_table_get_low() again here if there
3093 	is a chain of tables concatenated together with
3094 	foreign constraints. In such case, each table is
3095 	both a parent and child of the other tables, and
3096 	act as a "link" in such table chains.
3097 	To avoid such scenario, we would need to check the
3098 	number of ancesters the current table has. If that
3099 	exceeds DICT_FK_MAX_CHAIN_LEN, we will stop loading
3100 	the child table.
3101 	Foreign constraints are loaded in a Breath First fashion,
3102 	that is, the index on FOR_NAME is scanned first, and then
3103 	index on REF_NAME. So foreign constrains in which
3104 	current table is a child (foreign table) are loaded first,
3105 	and then those constraints where current table is a
3106 	parent (referenced) table.
3107 	Thus we could check the parent (ref_table) table's
3108 	reference count (fk_max_recusive_level) to know how deep the
3109 	recursive call is. If the parent table (ref_table) is already
3110 	loaded, and its fk_max_recusive_level is larger than
3111 	DICT_FK_MAX_CHAIN_LEN, we will stop the recursive loading
3112 	by skipping loading the child table. It will not affect foreign
3113 	constraint check for DMLs since child table will be loaded
3114 	at that time for the constraint check. */
3115 	if (!ref_table
3116 	    || ref_table->fk_max_recusive_level < DICT_FK_MAX_RECURSIVE_LOAD) {
3117 
3118 		/* If the foreign table is not yet in the dictionary cache, we
3119 		have to load it so that we are able to make type comparisons
3120 		in the next function call. */
3121 
3122 		for_table = dict_table_get_low(foreign->foreign_table_name_lookup);
3123 
3124 		if (for_table && ref_table && check_recursive) {
3125 			/* This is to record the longest chain of ancesters
3126 			this table has, if the parent has more ancesters
3127 			than this table has, record it after add 1 (for this
3128 			parent */
3129 			if (ref_table->fk_max_recusive_level
3130 			    >= for_table->fk_max_recusive_level) {
3131 				for_table->fk_max_recusive_level =
3132 					 ref_table->fk_max_recusive_level + 1;
3133 			}
3134 		}
3135 	}
3136 
3137 	/* Note that there may already be a foreign constraint object in
3138 	the dictionary cache for this constraint: then the following
3139 	call only sets the pointers in it to point to the appropriate table
3140 	and index objects and frees the newly created object foreign.
3141 	Adding to the cache should always succeed since we are not creating
3142 	a new foreign key constraint but loading one from the data
3143 	dictionary. */
3144 
3145 	return(dict_foreign_add_to_cache(foreign, col_names, check_charsets,
3146 					 ignore_err));
3147 }
3148 
3149 /***********************************************************************//**
3150 Loads foreign key constraints where the table is either the foreign key
3151 holder or where the table is referenced by a foreign key. Adds these
3152 constraints to the data dictionary. Note that we know that the dictionary
3153 cache already contains all constraints where the other relevant table is
3154 already in the dictionary cache.
3155 @return	DB_SUCCESS or error code */
3156 UNIV_INTERN
3157 dberr_t
dict_load_foreigns(const char * table_name,const char ** col_names,bool check_recursive,bool check_charsets,dict_err_ignore_t ignore_err)3158 dict_load_foreigns(
3159 /*===============*/
3160 	const char*		table_name,	/*!< in: table name */
3161 	const char**		col_names,	/*!< in: column names, or NULL
3162 						to use table->col_names */
3163 	bool			check_recursive,/*!< in: Whether to check
3164 						recursive load of tables
3165 						chained by FK */
3166 	bool			check_charsets,	/*!< in: whether to check
3167 						charset compatibility */
3168 	dict_err_ignore_t	ignore_err)	/*!< in: error to be ignored */
3169 {
3170 	ulint		tuple_buf[(DTUPLE_EST_ALLOC(1) + sizeof(ulint) - 1)
3171 				/ sizeof(ulint)];
3172 	btr_pcur_t	pcur;
3173 	dtuple_t*	tuple;
3174 	dfield_t*	dfield;
3175 	dict_index_t*	sec_index;
3176 	dict_table_t*	sys_foreign;
3177 	const rec_t*	rec;
3178 	const byte*	field;
3179 	ulint		len;
3180 	dberr_t		err;
3181 	mtr_t		mtr;
3182 
3183 	ut_ad(mutex_own(&(dict_sys->mutex)));
3184 
3185 	sys_foreign = dict_table_get_low("SYS_FOREIGN");
3186 
3187 	if (sys_foreign == NULL) {
3188 		/* No foreign keys defined yet in this database */
3189 
3190 		fprintf(stderr,
3191 			"InnoDB: Error: no foreign key system tables"
3192 			" in the database\n");
3193 
3194 		return(DB_ERROR);
3195 	}
3196 
3197 	ut_ad(!dict_table_is_comp(sys_foreign));
3198 	mtr_start(&mtr);
3199 
3200 	/* Get the secondary index based on FOR_NAME from table
3201 	SYS_FOREIGN */
3202 
3203 	sec_index = dict_table_get_next_index(
3204 		dict_table_get_first_index(sys_foreign));
3205 	ut_ad(!dict_index_is_clust(sec_index));
3206 start_load:
3207 
3208 	tuple = dtuple_create_from_mem(tuple_buf, sizeof(tuple_buf), 1);
3209 	dfield = dtuple_get_nth_field(tuple, 0);
3210 
3211 	dfield_set_data(dfield, table_name, ut_strlen(table_name));
3212 	dict_index_copy_types(tuple, sec_index, 1);
3213 
3214 	btr_pcur_open_on_user_rec(sec_index, tuple, PAGE_CUR_GE,
3215 				  BTR_SEARCH_LEAF, &pcur, &mtr);
3216 loop:
3217 	rec = btr_pcur_get_rec(&pcur);
3218 
3219 	if (!btr_pcur_is_on_user_rec(&pcur)) {
3220 		/* End of index */
3221 
3222 		goto load_next_index;
3223 	}
3224 
3225 	/* Now we have the record in the secondary index containing a table
3226 	name and a foreign constraint ID */
3227 
3228 	field = rec_get_nth_field_old(
3229 		rec, DICT_FLD__SYS_FOREIGN_FOR_NAME__NAME, &len);
3230 
3231 	/* Check if the table name in the record is the one searched for; the
3232 	following call does the comparison in the latin1_swedish_ci
3233 	charset-collation, in a case-insensitive way. */
3234 
3235 	if (0 != cmp_data_data(dfield_get_type(dfield)->mtype,
3236 			       dfield_get_type(dfield)->prtype,
3237 			       static_cast<const byte*>(
3238 				       dfield_get_data(dfield)),
3239 			       dfield_get_len(dfield),
3240 			       field, len)) {
3241 
3242 		goto load_next_index;
3243 	}
3244 
3245 	/* Since table names in SYS_FOREIGN are stored in a case-insensitive
3246 	order, we have to check that the table name matches also in a binary
3247 	string comparison. On Unix, MySQL allows table names that only differ
3248 	in character case.  If lower_case_table_names=2 then what is stored
3249 	may not be the same case, but the previous comparison showed that they
3250 	match with no-case.  */
3251 
3252 	if (rec_get_deleted_flag(rec, 0)) {
3253 		goto next_rec;
3254 	}
3255 
3256 	if ((innobase_get_lower_case_table_names() != 2)
3257 	    && (0 != ut_memcmp(field, table_name, len))) {
3258 		goto next_rec;
3259 	}
3260 
3261 	/* Now we get a foreign key constraint id */
3262 	field = rec_get_nth_field_old(
3263 		rec, DICT_FLD__SYS_FOREIGN_FOR_NAME__ID, &len);
3264 
3265 	/* Copy the string because the page may be modified or evicted
3266 	after mtr_commit() below. */
3267 	char	fk_id[MAX_TABLE_NAME_LEN + 1];
3268 
3269 	ut_a(len <= MAX_TABLE_NAME_LEN);
3270 	memcpy(fk_id, field, len);
3271 	fk_id[len] = '\0';
3272 
3273 	btr_pcur_store_position(&pcur, &mtr);
3274 
3275 	mtr_commit(&mtr);
3276 
3277 	/* Load the foreign constraint definition to the dictionary cache */
3278 
3279 	err = dict_load_foreign(fk_id, col_names,
3280 				check_recursive, check_charsets, ignore_err);
3281 
3282 	if (err != DB_SUCCESS) {
3283 		btr_pcur_close(&pcur);
3284 
3285 		return(err);
3286 	}
3287 
3288 	mtr_start(&mtr);
3289 
3290 	btr_pcur_restore_position(BTR_SEARCH_LEAF, &pcur, &mtr);
3291 next_rec:
3292 	btr_pcur_move_to_next_user_rec(&pcur, &mtr);
3293 
3294 	goto loop;
3295 
3296 load_next_index:
3297 	btr_pcur_close(&pcur);
3298 	mtr_commit(&mtr);
3299 
3300 	sec_index = dict_table_get_next_index(sec_index);
3301 
3302 	if (sec_index != NULL) {
3303 
3304 		mtr_start(&mtr);
3305 
3306 		/* Switch to scan index on REF_NAME, fk_max_recusive_level
3307 		already been updated when scanning FOR_NAME index, no need to
3308 		update again */
3309 		check_recursive = FALSE;
3310 
3311 		goto start_load;
3312 	}
3313 
3314 	return(DB_SUCCESS);
3315 }
3316