1 /*****************************************************************************
2 
3 Copyright (c) 1996, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /**************************************************//**
28 @file dict/dict0crea.cc
29 Database object creation
30 
31 Created 1/8/1996 Heikki Tuuri
32 *******************************************************/
33 
34 #include "dict0crea.h"
35 
36 #ifdef UNIV_NONINL
37 #include "dict0crea.ic"
38 #endif
39 
40 #include "btr0pcur.h"
41 #include "btr0btr.h"
42 #include "page0page.h"
43 #include "mach0data.h"
44 #include "dict0boot.h"
45 #include "dict0dict.h"
46 #include "que0que.h"
47 #include "row0ins.h"
48 #include "row0mysql.h"
49 #include "row0sel.h"
50 #include "pars0pars.h"
51 #include "trx0roll.h"
52 #include "usr0sess.h"
53 #include "ut0vec.h"
54 #include "dict0priv.h"
55 #include "fts0priv.h"
56 #include "ha_prototypes.h"
57 
58 /*****************************************************************//**
59 Based on a table object, this function builds the entry to be inserted
60 in the SYS_TABLES system table.
61 @return	the tuple which should be inserted */
62 static
63 dtuple_t*
dict_create_sys_tables_tuple(const dict_table_t * table,mem_heap_t * heap)64 dict_create_sys_tables_tuple(
65 /*=========================*/
66 	const dict_table_t*	table,	/*!< in: table */
67 	mem_heap_t*		heap)	/*!< in: memory heap from
68 					which the memory for the built
69 					tuple is allocated */
70 {
71 	dict_table_t*	sys_tables;
72 	dtuple_t*	entry;
73 	dfield_t*	dfield;
74 	byte*		ptr;
75 	ulint		type;
76 
77 	ut_ad(table);
78 	ut_ad(heap);
79 
80 	sys_tables = dict_sys->sys_tables;
81 
82 	entry = dtuple_create(heap, 8 + DATA_N_SYS_COLS);
83 
84 	dict_table_copy_types(entry, sys_tables);
85 
86 	/* 0: NAME -----------------------------*/
87 	dfield = dtuple_get_nth_field(
88 		entry, DICT_COL__SYS_TABLES__NAME);
89 
90 	dfield_set_data(dfield, table->name, ut_strlen(table->name));
91 
92 	/* 1: DB_TRX_ID added later */
93 	/* 2: DB_ROLL_PTR added later */
94 	/* 3: ID -------------------------------*/
95 	dfield = dtuple_get_nth_field(
96 		entry, DICT_COL__SYS_TABLES__ID);
97 
98 	ptr = static_cast<byte*>(mem_heap_alloc(heap, 8));
99 	mach_write_to_8(ptr, table->id);
100 
101 	dfield_set_data(dfield, ptr, 8);
102 
103 	/* 4: N_COLS ---------------------------*/
104 	dfield = dtuple_get_nth_field(
105 		entry, DICT_COL__SYS_TABLES__N_COLS);
106 
107 	ptr = static_cast<byte*>(mem_heap_alloc(heap, 4));
108 	mach_write_to_4(ptr, table->n_def
109 			| ((table->flags & DICT_TF_COMPACT) << 31));
110 	dfield_set_data(dfield, ptr, 4);
111 
112 	/* 5: TYPE (table flags) -----------------------------*/
113 	dfield = dtuple_get_nth_field(
114 		entry, DICT_COL__SYS_TABLES__TYPE);
115 
116 	ptr = static_cast<byte*>(mem_heap_alloc(heap, 4));
117 
118 	/* Validate the table flags and convert them to what is saved in
119 	SYS_TABLES.TYPE.  Table flag values 0 and 1 are both written to
120 	SYS_TABLES.TYPE as 1. */
121 	type = dict_tf_to_sys_tables_type(table->flags);
122 	mach_write_to_4(ptr, type);
123 
124 	dfield_set_data(dfield, ptr, 4);
125 
126 	/* 6: MIX_ID (obsolete) ---------------------------*/
127 	dfield = dtuple_get_nth_field(
128 		entry, DICT_COL__SYS_TABLES__MIX_ID);
129 
130 	ptr = static_cast<byte*>(mem_heap_zalloc(heap, 8));
131 
132 	dfield_set_data(dfield, ptr, 8);
133 
134 	/* 7: MIX_LEN (additional flags) --------------------------*/
135 	dfield = dtuple_get_nth_field(
136 		entry, DICT_COL__SYS_TABLES__MIX_LEN);
137 
138 	ptr = static_cast<byte*>(mem_heap_alloc(heap, 4));
139 	/* Be sure all non-used bits are zero. */
140 	ut_a(!(table->flags2 & ~DICT_TF2_BIT_MASK));
141 	mach_write_to_4(ptr, table->flags2);
142 
143 	dfield_set_data(dfield, ptr, 4);
144 
145 	/* 8: CLUSTER_NAME ---------------------*/
146 	dfield = dtuple_get_nth_field(
147 		entry, DICT_COL__SYS_TABLES__CLUSTER_ID);
148 	dfield_set_null(dfield); /* not supported */
149 
150 	/* 9: SPACE ----------------------------*/
151 	dfield = dtuple_get_nth_field(
152 		entry, DICT_COL__SYS_TABLES__SPACE);
153 
154 	ptr = static_cast<byte*>(mem_heap_alloc(heap, 4));
155 	mach_write_to_4(ptr, table->space);
156 
157 	dfield_set_data(dfield, ptr, 4);
158 	/*----------------------------------*/
159 
160 	return(entry);
161 }
162 
163 /*****************************************************************//**
164 Based on a table object, this function builds the entry to be inserted
165 in the SYS_COLUMNS system table.
166 @return	the tuple which should be inserted */
167 static
168 dtuple_t*
dict_create_sys_columns_tuple(const dict_table_t * table,ulint i,mem_heap_t * heap)169 dict_create_sys_columns_tuple(
170 /*==========================*/
171 	const dict_table_t*	table,	/*!< in: table */
172 	ulint			i,	/*!< in: column number */
173 	mem_heap_t*		heap)	/*!< in: memory heap from
174 					which the memory for the built
175 					tuple is allocated */
176 {
177 	dict_table_t*		sys_columns;
178 	dtuple_t*		entry;
179 	const dict_col_t*	column;
180 	dfield_t*		dfield;
181 	byte*			ptr;
182 	const char*		col_name;
183 
184 	ut_ad(table);
185 	ut_ad(heap);
186 
187 	column = dict_table_get_nth_col(table, i);
188 
189 	sys_columns = dict_sys->sys_columns;
190 
191 	entry = dtuple_create(heap, 7 + DATA_N_SYS_COLS);
192 
193 	dict_table_copy_types(entry, sys_columns);
194 
195 	/* 0: TABLE_ID -----------------------*/
196 	dfield = dtuple_get_nth_field(entry, DICT_COL__SYS_COLUMNS__TABLE_ID);
197 
198 	ptr = static_cast<byte*>(mem_heap_alloc(heap, 8));
199 	mach_write_to_8(ptr, table->id);
200 
201 	dfield_set_data(dfield, ptr, 8);
202 
203 	/* 1: POS ----------------------------*/
204 	dfield = dtuple_get_nth_field(entry, DICT_COL__SYS_COLUMNS__POS);
205 
206 	ptr = static_cast<byte*>(mem_heap_alloc(heap, 4));
207 	mach_write_to_4(ptr, i);
208 
209 	dfield_set_data(dfield, ptr, 4);
210 
211 	/* 2: DB_TRX_ID added later */
212 	/* 3: DB_ROLL_PTR added later */
213 	/* 4: NAME ---------------------------*/
214 	dfield = dtuple_get_nth_field(entry, DICT_COL__SYS_COLUMNS__NAME);
215 
216 	col_name = dict_table_get_col_name(table, i);
217 	dfield_set_data(dfield, col_name, ut_strlen(col_name));
218 
219 	/* 5: MTYPE --------------------------*/
220 	dfield = dtuple_get_nth_field(entry, DICT_COL__SYS_COLUMNS__MTYPE);
221 
222 	ptr = static_cast<byte*>(mem_heap_alloc(heap, 4));
223 	mach_write_to_4(ptr, column->mtype);
224 
225 	dfield_set_data(dfield, ptr, 4);
226 
227 	/* 6: PRTYPE -------------------------*/
228 	dfield = dtuple_get_nth_field(entry, DICT_COL__SYS_COLUMNS__PRTYPE);
229 
230 	ptr = static_cast<byte*>(mem_heap_alloc(heap, 4));
231 	mach_write_to_4(ptr, column->prtype);
232 
233 	dfield_set_data(dfield, ptr, 4);
234 
235 	/* 7: LEN ----------------------------*/
236 	dfield = dtuple_get_nth_field(entry, DICT_COL__SYS_COLUMNS__LEN);
237 
238 	ptr = static_cast<byte*>(mem_heap_alloc(heap, 4));
239 	mach_write_to_4(ptr, column->len);
240 
241 	dfield_set_data(dfield, ptr, 4);
242 
243 	/* 8: PREC ---------------------------*/
244 	dfield = dtuple_get_nth_field(entry, DICT_COL__SYS_COLUMNS__PREC);
245 
246 	ptr = static_cast<byte*>(mem_heap_alloc(heap, 4));
247 	mach_write_to_4(ptr, 0/* unused */);
248 
249 	dfield_set_data(dfield, ptr, 4);
250 	/*---------------------------------*/
251 
252 	return(entry);
253 }
254 
255 /***************************************************************//**
256 Builds a table definition to insert.
257 @return	DB_SUCCESS or error code */
258 static MY_ATTRIBUTE((nonnull, warn_unused_result))
259 dberr_t
dict_build_table_def_step(que_thr_t * thr,tab_node_t * node)260 dict_build_table_def_step(
261 /*======================*/
262 	que_thr_t*	thr,	/*!< in: query thread */
263 	tab_node_t*	node)	/*!< in: table create node */
264 {
265 	dict_table_t*	table;
266 	dtuple_t*	row;
267 	dberr_t		error;
268 	const char*	path;
269 	mtr_t		mtr;
270 	ulint		space = 0;
271 	bool		use_tablespace;
272 
273 	ut_ad(mutex_own(&(dict_sys->mutex)));
274 
275 	table = node->table;
276 	use_tablespace = DICT_TF2_FLAG_IS_SET(table, DICT_TF2_USE_TABLESPACE);
277 
278 	dict_hdr_get_new_id(&table->id, NULL, NULL);
279 
280 	thr_get_trx(thr)->table_id = table->id;
281 
282         /* Always set this bit for all new created tables */
283 	DICT_TF2_FLAG_SET(table, DICT_TF2_FTS_AUX_HEX_NAME);
284 	DBUG_EXECUTE_IF("innodb_test_wrong_fts_aux_table_name",
285 			DICT_TF2_FLAG_UNSET(table,
286 					    DICT_TF2_FTS_AUX_HEX_NAME););
287 
288 	if (use_tablespace) {
289 		/* This table will not use the system tablespace.
290 		Get a new space id. */
291 		dict_hdr_get_new_id(NULL, NULL, &space);
292 
293 		DBUG_EXECUTE_IF(
294 			"ib_create_table_fail_out_of_space_ids",
295 			space = ULINT_UNDEFINED;
296 		);
297 
298 		if (UNIV_UNLIKELY(space == ULINT_UNDEFINED)) {
299 			return(DB_ERROR);
300 		}
301 
302 		/* We create a new single-table tablespace for the table.
303 		We initially let it be 4 pages:
304 		- page 0 is the fsp header and an extent descriptor page,
305 		- page 1 is an ibuf bitmap page,
306 		- page 2 is the first inode page,
307 		- page 3 will contain the root of the clustered index of the
308 		table we create here. */
309 
310 		path = table->data_dir_path ? table->data_dir_path
311 					    : table->dir_path_of_temp_table;
312 
313 		ut_ad(dict_table_get_format(table) <= UNIV_FORMAT_MAX);
314 		ut_ad(!dict_table_zip_size(table)
315 		      || dict_table_get_format(table) >= UNIV_FORMAT_B);
316 
317 		error = fil_create_new_single_table_tablespace(
318 			space, table->name, path,
319 			dict_tf_to_fsp_flags(table->flags),
320 			table->flags2,
321 			FIL_IBD_FILE_INITIAL_SIZE);
322 
323 		table->space = (unsigned int) space;
324 
325 		if (error != DB_SUCCESS) {
326 
327 			return(error);
328 		}
329 
330 		mtr_start(&mtr);
331 
332 		fsp_header_init(table->space, FIL_IBD_FILE_INITIAL_SIZE, &mtr);
333 
334 		mtr_commit(&mtr);
335 	} else {
336 		/* Create in the system tablespace: disallow Barracuda
337 		features by keeping only the first bit which says whether
338 		the row format is redundant or compact */
339 		table->flags &= DICT_TF_COMPACT;
340 	}
341 
342 	row = dict_create_sys_tables_tuple(table, node->heap);
343 
344 	ins_node_set_new_row(node->tab_def, row);
345 
346 	return(DB_SUCCESS);
347 }
348 
349 /***************************************************************//**
350 Builds a column definition to insert. */
351 static
352 void
dict_build_col_def_step(tab_node_t * node)353 dict_build_col_def_step(
354 /*====================*/
355 	tab_node_t*	node)	/*!< in: table create node */
356 {
357 	dtuple_t*	row;
358 
359 	row = dict_create_sys_columns_tuple(node->table, node->col_no,
360 					    node->heap);
361 	ins_node_set_new_row(node->col_def, row);
362 }
363 
364 /*****************************************************************//**
365 Based on an index object, this function builds the entry to be inserted
366 in the SYS_INDEXES system table.
367 @return	the tuple which should be inserted */
368 static
369 dtuple_t*
dict_create_sys_indexes_tuple(const dict_index_t * index,mem_heap_t * heap)370 dict_create_sys_indexes_tuple(
371 /*==========================*/
372 	const dict_index_t*	index,	/*!< in: index */
373 	mem_heap_t*		heap)	/*!< in: memory heap from
374 					which the memory for the built
375 					tuple is allocated */
376 {
377 	dict_table_t*	sys_indexes;
378 	dict_table_t*	table;
379 	dtuple_t*	entry;
380 	dfield_t*	dfield;
381 	byte*		ptr;
382 
383 	ut_ad(mutex_own(&(dict_sys->mutex)));
384 	ut_ad(index);
385 	ut_ad(heap);
386 
387 	sys_indexes = dict_sys->sys_indexes;
388 
389 	table = dict_table_get_low(index->table_name);
390 
391 	entry = dtuple_create(heap, 7 + DATA_N_SYS_COLS);
392 
393 	dict_table_copy_types(entry, sys_indexes);
394 
395 	/* 0: TABLE_ID -----------------------*/
396 	dfield = dtuple_get_nth_field(
397 		entry, DICT_COL__SYS_INDEXES__TABLE_ID);
398 
399 	ptr = static_cast<byte*>(mem_heap_alloc(heap, 8));
400 	mach_write_to_8(ptr, table->id);
401 
402 	dfield_set_data(dfield, ptr, 8);
403 
404 	/* 1: ID ----------------------------*/
405 	dfield = dtuple_get_nth_field(
406 		entry, DICT_COL__SYS_INDEXES__ID);
407 
408 	ptr = static_cast<byte*>(mem_heap_alloc(heap, 8));
409 	mach_write_to_8(ptr, index->id);
410 
411 	dfield_set_data(dfield, ptr, 8);
412 
413 	/* 2: DB_TRX_ID added later */
414 	/* 3: DB_ROLL_PTR added later */
415 	/* 4: NAME --------------------------*/
416 	dfield = dtuple_get_nth_field(
417 		entry, DICT_COL__SYS_INDEXES__NAME);
418 
419 	dfield_set_data(dfield, index->name, ut_strlen(index->name));
420 
421 	/* 5: N_FIELDS ----------------------*/
422 	dfield = dtuple_get_nth_field(
423 		entry, DICT_COL__SYS_INDEXES__N_FIELDS);
424 
425 	ptr = static_cast<byte*>(mem_heap_alloc(heap, 4));
426 	mach_write_to_4(ptr, index->n_fields);
427 
428 	dfield_set_data(dfield, ptr, 4);
429 
430 	/* 6: TYPE --------------------------*/
431 	dfield = dtuple_get_nth_field(
432 		entry, DICT_COL__SYS_INDEXES__TYPE);
433 
434 	ptr = static_cast<byte*>(mem_heap_alloc(heap, 4));
435 	mach_write_to_4(ptr, index->type);
436 
437 	dfield_set_data(dfield, ptr, 4);
438 
439 	/* 7: SPACE --------------------------*/
440 
441 	dfield = dtuple_get_nth_field(
442 		entry, DICT_COL__SYS_INDEXES__SPACE);
443 
444 	ptr = static_cast<byte*>(mem_heap_alloc(heap, 4));
445 	mach_write_to_4(ptr, index->space);
446 
447 	dfield_set_data(dfield, ptr, 4);
448 
449 	/* 8: PAGE_NO --------------------------*/
450 
451 	dfield = dtuple_get_nth_field(
452 		entry, DICT_COL__SYS_INDEXES__PAGE_NO);
453 
454 	ptr = static_cast<byte*>(mem_heap_alloc(heap, 4));
455 	mach_write_to_4(ptr, FIL_NULL);
456 
457 	dfield_set_data(dfield, ptr, 4);
458 
459 	/*--------------------------------*/
460 
461 	return(entry);
462 }
463 
464 /*****************************************************************//**
465 Based on an index object, this function builds the entry to be inserted
466 in the SYS_FIELDS system table.
467 @return	the tuple which should be inserted */
468 static
469 dtuple_t*
dict_create_sys_fields_tuple(const dict_index_t * index,ulint fld_no,mem_heap_t * heap)470 dict_create_sys_fields_tuple(
471 /*=========================*/
472 	const dict_index_t*	index,	/*!< in: index */
473 	ulint			fld_no,	/*!< in: field number */
474 	mem_heap_t*		heap)	/*!< in: memory heap from
475 					which the memory for the built
476 					tuple is allocated */
477 {
478 	dict_table_t*	sys_fields;
479 	dtuple_t*	entry;
480 	dict_field_t*	field;
481 	dfield_t*	dfield;
482 	byte*		ptr;
483 	ibool		index_contains_column_prefix_field	= FALSE;
484 	ulint		j;
485 
486 	ut_ad(index);
487 	ut_ad(heap);
488 
489 	for (j = 0; j < index->n_fields; j++) {
490 		if (dict_index_get_nth_field(index, j)->prefix_len > 0) {
491 			index_contains_column_prefix_field = TRUE;
492 			break;
493 		}
494 	}
495 
496 	field = dict_index_get_nth_field(index, fld_no);
497 
498 	sys_fields = dict_sys->sys_fields;
499 
500 	entry = dtuple_create(heap, 3 + DATA_N_SYS_COLS);
501 
502 	dict_table_copy_types(entry, sys_fields);
503 
504 	/* 0: INDEX_ID -----------------------*/
505 	dfield = dtuple_get_nth_field(entry, DICT_COL__SYS_FIELDS__INDEX_ID);
506 
507 	ptr = static_cast<byte*>(mem_heap_alloc(heap, 8));
508 	mach_write_to_8(ptr, index->id);
509 
510 	dfield_set_data(dfield, ptr, 8);
511 
512 	/* 1: POS; FIELD NUMBER & PREFIX LENGTH -----------------------*/
513 
514 	dfield = dtuple_get_nth_field(entry, DICT_COL__SYS_FIELDS__POS);
515 
516 	ptr = static_cast<byte*>(mem_heap_alloc(heap, 4));
517 
518 	if (index_contains_column_prefix_field) {
519 		/* If there are column prefix fields in the index, then
520 		we store the number of the field to the 2 HIGH bytes
521 		and the prefix length to the 2 low bytes, */
522 
523 		mach_write_to_4(ptr, (fld_no << 16) + field->prefix_len);
524 	} else {
525 		/* Else we store the number of the field to the 2 LOW bytes.
526 		This is to keep the storage format compatible with
527 		InnoDB versions < 4.0.14. */
528 
529 		mach_write_to_4(ptr, fld_no);
530 	}
531 
532 	dfield_set_data(dfield, ptr, 4);
533 
534 	/* 2: DB_TRX_ID added later */
535 	/* 3: DB_ROLL_PTR added later */
536 	/* 4: COL_NAME -------------------------*/
537 	dfield = dtuple_get_nth_field(entry, DICT_COL__SYS_FIELDS__COL_NAME);
538 
539 	dfield_set_data(dfield, field->name,
540 			ut_strlen(field->name));
541 	/*---------------------------------*/
542 
543 	return(entry);
544 }
545 
546 /*****************************************************************//**
547 Creates the tuple with which the index entry is searched for writing the index
548 tree root page number, if such a tree is created.
549 @return	the tuple for search */
550 static
551 dtuple_t*
dict_create_search_tuple(const dtuple_t * tuple,mem_heap_t * heap)552 dict_create_search_tuple(
553 /*=====================*/
554 	const dtuple_t*	tuple,	/*!< in: the tuple inserted in the SYS_INDEXES
555 				table */
556 	mem_heap_t*	heap)	/*!< in: memory heap from which the memory for
557 				the built tuple is allocated */
558 {
559 	dtuple_t*	search_tuple;
560 	const dfield_t*	field1;
561 	dfield_t*	field2;
562 
563 	ut_ad(tuple && heap);
564 
565 	search_tuple = dtuple_create(heap, 2);
566 
567 	field1 = dtuple_get_nth_field(tuple, 0);
568 	field2 = dtuple_get_nth_field(search_tuple, 0);
569 
570 	dfield_copy(field2, field1);
571 
572 	field1 = dtuple_get_nth_field(tuple, 1);
573 	field2 = dtuple_get_nth_field(search_tuple, 1);
574 
575 	dfield_copy(field2, field1);
576 
577 	ut_ad(dtuple_validate(search_tuple));
578 
579 	return(search_tuple);
580 }
581 
582 /***************************************************************//**
583 Builds an index definition row to insert.
584 @return	DB_SUCCESS or error code */
585 static MY_ATTRIBUTE((nonnull, warn_unused_result))
586 dberr_t
dict_build_index_def_step(que_thr_t * thr,ind_node_t * node)587 dict_build_index_def_step(
588 /*======================*/
589 	que_thr_t*	thr,	/*!< in: query thread */
590 	ind_node_t*	node)	/*!< in: index create node */
591 {
592 	dict_table_t*	table;
593 	dict_index_t*	index;
594 	dtuple_t*	row;
595 	trx_t*		trx;
596 
597 	ut_ad(mutex_own(&(dict_sys->mutex)));
598 
599 	trx = thr_get_trx(thr);
600 
601 	index = node->index;
602 
603 	table = dict_table_get_low(index->table_name);
604 
605 	if (table == NULL) {
606 		return(DB_TABLE_NOT_FOUND);
607 	}
608 
609 	if (!trx->table_id) {
610 		/* Record only the first table id. */
611 		trx->table_id = table->id;
612 	}
613 
614 	node->table = table;
615 
616 	ut_ad((UT_LIST_GET_LEN(table->indexes) > 0)
617 	      || dict_index_is_clust(index));
618 
619 	dict_hdr_get_new_id(NULL, &index->id, NULL);
620 
621 	/* Inherit the space id from the table; we store all indexes of a
622 	table in the same tablespace */
623 
624 	index->space = table->space;
625 	node->page_no = FIL_NULL;
626 	row = dict_create_sys_indexes_tuple(index, node->heap);
627 	node->ind_row = row;
628 
629 	ins_node_set_new_row(node->ind_def, row);
630 
631 	/* Note that the index was created by this transaction. */
632 	index->trx_id = trx->id;
633 	ut_ad(table->def_trx_id <= trx->id);
634 	table->def_trx_id = trx->id;
635 
636 	return(DB_SUCCESS);
637 }
638 
639 /***************************************************************//**
640 Builds a field definition row to insert. */
641 static
642 void
dict_build_field_def_step(ind_node_t * node)643 dict_build_field_def_step(
644 /*======================*/
645 	ind_node_t*	node)	/*!< in: index create node */
646 {
647 	dict_index_t*	index;
648 	dtuple_t*	row;
649 
650 	index = node->index;
651 
652 	row = dict_create_sys_fields_tuple(index, node->field_no, node->heap);
653 
654 	ins_node_set_new_row(node->field_def, row);
655 }
656 
657 /***************************************************************//**
658 Creates an index tree for the index if it is not a member of a cluster.
659 @return	DB_SUCCESS or DB_OUT_OF_FILE_SPACE */
660 static MY_ATTRIBUTE((nonnull, warn_unused_result))
661 dberr_t
dict_create_index_tree_step(ind_node_t * node)662 dict_create_index_tree_step(
663 /*========================*/
664 	ind_node_t*	node)	/*!< in: index create node */
665 {
666 	dict_index_t*	index;
667 	dict_table_t*	sys_indexes;
668 	dtuple_t*	search_tuple;
669 	btr_pcur_t	pcur;
670 	mtr_t		mtr;
671 
672 	ut_ad(mutex_own(&(dict_sys->mutex)));
673 
674 	index = node->index;
675 
676 	sys_indexes = dict_sys->sys_indexes;
677 
678 	if (index->type == DICT_FTS) {
679 		/* FTS index does not need an index tree */
680 		return(DB_SUCCESS);
681 	}
682 
683 	/* Run a mini-transaction in which the index tree is allocated for
684 	the index and its root address is written to the index entry in
685 	sys_indexes */
686 
687 	mtr_start(&mtr);
688 
689 	search_tuple = dict_create_search_tuple(node->ind_row, node->heap);
690 
691 	btr_pcur_open(UT_LIST_GET_FIRST(sys_indexes->indexes),
692 		      search_tuple, PAGE_CUR_L, BTR_MODIFY_LEAF,
693 		      &pcur, &mtr);
694 
695 	btr_pcur_move_to_next_user_rec(&pcur, &mtr);
696 
697 
698 	dberr_t		err = DB_SUCCESS;
699 	ulint		zip_size = dict_table_zip_size(index->table);
700 
701 	if (node->index->table->ibd_file_missing
702 	    || dict_table_is_discarded(node->index->table)) {
703 
704 		node->page_no = FIL_NULL;
705 	} else {
706 		node->page_no = btr_create(
707 			index->type, index->space, zip_size,
708 			index->id, index, &mtr);
709 
710 		if (node->page_no == FIL_NULL) {
711 			err = DB_OUT_OF_FILE_SPACE;
712 		}
713 
714 		DBUG_EXECUTE_IF("ib_import_create_index_failure_1",
715 				node->page_no = FIL_NULL;
716 				err = DB_OUT_OF_FILE_SPACE; );
717 	}
718 
719 	page_rec_write_field(
720 		btr_pcur_get_rec(&pcur), DICT_FLD__SYS_INDEXES__PAGE_NO,
721 		node->page_no, &mtr);
722 
723 	btr_pcur_close(&pcur);
724 
725 	mtr_commit(&mtr);
726 
727 	return(err);
728 }
729 
730 /*******************************************************************//**
731 Drops the index tree associated with a row in SYS_INDEXES table. */
732 UNIV_INTERN
733 void
dict_drop_index_tree(rec_t * rec,mtr_t * mtr)734 dict_drop_index_tree(
735 /*=================*/
736 	rec_t*	rec,	/*!< in/out: record in the clustered index
737 			of SYS_INDEXES table */
738 	mtr_t*	mtr)	/*!< in: mtr having the latch on the record page */
739 {
740 	ulint		root_page_no;
741 	ulint		space;
742 	ulint		zip_size;
743 	const byte*	ptr;
744 	ulint		len;
745 
746 	ut_ad(mutex_own(&(dict_sys->mutex)));
747 	ut_a(!dict_table_is_comp(dict_sys->sys_indexes));
748 	ptr = rec_get_nth_field_old(
749 		rec, DICT_FLD__SYS_INDEXES__PAGE_NO, &len);
750 
751 	ut_ad(len == 4);
752 
753 	root_page_no = mtr_read_ulint(ptr, MLOG_4BYTES, mtr);
754 
755 	if (root_page_no == FIL_NULL) {
756 		/* The tree has already been freed */
757 
758 		return;
759 	}
760 
761 	ptr = rec_get_nth_field_old(
762 		rec, DICT_FLD__SYS_INDEXES__SPACE, &len);
763 
764 	ut_ad(len == 4);
765 
766 	space = mtr_read_ulint(ptr, MLOG_4BYTES, mtr);
767 	zip_size = fil_space_get_zip_size(space);
768 
769 	if (UNIV_UNLIKELY(zip_size == ULINT_UNDEFINED)) {
770 		/* It is a single table tablespace and the .ibd file is
771 		missing: do nothing */
772 
773 		return;
774 	}
775 
776 	/* We free all the pages but the root page first; this operation
777 	may span several mini-transactions */
778 
779 	btr_free_but_not_root(space, zip_size, root_page_no);
780 
781 	/* Then we free the root page in the same mini-transaction where
782 	we write FIL_NULL to the appropriate field in the SYS_INDEXES
783 	record: this mini-transaction marks the B-tree totally freed */
784 
785 	/* printf("Dropping index tree in space %lu root page %lu\n", space,
786 	root_page_no); */
787 	btr_free_root(space, zip_size, root_page_no, mtr);
788 
789 	page_rec_write_field(rec, DICT_FLD__SYS_INDEXES__PAGE_NO,
790 			     FIL_NULL, mtr);
791 }
792 
793 /*******************************************************************//**
794 Truncates the index tree associated with a row in SYS_INDEXES table.
795 @return	new root page number, or FIL_NULL on failure */
796 UNIV_INTERN
797 ulint
dict_truncate_index_tree(dict_table_t * table,ulint space,btr_pcur_t * pcur,mtr_t * mtr)798 dict_truncate_index_tree(
799 /*=====================*/
800 	dict_table_t*	table,	/*!< in: the table the index belongs to */
801 	ulint		space,	/*!< in: 0=truncate,
802 				nonzero=create the index tree in the
803 				given tablespace */
804 	btr_pcur_t*	pcur,	/*!< in/out: persistent cursor pointing to
805 				record in the clustered index of
806 				SYS_INDEXES table. The cursor may be
807 				repositioned in this call. */
808 	mtr_t*		mtr)	/*!< in: mtr having the latch
809 				on the record page. The mtr may be
810 				committed and restarted in this call. */
811 {
812 	ulint		root_page_no;
813 	ibool		drop = !space;
814 	ulint		zip_size;
815 	ulint		type;
816 	index_id_t	index_id;
817 	rec_t*		rec;
818 	const byte*	ptr;
819 	ulint		len;
820 	dict_index_t*	index;
821 	bool		has_been_dropped = false;
822 
823 	ut_ad(mutex_own(&(dict_sys->mutex)));
824 	ut_a(!dict_table_is_comp(dict_sys->sys_indexes));
825 	rec = btr_pcur_get_rec(pcur);
826 	ptr = rec_get_nth_field_old(
827 		rec, DICT_FLD__SYS_INDEXES__PAGE_NO, &len);
828 
829 	ut_ad(len == 4);
830 
831 	root_page_no = mtr_read_ulint(ptr, MLOG_4BYTES, mtr);
832 
833 	if (drop && root_page_no == FIL_NULL) {
834 		has_been_dropped = true;
835 		drop = FALSE;
836 	}
837 
838 	ptr = rec_get_nth_field_old(
839 		rec, DICT_FLD__SYS_INDEXES__SPACE, &len);
840 
841 	ut_ad(len == 4);
842 
843 	if (drop) {
844 		space = mtr_read_ulint(ptr, MLOG_4BYTES, mtr);
845 	}
846 
847 	zip_size = fil_space_get_zip_size(space);
848 
849 	if (UNIV_UNLIKELY(zip_size == ULINT_UNDEFINED)) {
850 		/* It is a single table tablespace and the .ibd file is
851 		missing: do nothing */
852 
853 		ut_print_timestamp(stderr);
854 		fprintf(stderr, "  InnoDB: Trying to TRUNCATE"
855 			" a missing .ibd file of table %s!\n", table->name);
856 		return(FIL_NULL);
857 	}
858 
859 	ptr = rec_get_nth_field_old(
860 		rec, DICT_FLD__SYS_INDEXES__TYPE, &len);
861 	ut_ad(len == 4);
862 	type = mach_read_from_4(ptr);
863 
864 	ptr = rec_get_nth_field_old(rec, DICT_FLD__SYS_INDEXES__ID, &len);
865 	ut_ad(len == 8);
866 	index_id = mach_read_from_8(ptr);
867 
868 	if (!drop) {
869 
870 		goto create;
871 	}
872 
873 	/* We free all the pages but the root page first; this operation
874 	may span several mini-transactions */
875 
876 	btr_free_but_not_root(space, zip_size, root_page_no);
877 
878 	/* Then we free the root page in the same mini-transaction where
879 	we create the b-tree and write its new root page number to the
880 	appropriate field in the SYS_INDEXES record: this mini-transaction
881 	marks the B-tree totally truncated */
882 
883 	btr_block_get(space, zip_size, root_page_no, RW_X_LATCH, NULL, mtr);
884 
885 	btr_free_root(space, zip_size, root_page_no, mtr);
886 create:
887 	/* We will temporarily write FIL_NULL to the PAGE_NO field
888 	in SYS_INDEXES, so that the database will not get into an
889 	inconsistent state in case it crashes between the mtr_commit()
890 	below and the following mtr_commit() call. */
891 	page_rec_write_field(rec, DICT_FLD__SYS_INDEXES__PAGE_NO,
892 			     FIL_NULL, mtr);
893 
894 	/* We will need to commit the mini-transaction in order to avoid
895 	deadlocks in the btr_create() call, because otherwise we would
896 	be freeing and allocating pages in the same mini-transaction. */
897 	btr_pcur_store_position(pcur, mtr);
898 	mtr_commit(mtr);
899 
900 	mtr_start(mtr);
901 	btr_pcur_restore_position(BTR_MODIFY_LEAF, pcur, mtr);
902 
903 	/* Find the index corresponding to this SYS_INDEXES record. */
904 	for (index = UT_LIST_GET_FIRST(table->indexes);
905 	     index;
906 	     index = UT_LIST_GET_NEXT(indexes, index)) {
907 		if (index->id == index_id) {
908 			if (index->type & DICT_FTS) {
909 				return(FIL_NULL);
910 			} else {
911 				if (has_been_dropped) {
912 					fprintf(stderr,	"  InnoDB: Trying to"
913 						" TRUNCATE a missing index of"
914 						" table %s!\n",
915 						index->table->name);
916 				}
917 
918 				root_page_no = btr_create(type, space, zip_size,
919 							  index_id, index, mtr);
920 				index->page = (unsigned int) root_page_no;
921 				return(root_page_no);
922 			}
923 		}
924 	}
925 
926 	ut_print_timestamp(stderr);
927 	fprintf(stderr,
928 		"  InnoDB: Index %llu of table %s is missing\n"
929 		"InnoDB: from the data dictionary during TRUNCATE!\n",
930 		(ullint) index_id,
931 		table->name);
932 
933 	return(FIL_NULL);
934 }
935 
936 /*********************************************************************//**
937 Creates a table create graph.
938 @return	own: table create node */
939 UNIV_INTERN
940 tab_node_t*
tab_create_graph_create(dict_table_t * table,mem_heap_t * heap,bool commit)941 tab_create_graph_create(
942 /*====================*/
943 	dict_table_t*	table,	/*!< in: table to create, built as a memory data
944 				structure */
945 	mem_heap_t*	heap,	/*!< in: heap where created */
946 	bool		commit)	/*!< in: true if the commit node should be
947 				added to the query graph */
948 {
949 	tab_node_t*	node;
950 
951 	node = static_cast<tab_node_t*>(
952 		mem_heap_alloc(heap, sizeof(tab_node_t)));
953 
954 	node->common.type = QUE_NODE_CREATE_TABLE;
955 
956 	node->table = table;
957 
958 	node->state = TABLE_BUILD_TABLE_DEF;
959 	node->heap = mem_heap_create(256);
960 
961 	node->tab_def = ins_node_create(INS_DIRECT, dict_sys->sys_tables,
962 					heap);
963 	node->tab_def->common.parent = node;
964 
965 	node->col_def = ins_node_create(INS_DIRECT, dict_sys->sys_columns,
966 					heap);
967 	node->col_def->common.parent = node;
968 
969 	if (commit) {
970 		node->commit_node = trx_commit_node_create(heap);
971 		node->commit_node->common.parent = node;
972 	} else {
973 		node->commit_node = 0;
974 	}
975 
976 	return(node);
977 }
978 
979 /*********************************************************************//**
980 Creates an index create graph.
981 @return	own: index create node */
982 UNIV_INTERN
983 ind_node_t*
ind_create_graph_create(dict_index_t * index,mem_heap_t * heap,bool commit)984 ind_create_graph_create(
985 /*====================*/
986 	dict_index_t*	index,	/*!< in: index to create, built as a memory data
987 				structure */
988 	mem_heap_t*	heap,	/*!< in: heap where created */
989 	bool		commit)	/*!< in: true if the commit node should be
990 				added to the query graph */
991 {
992 	ind_node_t*	node;
993 
994 	node = static_cast<ind_node_t*>(
995 		mem_heap_alloc(heap, sizeof(ind_node_t)));
996 
997 	node->common.type = QUE_NODE_CREATE_INDEX;
998 
999 	node->index = index;
1000 
1001 	node->state = INDEX_BUILD_INDEX_DEF;
1002 	node->page_no = FIL_NULL;
1003 	node->heap = mem_heap_create(256);
1004 
1005 	node->ind_def = ins_node_create(INS_DIRECT,
1006 					dict_sys->sys_indexes, heap);
1007 	node->ind_def->common.parent = node;
1008 
1009 	node->field_def = ins_node_create(INS_DIRECT,
1010 					  dict_sys->sys_fields, heap);
1011 	node->field_def->common.parent = node;
1012 
1013 	if (commit) {
1014 		node->commit_node = trx_commit_node_create(heap);
1015 		node->commit_node->common.parent = node;
1016 	} else {
1017 		node->commit_node = 0;
1018 	}
1019 
1020 	return(node);
1021 }
1022 
1023 /***********************************************************//**
1024 Creates a table. This is a high-level function used in SQL execution graphs.
1025 @return	query thread to run next or NULL */
1026 UNIV_INTERN
1027 que_thr_t*
dict_create_table_step(que_thr_t * thr)1028 dict_create_table_step(
1029 /*===================*/
1030 	que_thr_t*	thr)	/*!< in: query thread */
1031 {
1032 	tab_node_t*	node;
1033 	dberr_t		err	= DB_ERROR;
1034 	trx_t*		trx;
1035 
1036 	ut_ad(thr);
1037 	ut_ad(mutex_own(&(dict_sys->mutex)));
1038 
1039 	trx = thr_get_trx(thr);
1040 
1041 	node = static_cast<tab_node_t*>(thr->run_node);
1042 
1043 	ut_ad(que_node_get_type(node) == QUE_NODE_CREATE_TABLE);
1044 
1045 	if (thr->prev_node == que_node_get_parent(node)) {
1046 		node->state = TABLE_BUILD_TABLE_DEF;
1047 	}
1048 
1049 	if (node->state == TABLE_BUILD_TABLE_DEF) {
1050 
1051 		/* DO THE CHECKS OF THE CONSISTENCY CONSTRAINTS HERE */
1052 
1053 		err = dict_build_table_def_step(thr, node);
1054 
1055 		if (err != DB_SUCCESS) {
1056 
1057 			goto function_exit;
1058 		}
1059 
1060 		node->state = TABLE_BUILD_COL_DEF;
1061 		node->col_no = 0;
1062 
1063 		thr->run_node = node->tab_def;
1064 
1065 		return(thr);
1066 	}
1067 
1068 	if (node->state == TABLE_BUILD_COL_DEF) {
1069 
1070 		if (node->col_no < (node->table)->n_def) {
1071 
1072 			dict_build_col_def_step(node);
1073 
1074 			node->col_no++;
1075 
1076 			thr->run_node = node->col_def;
1077 
1078 			return(thr);
1079 		} else {
1080 			node->state = TABLE_COMMIT_WORK;
1081 		}
1082 	}
1083 
1084 	if (node->state == TABLE_COMMIT_WORK) {
1085 
1086 		/* Table was correctly defined: do NOT commit the transaction
1087 		(CREATE TABLE does NOT do an implicit commit of the current
1088 		transaction) */
1089 
1090 		node->state = TABLE_ADD_TO_CACHE;
1091 
1092 		/* thr->run_node = node->commit_node;
1093 
1094 		return(thr); */
1095 	}
1096 
1097 	if (node->state == TABLE_ADD_TO_CACHE) {
1098 
1099 		dict_table_add_to_cache(node->table, TRUE, node->heap);
1100 
1101 		err = DB_SUCCESS;
1102 	}
1103 
1104 function_exit:
1105 	trx->error_state = err;
1106 
1107 	if (err == DB_SUCCESS) {
1108 		/* Ok: do nothing */
1109 
1110 	} else if (err == DB_LOCK_WAIT) {
1111 
1112 		return(NULL);
1113 	} else {
1114 		/* SQL error detected */
1115 
1116 		return(NULL);
1117 	}
1118 
1119 	thr->run_node = que_node_get_parent(node);
1120 
1121 	return(thr);
1122 }
1123 
1124 /***********************************************************//**
1125 Creates an index. This is a high-level function used in SQL execution
1126 graphs.
1127 @return	query thread to run next or NULL */
1128 UNIV_INTERN
1129 que_thr_t*
dict_create_index_step(que_thr_t * thr)1130 dict_create_index_step(
1131 /*===================*/
1132 	que_thr_t*	thr)	/*!< in: query thread */
1133 {
1134 	ind_node_t*	node;
1135 	dberr_t		err	= DB_ERROR;
1136 	trx_t*		trx;
1137 
1138 	ut_ad(thr);
1139 	ut_ad(mutex_own(&(dict_sys->mutex)));
1140 
1141 	trx = thr_get_trx(thr);
1142 
1143 	node = static_cast<ind_node_t*>(thr->run_node);
1144 
1145 	ut_ad(que_node_get_type(node) == QUE_NODE_CREATE_INDEX);
1146 
1147 	if (thr->prev_node == que_node_get_parent(node)) {
1148 		node->state = INDEX_BUILD_INDEX_DEF;
1149 	}
1150 
1151 	if (node->state == INDEX_BUILD_INDEX_DEF) {
1152 		/* DO THE CHECKS OF THE CONSISTENCY CONSTRAINTS HERE */
1153 		err = dict_build_index_def_step(thr, node);
1154 
1155 		if (err != DB_SUCCESS) {
1156 
1157 			goto function_exit;
1158 		}
1159 
1160 		node->state = INDEX_BUILD_FIELD_DEF;
1161 		node->field_no = 0;
1162 
1163 		thr->run_node = node->ind_def;
1164 
1165 		return(thr);
1166 	}
1167 
1168 	if (node->state == INDEX_BUILD_FIELD_DEF) {
1169 
1170 		if (node->field_no < (node->index)->n_fields) {
1171 
1172 			dict_build_field_def_step(node);
1173 
1174 			node->field_no++;
1175 
1176 			thr->run_node = node->field_def;
1177 
1178 			return(thr);
1179 		} else {
1180 			node->state = INDEX_ADD_TO_CACHE;
1181 		}
1182 	}
1183 
1184 	if (node->state == INDEX_ADD_TO_CACHE) {
1185 
1186 		index_id_t	index_id = node->index->id;
1187 
1188 		err = dict_index_add_to_cache(
1189 			node->table, node->index, FIL_NULL,
1190 			trx_is_strict(trx)
1191 			|| dict_table_get_format(node->table)
1192 			>= UNIV_FORMAT_B);
1193 
1194 		node->index = dict_index_get_if_in_cache_low(index_id);
1195 		ut_a((node->index == 0) == (err != DB_SUCCESS));
1196 
1197 		if (err != DB_SUCCESS) {
1198 
1199 			goto function_exit;
1200 		}
1201 
1202 		node->state = INDEX_CREATE_INDEX_TREE;
1203 	}
1204 
1205 	if (node->state == INDEX_CREATE_INDEX_TREE) {
1206 
1207 		err = dict_create_index_tree_step(node);
1208 
1209 		DBUG_EXECUTE_IF("ib_dict_create_index_tree_fail",
1210 				err = DB_OUT_OF_MEMORY;);
1211 
1212 		if (err != DB_SUCCESS) {
1213 			/* If this is a FTS index, we will need to remove
1214 			it from fts->cache->indexes list as well */
1215 			if ((node->index->type & DICT_FTS)
1216 			    && node->table->fts) {
1217 				fts_index_cache_t*	index_cache;
1218 
1219 				rw_lock_x_lock(
1220 					&node->table->fts->cache->init_lock);
1221 
1222 				index_cache = (fts_index_cache_t*)
1223 					 fts_find_index_cache(
1224 						node->table->fts->cache,
1225 						node->index);
1226 
1227 				if (index_cache->words) {
1228 					rbt_free(index_cache->words);
1229 					index_cache->words = 0;
1230 				}
1231 
1232 				ib_vector_remove(
1233 					node->table->fts->cache->indexes,
1234 					*reinterpret_cast<void**>(index_cache));
1235 
1236 				rw_lock_x_unlock(
1237 					&node->table->fts->cache->init_lock);
1238 			}
1239 
1240 			dict_index_remove_from_cache(node->table, node->index);
1241 			node->index = NULL;
1242 
1243 			goto function_exit;
1244 		}
1245 
1246 		node->index->page = node->page_no;
1247 		/* These should have been set in
1248 		dict_build_index_def_step() and
1249 		dict_index_add_to_cache(). */
1250 		ut_ad(node->index->trx_id == trx->id);
1251 		ut_ad(node->index->table->def_trx_id == trx->id);
1252 		node->state = INDEX_COMMIT_WORK;
1253 	}
1254 
1255 	if (node->state == INDEX_COMMIT_WORK) {
1256 
1257 		/* Index was correctly defined: do NOT commit the transaction
1258 		(CREATE INDEX does NOT currently do an implicit commit of
1259 		the current transaction) */
1260 
1261 		node->state = INDEX_CREATE_INDEX_TREE;
1262 
1263 		/* thr->run_node = node->commit_node;
1264 
1265 		return(thr); */
1266 	}
1267 
1268 function_exit:
1269 	trx->error_state = err;
1270 
1271 	if (err == DB_SUCCESS) {
1272 		/* Ok: do nothing */
1273 
1274 	} else if (err == DB_LOCK_WAIT) {
1275 
1276 		return(NULL);
1277 	} else {
1278 		/* SQL error detected */
1279 
1280 		return(NULL);
1281 	}
1282 
1283 	thr->run_node = que_node_get_parent(node);
1284 
1285 	return(thr);
1286 }
1287 
1288 /****************************************************************//**
1289 Check whether a system table exists.  Additionally, if it exists,
1290 move it to the non-LRU end of the table LRU list.  This is oly used
1291 for system tables that can be upgraded or added to an older database,
1292 which include SYS_FOREIGN, SYS_FOREIGN_COLS, SYS_TABLESPACES and
1293 SYS_DATAFILES.
1294 @return DB_SUCCESS if the sys table exists, DB_CORRUPTION if it exists
1295 but is not current, DB_TABLE_NOT_FOUND if it does not exist*/
1296 static
1297 dberr_t
dict_check_if_system_table_exists(const char * tablename,ulint num_fields,ulint num_indexes)1298 dict_check_if_system_table_exists(
1299 /*==============================*/
1300 	const char*	tablename,	/*!< in: name of table */
1301 	ulint		num_fields,	/*!< in: number of fields */
1302 	ulint		num_indexes)	/*!< in: number of indexes */
1303 {
1304 	dict_table_t*	sys_table;
1305 	dberr_t		error = DB_SUCCESS;
1306 
1307 	ut_a(srv_get_active_thread_type() == SRV_NONE);
1308 
1309 	mutex_enter(&dict_sys->mutex);
1310 
1311 	sys_table = dict_table_get_low(tablename);
1312 
1313 	if (sys_table == NULL) {
1314 		error = DB_TABLE_NOT_FOUND;
1315 
1316 	} else if (UT_LIST_GET_LEN(sys_table->indexes) != num_indexes
1317 		   || sys_table->n_cols != num_fields) {
1318 		error = DB_CORRUPTION;
1319 
1320 	} else {
1321 		/* This table has already been created, and it is OK.
1322 		Ensure that it can't be evicted from the table LRU cache. */
1323 
1324 		dict_table_move_from_lru_to_non_lru(sys_table);
1325 	}
1326 
1327 	mutex_exit(&dict_sys->mutex);
1328 
1329 	return(error);
1330 }
1331 
1332 /****************************************************************//**
1333 Creates the foreign key constraints system tables inside InnoDB
1334 at server bootstrap or server start if they are not found or are
1335 not of the right form.
1336 @return	DB_SUCCESS or error code */
1337 UNIV_INTERN
1338 dberr_t
dict_create_or_check_foreign_constraint_tables(void)1339 dict_create_or_check_foreign_constraint_tables(void)
1340 /*================================================*/
1341 {
1342 	trx_t*		trx;
1343 	my_bool		srv_file_per_table_backup;
1344 	dberr_t		err;
1345 	dberr_t		sys_foreign_err;
1346 	dberr_t		sys_foreign_cols_err;
1347 
1348 	ut_a(srv_get_active_thread_type() == SRV_NONE);
1349 
1350 	/* Note: The master thread has not been started at this point. */
1351 
1352 
1353 	sys_foreign_err = dict_check_if_system_table_exists(
1354 		"SYS_FOREIGN", DICT_NUM_FIELDS__SYS_FOREIGN + 1, 3);
1355 	sys_foreign_cols_err = dict_check_if_system_table_exists(
1356 		"SYS_FOREIGN_COLS", DICT_NUM_FIELDS__SYS_FOREIGN_COLS + 1, 1);
1357 
1358 	if (sys_foreign_err == DB_SUCCESS
1359 	    && sys_foreign_cols_err == DB_SUCCESS) {
1360 		return(DB_SUCCESS);
1361 	}
1362 
1363 	trx = trx_allocate_for_mysql();
1364 
1365 	trx_set_dict_operation(trx, TRX_DICT_OP_TABLE);
1366 
1367 	trx->op_info = "creating foreign key sys tables";
1368 
1369 	row_mysql_lock_data_dictionary(trx);
1370 
1371 	/* Check which incomplete table definition to drop. */
1372 
1373 	if (sys_foreign_err == DB_CORRUPTION) {
1374 		ib_logf(IB_LOG_LEVEL_WARN,
1375 			"Dropping incompletely created "
1376 			"SYS_FOREIGN table.");
1377 		row_drop_table_for_mysql("SYS_FOREIGN", trx, TRUE);
1378 	}
1379 
1380 	if (sys_foreign_cols_err == DB_CORRUPTION) {
1381 		ib_logf(IB_LOG_LEVEL_WARN,
1382 			"Dropping incompletely created "
1383 			"SYS_FOREIGN_COLS table.");
1384 
1385 		row_drop_table_for_mysql("SYS_FOREIGN_COLS", trx, TRUE);
1386 	}
1387 
1388 	ib_logf(IB_LOG_LEVEL_WARN,
1389 		"Creating foreign key constraint system tables.");
1390 
1391 	/* NOTE: in dict_load_foreigns we use the fact that
1392 	there are 2 secondary indexes on SYS_FOREIGN, and they
1393 	are defined just like below */
1394 
1395 	/* NOTE: when designing InnoDB's foreign key support in 2001, we made
1396 	an error and made the table names and the foreign key id of type
1397 	'CHAR' (internally, really a VARCHAR). We should have made the type
1398 	VARBINARY, like in other InnoDB system tables, to get a clean
1399 	design. */
1400 
1401 	srv_file_per_table_backup = srv_file_per_table;
1402 
1403 	/* We always want SYSTEM tables to be created inside the system
1404 	tablespace. */
1405 
1406 	srv_file_per_table = 0;
1407 
1408 	err = que_eval_sql(
1409 		NULL,
1410 		"PROCEDURE CREATE_FOREIGN_SYS_TABLES_PROC () IS\n"
1411 		"BEGIN\n"
1412 		"CREATE TABLE\n"
1413 		"SYS_FOREIGN(ID CHAR, FOR_NAME CHAR,"
1414 		" REF_NAME CHAR, N_COLS INT);\n"
1415 		"CREATE UNIQUE CLUSTERED INDEX ID_IND"
1416 		" ON SYS_FOREIGN (ID);\n"
1417 		"CREATE INDEX FOR_IND"
1418 		" ON SYS_FOREIGN (FOR_NAME);\n"
1419 		"CREATE INDEX REF_IND"
1420 		" ON SYS_FOREIGN (REF_NAME);\n"
1421 		"CREATE TABLE\n"
1422 		"SYS_FOREIGN_COLS(ID CHAR, POS INT,"
1423 		" FOR_COL_NAME CHAR, REF_COL_NAME CHAR);\n"
1424 		"CREATE UNIQUE CLUSTERED INDEX ID_IND"
1425 		" ON SYS_FOREIGN_COLS (ID, POS);\n"
1426 		"END;\n",
1427 		FALSE, trx);
1428 
1429 	if (err != DB_SUCCESS) {
1430 		ib_logf(IB_LOG_LEVEL_ERROR,
1431 			"Creation of SYS_FOREIGN and SYS_FOREIGN_COLS "
1432 			"has failed with error %lu.  Tablespace is full. "
1433 			"Dropping incompletely created tables.",
1434 			(ulong) err);
1435 
1436 		ut_ad(err == DB_OUT_OF_FILE_SPACE
1437 		      || err == DB_TOO_MANY_CONCURRENT_TRXS);
1438 
1439 		row_drop_table_for_mysql("SYS_FOREIGN", trx, TRUE);
1440 		row_drop_table_for_mysql("SYS_FOREIGN_COLS", trx, TRUE);
1441 
1442 		if (err == DB_OUT_OF_FILE_SPACE) {
1443 			err = DB_MUST_GET_MORE_FILE_SPACE;
1444 		}
1445 	}
1446 
1447 	trx_commit_for_mysql(trx);
1448 
1449 	row_mysql_unlock_data_dictionary(trx);
1450 
1451 	trx_free_for_mysql(trx);
1452 
1453 	srv_file_per_table = srv_file_per_table_backup;
1454 
1455 	if (err == DB_SUCCESS) {
1456 		ib_logf(IB_LOG_LEVEL_INFO,
1457 			"Foreign key constraint system tables created");
1458 	}
1459 
1460 	/* Note: The master thread has not been started at this point. */
1461 	/* Confirm and move to the non-LRU part of the table LRU list. */
1462 	sys_foreign_err = dict_check_if_system_table_exists(
1463 		"SYS_FOREIGN", DICT_NUM_FIELDS__SYS_FOREIGN + 1, 3);
1464 	ut_a(sys_foreign_err == DB_SUCCESS);
1465 
1466 	sys_foreign_cols_err = dict_check_if_system_table_exists(
1467 		"SYS_FOREIGN_COLS", DICT_NUM_FIELDS__SYS_FOREIGN_COLS + 1, 1);
1468 	ut_a(sys_foreign_cols_err == DB_SUCCESS);
1469 
1470 	return(err);
1471 }
1472 
1473 /****************************************************************//**
1474 Evaluate the given foreign key SQL statement.
1475 @return	error code or DB_SUCCESS */
1476 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1477 dberr_t
dict_foreign_eval_sql(pars_info_t * info,const char * sql,const char * name,const char * id,trx_t * trx)1478 dict_foreign_eval_sql(
1479 /*==================*/
1480 	pars_info_t*	info,	/*!< in: info struct */
1481 	const char*	sql,	/*!< in: SQL string to evaluate */
1482 	const char*	name,	/*!< in: table name (for diagnostics) */
1483 	const char*	id,	/*!< in: foreign key id */
1484 	trx_t*		trx)	/*!< in/out: transaction */
1485 {
1486 	dberr_t	error;
1487 	FILE*	ef	= dict_foreign_err_file;
1488 
1489 	error = que_eval_sql(info, sql, FALSE, trx);
1490 
1491 	if (error == DB_DUPLICATE_KEY) {
1492 		mutex_enter(&dict_foreign_err_mutex);
1493 		rewind(ef);
1494 		ut_print_timestamp(ef);
1495 		fputs(" Error in foreign key constraint creation for table ",
1496 		      ef);
1497 		ut_print_name(ef, trx, TRUE, name);
1498 		fputs(".\nA foreign key constraint of name ", ef);
1499 		ut_print_name(ef, trx, TRUE, id);
1500 		fputs("\nalready exists."
1501 		      " (Note that internally InnoDB adds 'databasename'\n"
1502 		      "in front of the user-defined constraint name.)\n"
1503 		      "Note that InnoDB's FOREIGN KEY system tables store\n"
1504 		      "constraint names as case-insensitive, with the\n"
1505 		      "MySQL standard latin1_swedish_ci collation. If you\n"
1506 		      "create tables or databases whose names differ only in\n"
1507 		      "the character case, then collisions in constraint\n"
1508 		      "names can occur. Workaround: name your constraints\n"
1509 		      "explicitly with unique names.\n",
1510 		      ef);
1511 
1512 		mutex_exit(&dict_foreign_err_mutex);
1513 
1514 		return(error);
1515 	}
1516 
1517 	if (error != DB_SUCCESS) {
1518 		fprintf(stderr,
1519 			"InnoDB: Foreign key constraint creation failed:\n"
1520 			"InnoDB: internal error number %lu\n", (ulong) error);
1521 
1522 		mutex_enter(&dict_foreign_err_mutex);
1523 		ut_print_timestamp(ef);
1524 		fputs(" Internal error in foreign key constraint creation"
1525 		      " for table ", ef);
1526 		ut_print_name(ef, trx, TRUE, name);
1527 		fputs(".\n"
1528 		      "See the MySQL .err log in the datadir"
1529 		      " for more information.\n", ef);
1530 		mutex_exit(&dict_foreign_err_mutex);
1531 
1532 		return(error);
1533 	}
1534 
1535 	return(DB_SUCCESS);
1536 }
1537 
1538 /********************************************************************//**
1539 Add a single foreign key field definition to the data dictionary tables in
1540 the database.
1541 @return	error code or DB_SUCCESS */
1542 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1543 dberr_t
dict_create_add_foreign_field_to_dictionary(ulint field_nr,const char * table_name,const dict_foreign_t * foreign,trx_t * trx)1544 dict_create_add_foreign_field_to_dictionary(
1545 /*========================================*/
1546 	ulint			field_nr,	/*!< in: field number */
1547 	const char*		table_name,	/*!< in: table name */
1548 	const dict_foreign_t*	foreign,	/*!< in: foreign */
1549 	trx_t*			trx)		/*!< in/out: transaction */
1550 {
1551 	pars_info_t*	info = pars_info_create();
1552 
1553 	pars_info_add_str_literal(info, "id", foreign->id);
1554 
1555 	pars_info_add_int4_literal(info, "pos", field_nr);
1556 
1557 	pars_info_add_str_literal(info, "for_col_name",
1558 				  foreign->foreign_col_names[field_nr]);
1559 
1560 	pars_info_add_str_literal(info, "ref_col_name",
1561 				  foreign->referenced_col_names[field_nr]);
1562 
1563 	return(dict_foreign_eval_sql(
1564 		       info,
1565 		       "PROCEDURE P () IS\n"
1566 		       "BEGIN\n"
1567 		       "INSERT INTO SYS_FOREIGN_COLS VALUES"
1568 		       "(:id, :pos, :for_col_name, :ref_col_name);\n"
1569 		       "END;\n",
1570 		       table_name, foreign->id, trx));
1571 }
1572 
1573 /********************************************************************//**
1574 Add a foreign key definition to the data dictionary tables.
1575 @return	error code or DB_SUCCESS */
1576 UNIV_INTERN
1577 dberr_t
dict_create_add_foreign_to_dictionary(const char * name,const dict_foreign_t * foreign,trx_t * trx)1578 dict_create_add_foreign_to_dictionary(
1579 /*==================================*/
1580 	const char*		name,	/*!< in: table name */
1581 	const dict_foreign_t*	foreign,/*!< in: foreign key */
1582 	trx_t*			trx)	/*!< in/out: dictionary transaction */
1583 {
1584 	dberr_t		error;
1585 	pars_info_t*	info = pars_info_create();
1586 
1587 	pars_info_add_str_literal(info, "id", foreign->id);
1588 
1589 	pars_info_add_str_literal(info, "for_name", name);
1590 
1591 	pars_info_add_str_literal(info, "ref_name",
1592 				  foreign->referenced_table_name);
1593 
1594 	pars_info_add_int4_literal(info, "n_cols",
1595 				   foreign->n_fields + (foreign->type << 24));
1596 
1597 	error = dict_foreign_eval_sql(info,
1598 				      "PROCEDURE P () IS\n"
1599 				      "BEGIN\n"
1600 				      "INSERT INTO SYS_FOREIGN VALUES"
1601 				      "(:id, :for_name, :ref_name, :n_cols);\n"
1602 				      "END;\n"
1603 				      , name, foreign->id, trx);
1604 
1605 	if (error != DB_SUCCESS) {
1606 
1607 		return(error);
1608 	}
1609 
1610 	for (ulint i = 0; i < foreign->n_fields; i++) {
1611 		error = dict_create_add_foreign_field_to_dictionary(
1612 			i, name, foreign, trx);
1613 
1614 		if (error != DB_SUCCESS) {
1615 
1616 			return(error);
1617 		}
1618 	}
1619 
1620 	return(error);
1621 }
1622 
1623 /** Adds the given set of foreign key objects to the dictionary tables
1624 in the database. This function does not modify the dictionary cache. The
1625 caller must ensure that all foreign key objects contain a valid constraint
1626 name in foreign->id.
1627 @param[in]	local_fk_set	set of foreign key objects, to be added to
1628 the dictionary tables
1629 @param[in]	table		table to which the foreign key objects in
1630 local_fk_set belong to
1631 @param[in,out]	trx		transaction
1632 @return error code or DB_SUCCESS */
1633 UNIV_INTERN
1634 dberr_t
dict_create_add_foreigns_to_dictionary(const dict_foreign_set & local_fk_set,const dict_table_t * table,trx_t * trx)1635 dict_create_add_foreigns_to_dictionary(
1636 /*===================================*/
1637 	const dict_foreign_set&	local_fk_set,
1638 	const dict_table_t*	table,
1639 	trx_t*			trx)
1640 {
1641 	dict_foreign_t*	foreign;
1642 	dberr_t		error;
1643 
1644 	ut_ad(mutex_own(&(dict_sys->mutex)));
1645 
1646 	if (NULL == dict_table_get_low("SYS_FOREIGN")) {
1647 		fprintf(stderr,
1648 			"InnoDB: table SYS_FOREIGN not found"
1649 			" in internal data dictionary\n");
1650 
1651 		return(DB_ERROR);
1652 	}
1653 
1654 	for (dict_foreign_set::const_iterator it = local_fk_set.begin();
1655 	     it != local_fk_set.end();
1656 	     ++it) {
1657 
1658 		foreign = *it;
1659 		ut_ad(foreign->id != NULL);
1660 
1661 		error = dict_create_add_foreign_to_dictionary(table->name,
1662 							      foreign, trx);
1663 
1664 		if (error != DB_SUCCESS) {
1665 
1666 			return(error);
1667 		}
1668 	}
1669 
1670 	trx->op_info = "committing foreign key definitions";
1671 
1672 	trx_commit(trx);
1673 
1674 	trx->op_info = "";
1675 
1676 	return(DB_SUCCESS);
1677 }
1678 
1679 /****************************************************************//**
1680 Creates the tablespaces and datafiles system tables inside InnoDB
1681 at server bootstrap or server start if they are not found or are
1682 not of the right form.
1683 @return	DB_SUCCESS or error code */
1684 UNIV_INTERN
1685 dberr_t
dict_create_or_check_sys_tablespace(void)1686 dict_create_or_check_sys_tablespace(void)
1687 /*=====================================*/
1688 {
1689 	trx_t*		trx;
1690 	my_bool		srv_file_per_table_backup;
1691 	dberr_t		err;
1692 	dberr_t		sys_tablespaces_err;
1693 	dberr_t		sys_datafiles_err;
1694 
1695 	ut_a(srv_get_active_thread_type() == SRV_NONE);
1696 
1697 	/* Note: The master thread has not been started at this point. */
1698 
1699 	sys_tablespaces_err = dict_check_if_system_table_exists(
1700 		"SYS_TABLESPACES", DICT_NUM_FIELDS__SYS_TABLESPACES + 1, 1);
1701 	sys_datafiles_err = dict_check_if_system_table_exists(
1702 		"SYS_DATAFILES", DICT_NUM_FIELDS__SYS_DATAFILES + 1, 1);
1703 
1704 	if (sys_tablespaces_err == DB_SUCCESS
1705 	    && sys_datafiles_err == DB_SUCCESS) {
1706 		return(DB_SUCCESS);
1707 	}
1708 
1709 	trx = trx_allocate_for_mysql();
1710 
1711 	trx_set_dict_operation(trx, TRX_DICT_OP_TABLE);
1712 
1713 	trx->op_info = "creating tablepace and datafile sys tables";
1714 
1715 	row_mysql_lock_data_dictionary(trx);
1716 
1717 	/* Check which incomplete table definition to drop. */
1718 
1719 	if (sys_tablespaces_err == DB_CORRUPTION) {
1720 		ib_logf(IB_LOG_LEVEL_WARN,
1721 			"Dropping incompletely created "
1722 			"SYS_TABLESPACES table.");
1723 		row_drop_table_for_mysql("SYS_TABLESPACES", trx, TRUE);
1724 	}
1725 
1726 	if (sys_datafiles_err == DB_CORRUPTION) {
1727 		ib_logf(IB_LOG_LEVEL_WARN,
1728 			"Dropping incompletely created "
1729 			"SYS_DATAFILES table.");
1730 
1731 		row_drop_table_for_mysql("SYS_DATAFILES", trx, TRUE);
1732 	}
1733 
1734 	ib_logf(IB_LOG_LEVEL_INFO,
1735 		"Creating tablespace and datafile system tables.");
1736 
1737 	/* We always want SYSTEM tables to be created inside the system
1738 	tablespace. */
1739 	srv_file_per_table_backup = srv_file_per_table;
1740 	srv_file_per_table = 0;
1741 
1742 	err = que_eval_sql(
1743 		NULL,
1744 		"PROCEDURE CREATE_SYS_TABLESPACE_PROC () IS\n"
1745 		"BEGIN\n"
1746 		"CREATE TABLE SYS_TABLESPACES(\n"
1747 		" SPACE INT, NAME CHAR, FLAGS INT);\n"
1748 		"CREATE UNIQUE CLUSTERED INDEX SYS_TABLESPACES_SPACE"
1749 		" ON SYS_TABLESPACES (SPACE);\n"
1750 		"CREATE TABLE SYS_DATAFILES(\n"
1751 		" SPACE INT, PATH CHAR);\n"
1752 		"CREATE UNIQUE CLUSTERED INDEX SYS_DATAFILES_SPACE"
1753 		" ON SYS_DATAFILES (SPACE);\n"
1754 		"END;\n",
1755 		FALSE, trx);
1756 
1757 	if (err != DB_SUCCESS) {
1758 		ib_logf(IB_LOG_LEVEL_ERROR,
1759 			"Creation of SYS_TABLESPACES and SYS_DATAFILES "
1760 			"has failed with error %lu.  Tablespace is full. "
1761 			"Dropping incompletely created tables.",
1762 			(ulong) err);
1763 
1764 		ut_a(err == DB_OUT_OF_FILE_SPACE
1765 		     || err == DB_TOO_MANY_CONCURRENT_TRXS);
1766 
1767 		row_drop_table_for_mysql("SYS_TABLESPACES", trx, TRUE);
1768 		row_drop_table_for_mysql("SYS_DATAFILES", trx, TRUE);
1769 
1770 		if (err == DB_OUT_OF_FILE_SPACE) {
1771 			err = DB_MUST_GET_MORE_FILE_SPACE;
1772 		}
1773 	}
1774 
1775 	trx_commit_for_mysql(trx);
1776 
1777 	row_mysql_unlock_data_dictionary(trx);
1778 
1779 	trx_free_for_mysql(trx);
1780 
1781 	srv_file_per_table = srv_file_per_table_backup;
1782 
1783 	if (err == DB_SUCCESS) {
1784 		ib_logf(IB_LOG_LEVEL_INFO,
1785 			"Tablespace and datafile system tables created.");
1786 	}
1787 
1788 	/* Note: The master thread has not been started at this point. */
1789 	/* Confirm and move to the non-LRU part of the table LRU list. */
1790 
1791 	sys_tablespaces_err = dict_check_if_system_table_exists(
1792 		"SYS_TABLESPACES", DICT_NUM_FIELDS__SYS_TABLESPACES + 1, 1);
1793 	ut_a(sys_tablespaces_err == DB_SUCCESS);
1794 
1795 	sys_datafiles_err = dict_check_if_system_table_exists(
1796 		"SYS_DATAFILES", DICT_NUM_FIELDS__SYS_DATAFILES + 1, 1);
1797 	ut_a(sys_datafiles_err == DB_SUCCESS);
1798 
1799 	return(err);
1800 }
1801 
1802 /** Creates the zip_dict system table inside InnoDB
1803 at server bootstrap or server start if it is not found or is
1804 not of the right form.
1805 @return	DB_SUCCESS or error code */
1806 UNIV_INTERN
1807 dberr_t
dict_create_or_check_sys_zip_dict(void)1808 dict_create_or_check_sys_zip_dict(void)
1809 {
1810 	trx_t*		trx;
1811 	my_bool		srv_file_per_table_backup;
1812 	dberr_t		err;
1813 	dberr_t		sys_zip_dict_err;
1814 	dberr_t		sys_zip_dict_cols_err;
1815 
1816 	ut_a(srv_get_active_thread_type() == SRV_NONE);
1817 
1818 	/* Note: The master thread has not been started at this point. */
1819 
1820 	sys_zip_dict_err = dict_check_if_system_table_exists(
1821 		"SYS_ZIP_DICT", DICT_NUM_FIELDS__SYS_ZIP_DICT + 1, 2);
1822 	sys_zip_dict_cols_err = dict_check_if_system_table_exists(
1823 		"SYS_ZIP_DICT_COLS", DICT_NUM_FIELDS__SYS_ZIP_DICT_COLS + 1,
1824 		1);
1825 
1826 	if (sys_zip_dict_err == DB_SUCCESS &&
1827 		sys_zip_dict_cols_err == DB_SUCCESS)
1828 		return (DB_SUCCESS);
1829 
1830 	trx = trx_allocate_for_mysql();
1831 
1832 	trx_set_dict_operation(trx, TRX_DICT_OP_TABLE);
1833 
1834 	trx->op_info = "creating zip_dict and zip_dict_cols sys tables";
1835 
1836 	row_mysql_lock_data_dictionary(trx);
1837 
1838 	/* Check which incomplete table definition to drop. */
1839 
1840 	if (sys_zip_dict_err == DB_CORRUPTION) {
1841 		ib_logf(IB_LOG_LEVEL_WARN,
1842 			"Dropping incompletely created "
1843 			"SYS_ZIP_DICT table.");
1844 		row_drop_table_for_mysql("SYS_ZIP_DICT", trx, TRUE);
1845 	}
1846 	if (sys_zip_dict_cols_err == DB_CORRUPTION) {
1847 		ib_logf(IB_LOG_LEVEL_WARN,
1848 			"Dropping incompletely created "
1849 			"SYS_ZIP_DICT_COLS table.");
1850 		row_drop_table_for_mysql("SYS_ZIP_DICT_COLS", trx, TRUE);
1851 	}
1852 
1853 	ib_logf(IB_LOG_LEVEL_INFO,
1854 		"Creating zip_dict and zip_dict_cols system tables.");
1855 
1856 	/* We always want SYSTEM tables to be created inside the system
1857 	tablespace. */
1858 	srv_file_per_table_backup = srv_file_per_table;
1859 	srv_file_per_table = 0;
1860 
1861 	err = que_eval_sql(
1862 		NULL,
1863 		"PROCEDURE CREATE_SYS_ZIP_DICT_PROC () IS\n"
1864 		"BEGIN\n"
1865 		"CREATE TABLE SYS_ZIP_DICT(\n"
1866 		"  ID INT UNSIGNED NOT NULL,\n"
1867 		"  NAME CHAR("
1868 		  STRINGIFY_ARG(ZIP_DICT_MAX_NAME_LENGTH)
1869 		") NOT NULL,\n"
1870 		"  DATA BLOB NOT NULL\n"
1871 		");\n"
1872 		"CREATE UNIQUE CLUSTERED INDEX SYS_ZIP_DICT_ID"
1873 		" ON SYS_ZIP_DICT (ID);\n"
1874 		"CREATE UNIQUE INDEX SYS_ZIP_DICT_NAME"
1875 		" ON SYS_ZIP_DICT (NAME);\n"
1876 		"CREATE TABLE SYS_ZIP_DICT_COLS(\n"
1877 		"  TABLE_ID INT UNSIGNED NOT NULL,\n"
1878 		"  COLUMN_POS INT UNSIGNED NOT NULL,\n"
1879 		"  DICT_ID INT UNSIGNED NOT NULL\n"
1880 		");\n"
1881 		"CREATE UNIQUE CLUSTERED INDEX SYS_ZIP_DICT_COLS_COMPOSITE"
1882 		" ON SYS_ZIP_DICT_COLS (TABLE_ID, COLUMN_POS);\n"
1883 		"END;\n",
1884 		FALSE, trx);
1885 
1886 	if (err != DB_SUCCESS) {
1887 		ib_logf(IB_LOG_LEVEL_ERROR,
1888 			"Creation of SYS_ZIP_DICT and SYS_ZIP_DICT_COLS"
1889 			"has failed with error %lu. Tablespace is full. "
1890 			"Dropping incompletely created tables.",
1891 			(ulong) err);
1892 
1893 		ut_a(err == DB_OUT_OF_FILE_SPACE
1894 			|| err == DB_TOO_MANY_CONCURRENT_TRXS);
1895 
1896 		row_drop_table_for_mysql("SYS_ZIP_DICT", trx, TRUE);
1897 		row_drop_table_for_mysql("SYS_ZIP_DICT_COLS", trx, TRUE);
1898 
1899 		if (err == DB_OUT_OF_FILE_SPACE) {
1900 			err = DB_MUST_GET_MORE_FILE_SPACE;
1901 		}
1902 	}
1903 
1904 	trx_commit_for_mysql(trx);
1905 
1906 	row_mysql_unlock_data_dictionary(trx);
1907 
1908 	trx_free_for_mysql(trx);
1909 
1910 	srv_file_per_table = srv_file_per_table_backup;
1911 
1912 	if (err == DB_SUCCESS) {
1913 		ib_logf(IB_LOG_LEVEL_INFO,
1914 			"zip_dict and zip_dict_cols system tables created.");
1915 	}
1916 
1917 	/* Note: The master thread has not been started at this point. */
1918 	/* Confirm and move to the non-LRU part of the table LRU list. */
1919 
1920 	sys_zip_dict_err = dict_check_if_system_table_exists(
1921 		"SYS_ZIP_DICT", DICT_NUM_FIELDS__SYS_ZIP_DICT + 1, 2);
1922 	ut_a(sys_zip_dict_err == DB_SUCCESS);
1923 	sys_zip_dict_cols_err = dict_check_if_system_table_exists(
1924 		"SYS_ZIP_DICT_COLS",
1925 		DICT_NUM_FIELDS__SYS_ZIP_DICT_COLS + 1, 1);
1926 	ut_a(sys_zip_dict_cols_err == DB_SUCCESS);
1927 
1928 	return(err);
1929 }
1930 
1931 /********************************************************************//**
1932 Add a single tablespace definition to the data dictionary tables in the
1933 database.
1934 @return	error code or DB_SUCCESS */
1935 UNIV_INTERN
1936 dberr_t
dict_create_add_tablespace_to_dictionary(ulint space,const char * name,ulint flags,const char * path,trx_t * trx,bool commit)1937 dict_create_add_tablespace_to_dictionary(
1938 /*=====================================*/
1939 	ulint		space,		/*!< in: tablespace id */
1940 	const char*	name,		/*!< in: tablespace name */
1941 	ulint		flags,		/*!< in: tablespace flags */
1942 	const char*	path,		/*!< in: tablespace path */
1943 	trx_t*		trx,		/*!< in/out: transaction */
1944 	bool		commit)		/*!< in: if true then commit the
1945 					transaction */
1946 {
1947 	dberr_t		error;
1948 
1949 	pars_info_t*	info = pars_info_create();
1950 
1951 	ut_a(space > TRX_SYS_SPACE);
1952 
1953 	pars_info_add_int4_literal(info, "space", space);
1954 
1955 	pars_info_add_str_literal(info, "name", name);
1956 
1957 	pars_info_add_int4_literal(info, "flags", flags);
1958 
1959 	pars_info_add_str_literal(info, "path", path);
1960 
1961 	error = que_eval_sql(info,
1962 			     "PROCEDURE P () IS\n"
1963 			     "BEGIN\n"
1964 			     "INSERT INTO SYS_TABLESPACES VALUES"
1965 			     "(:space, :name, :flags);\n"
1966 			     "INSERT INTO SYS_DATAFILES VALUES"
1967 			     "(:space, :path);\n"
1968 			     "END;\n",
1969 			     FALSE, trx);
1970 
1971 	if (error != DB_SUCCESS) {
1972 		return(error);
1973 	}
1974 
1975 	if (commit) {
1976 		trx->op_info = "committing tablespace and datafile definition";
1977 		trx_commit(trx);
1978 	}
1979 
1980 	trx->op_info = "";
1981 
1982 	return(error);
1983 }
1984 
1985 /** Add a single compression dictionary definition to the SYS_ZIP_DICT
1986 InnoDB system table.
1987 @return	error code or DB_SUCCESS */
1988 UNIV_INTERN
1989 dberr_t
dict_create_add_zip_dict(const char * name,ulint name_len,const char * data,ulint data_len,trx_t * trx)1990 dict_create_add_zip_dict(
1991 	const char*	name,		/*!< in: dict name */
1992 	ulint		name_len,	/*!< in: dict name length */
1993 	const char*	data,		/*!< in: dict data */
1994 	ulint		data_len,	/*!< in: dict data length */
1995 	trx_t*		trx)		/*!< in/out: transaction */
1996 {
1997 	ut_ad(name);
1998 	ut_ad(data);
1999 
2000 	pars_info_t* info = pars_info_create();
2001 
2002 	pars_info_add_literal(info, "name", name, name_len,
2003 		DATA_VARCHAR, DATA_ENGLISH);
2004 	pars_info_add_literal(info, "data", data, data_len,
2005 		DATA_BLOB, DATA_BINARY_TYPE | DATA_NOT_NULL);
2006 
2007 	dberr_t error = que_eval_sql(info,
2008 		"PROCEDURE P () IS\n"
2009 		"  max_id INT;\n"
2010 		"DECLARE CURSOR cur IS\n"
2011 		"  SELECT ID FROM SYS_ZIP_DICT\n"
2012 		"  ORDER BY ID DESC;\n"
2013 		"BEGIN\n"
2014 		"  max_id := 0;\n"
2015 		"  OPEN cur;\n"
2016 		"  FETCH cur INTO max_id;\n"
2017 		"  IF (cur % NOTFOUND) THEN\n"
2018 		"    max_id := 0;\n"
2019 		"  END IF;\n"
2020 		"  CLOSE cur;\n"
2021 		"  INSERT INTO SYS_ZIP_DICT VALUES"
2022 		"    (max_id + 1, :name, :data);\n"
2023 		"END;\n",
2024 		FALSE, trx);
2025 
2026 	return error;
2027 }
2028 
2029 /** Fetch callback, just stores extracted zip_dict id in the external
2030 variable.
2031 @return TRUE if all OK */
2032 static
2033 ibool
dict_create_extract_int_aux(void * row,void * user_arg)2034 dict_create_extract_int_aux(
2035 	void*	row,		/*!< in: sel_node_t* */
2036 	void*	user_arg)	/*!< in: int32 id */
2037 {
2038 	sel_node_t*	node = static_cast<sel_node_t*>(row);
2039 	dfield_t*	dfield = que_node_get_val(node->select_list);
2040 	dtype_t*	type = dfield_get_type(dfield);
2041 	ulint		len = dfield_get_len(dfield);
2042 
2043 	ut_a(dtype_get_mtype(type) == DATA_INT);
2044 	ut_a(len == sizeof(ib_uint32_t));
2045 
2046 	memcpy(user_arg, dfield_get_data(dfield), sizeof(ib_uint32_t));
2047 
2048 	return(TRUE);
2049 }
2050 
2051 /** Add a single compression dictionary reference to the SYS_ZIP_DICT_COLS
2052 InnoDB system table.
2053 @return	error code or DB_SUCCESS */
2054 UNIV_INTERN
2055 dberr_t
dict_create_add_zip_dict_reference(ulint table_id,ulint column_pos,ulint dict_id,trx_t * trx)2056 dict_create_add_zip_dict_reference(
2057 	ulint		table_id,	/*!< in: table id */
2058 	ulint		column_pos,	/*!< in: column position */
2059 	ulint		dict_id,	/*!< in: dict id */
2060 	trx_t*		trx)		/*!< in/out: transaction */
2061 {
2062 	pars_info_t* info = pars_info_create();
2063 
2064 	pars_info_add_int4_literal(info, "table_id", table_id);
2065 	pars_info_add_int4_literal(info, "column_pos", column_pos);
2066 	pars_info_add_int4_literal(info, "dict_id", dict_id);
2067 
2068 	dberr_t error = que_eval_sql(info,
2069 		"PROCEDURE P () IS\n"
2070 		"BEGIN\n"
2071 		"  INSERT INTO SYS_ZIP_DICT_COLS VALUES"
2072 		"    (:table_id, :column_pos, :dict_id);\n"
2073 		"END;\n",
2074 		FALSE, trx);
2075 	return error;
2076 }
2077 
2078 /** Get a single compression dictionary id for the given
2079 (table id, column pos) pair.
2080 @return	error code or DB_SUCCESS */
2081 UNIV_INTERN
2082 dberr_t
dict_create_get_zip_dict_id_by_reference(ulint table_id,ulint column_pos,ulint * dict_id,trx_t * trx)2083 dict_create_get_zip_dict_id_by_reference(
2084 	ulint	table_id,	/*!< in: table id */
2085 	ulint	column_pos,	/*!< in: column position */
2086 	ulint*	dict_id,	/*!< out: dict id */
2087 	trx_t*	trx)		/*!< in/out: transaction */
2088 {
2089 	ut_ad(dict_id);
2090 
2091 	pars_info_t* info = pars_info_create();
2092 
2093 	ib_uint32_t dict_id_buf;
2094 	mach_write_to_4(reinterpret_cast<byte*>(&dict_id_buf ),
2095 		ULINT32_UNDEFINED);
2096 
2097 	pars_info_add_int4_literal(info, "table_id", table_id);
2098 	pars_info_add_int4_literal(info, "column_pos", column_pos);
2099 	pars_info_bind_function(
2100 		info, "my_func", dict_create_extract_int_aux, &dict_id_buf);
2101 
2102 	dberr_t error = que_eval_sql(info,
2103 		"PROCEDURE P () IS\n"
2104 		"DECLARE FUNCTION my_func;\n"
2105 		"DECLARE CURSOR cur IS\n"
2106 		"  SELECT DICT_ID FROM SYS_ZIP_DICT_COLS\n"
2107 		"    WHERE TABLE_ID = :table_id AND\n"
2108 		"          COLUMN_POS = :column_pos;\n"
2109 		"BEGIN\n"
2110 		"  OPEN cur;\n"
2111 		"  FETCH cur INTO my_func();\n"
2112 		"  CLOSE cur;\n"
2113 		"END;\n",
2114 		FALSE, trx);
2115 	if (error == DB_SUCCESS) {
2116 		ib_uint32_t local_dict_id = mach_read_from_4(
2117 			reinterpret_cast<const byte*>(&dict_id_buf));
2118 		if (local_dict_id == ULINT32_UNDEFINED)
2119 			error = DB_RECORD_NOT_FOUND;
2120 		else
2121 			*dict_id = local_dict_id;
2122 	}
2123 	return error;
2124 }
2125 
2126 /** Get compression dictionary id for the given name.
2127 @return	error code or DB_SUCCESS */
2128 UNIV_INTERN
2129 dberr_t
dict_create_get_zip_dict_id_by_name(const char * dict_name,ulint dict_name_len,ulint * dict_id,trx_t * trx)2130 dict_create_get_zip_dict_id_by_name(
2131 	const char*	dict_name,	/*!< in: dict name */
2132 	ulint		dict_name_len,	/*!< in: dict name length */
2133 	ulint*		dict_id,	/*!< out: dict id */
2134 	trx_t*		trx)		/*!< in/out: transaction */
2135 {
2136 	ut_ad(dict_name);
2137 	ut_ad(dict_name_len);
2138 	ut_ad(dict_id);
2139 
2140 	pars_info_t* info = pars_info_create();
2141 
2142 	pars_info_add_literal(info, "dict_name", dict_name, dict_name_len,
2143 		DATA_VARCHAR, DATA_ENGLISH);
2144 
2145 	ib_uint32_t dict_id_buf;
2146 	mach_write_to_4(reinterpret_cast<byte*>(&dict_id_buf),
2147 		ULINT32_UNDEFINED);
2148 	pars_info_bind_function(
2149 		info, "my_func", dict_create_extract_int_aux, &dict_id_buf);
2150 
2151 	dberr_t error = que_eval_sql(info,
2152 		"PROCEDURE P () IS\n"
2153 		"DECLARE FUNCTION my_func;\n"
2154 		"DECLARE CURSOR cur IS\n"
2155 		"  SELECT ID FROM SYS_ZIP_DICT\n"
2156 		"    WHERE NAME = :dict_name;\n"
2157 		"BEGIN\n"
2158 		"  OPEN cur;\n"
2159 		"  FETCH cur INTO my_func();\n"
2160 		"  CLOSE cur;\n"
2161 		"END;\n",
2162 		FALSE, trx);
2163 	if (error == DB_SUCCESS) {
2164 		ib_uint32_t local_dict_id = mach_read_from_4(
2165 			reinterpret_cast<const byte*>(&dict_id_buf));
2166 		if (local_dict_id == ULINT32_UNDEFINED)
2167 			error = DB_RECORD_NOT_FOUND;
2168 		else
2169 			*dict_id = local_dict_id;
2170 	}
2171 	return error;
2172 }
2173 
/** Outcome of extracting the name and data of a compression dictionary
from a fetched row; stored in zip_dict_info_aux::code */
enum zip_dict_info_aux_code {
	/** name and data were extracted */
	zip_dict_info_success = 0,
	/** zip dict record not found */
	zip_dict_info_not_found = 1,
	/** out of memory */
	zip_dict_info_oom = 2,
	/** corrupted zip dict name */
	zip_dict_info_corrupted_name = 3,
	/** corrupted zip dict data */
	zip_dict_info_corrupted_data = 4
};
2182 
/** Auxiliary struct used to return zip dict info along with a result code.
It is passed as the user argument of the fetch callback
dict_create_get_zip_dict_info_by_id_aux(), which fills it in. */
struct zip_dict_info_aux {
	LEX_STRING	name;	/*!< zip dict name; name.str is a buffer
				allocated by the fetch callback */
	LEX_STRING	data;	/*!< zip dict data; data.str is a buffer
				allocated by the fetch callback */
	int		code;	/*!< result code, one of
				zip_dict_info_aux_code
				(0 - zip_dict_info_success) */
};
2189 
2190 /** Fetch callback, just stores extracted zip_dict data in the external
2191 variable.
2192 @return always returns TRUE */
2193 static
2194 ibool
dict_create_get_zip_dict_info_by_id_aux(void * row,void * user_arg)2195 dict_create_get_zip_dict_info_by_id_aux(
2196 	void*	row,		/*!< in: sel_node_t* */
2197 	void*	user_arg)	/*!< in: pointer to zip_dict_info_aux* */
2198 {
2199 	sel_node_t*		node = static_cast<sel_node_t*>(row);
2200 	zip_dict_info_aux*	result =
2201 		static_cast<zip_dict_info_aux*>(user_arg);
2202 
2203 	result->code = zip_dict_info_success;
2204 	result->name.str = 0;
2205 	result->name.length = 0;
2206 	result->data.str = 0;
2207 	result->data.length = 0;
2208 
2209 	/* NAME field */
2210 	que_node_t*	exp = node->select_list;
2211 	ut_a(exp != 0);
2212 
2213 	dfield_t*	dfield = que_node_get_val(exp);
2214 	dtype_t*	type = dfield_get_type(dfield);
2215 	ut_a(dtype_get_mtype(type) == DATA_VARCHAR);
2216 
2217 	ulint	len = dfield_get_len(dfield);
2218 	void*	data = dfield_get_data(dfield);
2219 
2220 
2221 	if (len == UNIV_SQL_NULL) {
2222 		result->code = zip_dict_info_corrupted_name;
2223 	}
2224 	else {
2225 		result->name.str =
2226 			static_cast<char*>(my_malloc(len + 1, MYF(0)));
2227 		if (result->name.str == 0) {
2228 			result->code = zip_dict_info_oom;
2229 		}
2230 		else {
2231 			memcpy(result->name.str, data, len);
2232 			result->name.str[len] = '\0';
2233 			result->name.length = len;
2234 		}
2235 	}
2236 
2237 	/* DATA field */
2238 	exp = que_node_get_next(exp);
2239 	ut_a(exp != 0);
2240 
2241 	dfield = que_node_get_val(exp);
2242 	type = dfield_get_type(dfield);
2243 	ut_a(dtype_get_mtype(type) == DATA_BLOB);
2244 
2245 	len = dfield_get_len(dfield);
2246 	data = dfield_get_data(dfield);
2247 
2248 	if (len == UNIV_SQL_NULL) {
2249 		result->code = zip_dict_info_corrupted_data;
2250 	}
2251 	else {
2252 		result->data.str =
2253 			static_cast<char*>(my_malloc(
2254 				len == 0 ? 1 : len, MYF(0)));
2255 		if (result->data.str == 0) {
2256 			result->code = zip_dict_info_oom;
2257 		}
2258 		else {
2259 			memcpy(result->data.str, data, len);
2260 			result->data.length = len;
2261 		}
2262 	}
2263 
2264 	ut_ad(que_node_get_next(exp) == 0);
2265 
2266 	if (result->code != zip_dict_info_success) {
2267 		if (result->name.str == 0) {
2268 			mem_free(result->name.str);
2269 			result->name.str = 0;
2270 			result->name.length = 0;
2271 		}
2272 		if (result->data.str == 0) {
2273 			mem_free(result->data.str);
2274 			result->data.str = 0;
2275 			result->data.length = 0;
2276 		}
2277 	}
2278 
2279 	return TRUE;
2280 }
2281 
2282 /** Get compression dictionary info (name and data) for the given id.
2283 Allocates memory for name and data on success.
2284 Must be freed with mem_free().
2285 @return	error code or DB_SUCCESS */
2286 UNIV_INTERN
2287 dberr_t
dict_create_get_zip_dict_info_by_id(ulint dict_id,char ** name,ulint * name_len,char ** data,ulint * data_len,trx_t * trx)2288 dict_create_get_zip_dict_info_by_id(
2289 	ulint	dict_id,	/*!< in: dict id */
2290 	char**	name,		/*!< out: dict name */
2291 	ulint*	name_len,	/*!< out: dict name length*/
2292 	char**	data,		/*!< out: dict data */
2293 	ulint*	data_len,	/*!< out: dict data length*/
2294 	trx_t*	trx)		/*!< in/out: transaction */
2295 {
2296 	ut_ad(name);
2297 	ut_ad(data);
2298 
2299 	zip_dict_info_aux rec;
2300 	rec.code = zip_dict_info_not_found;
2301 	pars_info_t* info = pars_info_create();
2302 
2303 	pars_info_add_int4_literal(info, "id", dict_id);
2304 	pars_info_bind_function(
2305 		info, "my_func", dict_create_get_zip_dict_info_by_id_aux,
2306 		&rec);
2307 
2308 	dberr_t error = que_eval_sql(info,
2309 		"PROCEDURE P () IS\n"
2310 		"DECLARE FUNCTION my_func;\n"
2311 		"DECLARE CURSOR cur IS\n"
2312 		"  SELECT NAME, DATA FROM SYS_ZIP_DICT\n"
2313 		"    WHERE ID = :id;\n"
2314 		"BEGIN\n"
2315 		"  OPEN cur;\n"
2316 		"  FETCH cur INTO my_func();\n"
2317 		"  CLOSE cur;\n"
2318 		"END;\n",
2319 		FALSE, trx);
2320 	if (error == DB_SUCCESS) {
2321 		switch (rec.code) {
2322 			case zip_dict_info_success:
2323 				*name = rec.name.str;
2324 				*name_len = rec.name.length;
2325 				*data = rec.data.str;
2326 				*data_len = rec.data.length;
2327 				break;
2328 			case zip_dict_info_not_found:
2329 				error = DB_RECORD_NOT_FOUND;
2330 				break;
2331 			case zip_dict_info_oom:
2332 				error = DB_OUT_OF_MEMORY;
2333 				break;
2334 			case zip_dict_info_corrupted_name:
2335 			case zip_dict_info_corrupted_data:
2336 				error = DB_INVALID_NULL;
2337 				break;
2338 			default:
2339 				ut_error;
2340 		}
2341 	}
2342 	return error;
2343 }
2344 
2345 /** Remove a single compression dictionary from the data dictionary
2346 tables in the database.
2347 @return	error code or DB_SUCCESS */
2348 UNIV_INTERN
2349 dberr_t
dict_create_remove_zip_dict(const char * name,ulint name_len,trx_t * trx)2350 dict_create_remove_zip_dict(
2351 	const char*	name,		/*!< in: dict name */
2352 	ulint		name_len,	/*!< in: dict name length */
2353 	trx_t*		trx)		/*!< in/out: transaction */
2354 {
2355 	ut_ad(name);
2356 
2357 	pars_info_t* info = pars_info_create();
2358 
2359 	ib_uint32_t dict_id_buf;
2360 	mach_write_to_4(reinterpret_cast<byte*>(&dict_id_buf),
2361 		ULINT32_UNDEFINED);
2362 	ib_uint32_t counter_buf;
2363 	mach_write_to_4(reinterpret_cast<byte*>(&counter_buf),
2364 		ULINT32_UNDEFINED);
2365 
2366 	pars_info_add_literal(info, "name", name, name_len,
2367 		DATA_VARCHAR, DATA_ENGLISH);
2368 	pars_info_bind_int4_literal(info, "dict_id", &dict_id_buf);
2369 	pars_info_bind_function(info, "find_dict_func",
2370 		dict_create_extract_int_aux, &dict_id_buf);
2371 	pars_info_bind_function(info, "count_func",
2372 		dict_create_extract_int_aux, &counter_buf);
2373 
2374 	dberr_t error = que_eval_sql(info,
2375 		"PROCEDURE P () IS\n"
2376 		"DECLARE FUNCTION find_dict_func;\n"
2377 		"DECLARE FUNCTION count_func;\n"
2378 		"DECLARE CURSOR dict_cur IS\n"
2379 		"  SELECT ID FROM SYS_ZIP_DICT\n"
2380 		"    WHERE NAME = :name\n"
2381 		"  FOR UPDATE;\n"
2382 		"DECLARE CURSOR ref_cur IS\n"
2383 		"  SELECT 1 FROM SYS_ZIP_DICT_COLS\n"
2384 		"    WHERE DICT_ID = :dict_id;\n"
2385 		"BEGIN\n"
2386 		"  OPEN dict_cur;\n"
2387 		"  FETCH dict_cur INTO find_dict_func();\n"
2388 		"  IF NOT (SQL % NOTFOUND) THEN\n"
2389 		"    OPEN ref_cur;\n"
2390 		"    FETCH ref_cur INTO count_func();\n"
2391 		"    IF SQL % NOTFOUND THEN\n"
2392 		"      DELETE FROM SYS_ZIP_DICT WHERE CURRENT OF dict_cur;\n"
2393 		"    END IF;\n"
2394 		"    CLOSE ref_cur;\n"
2395 		"  END IF;\n"
2396 		"  CLOSE dict_cur;\n"
2397 		"END;\n",
2398 		FALSE, trx);
2399 	if (error == DB_SUCCESS) {
2400 		ib_uint32_t local_dict_id = mach_read_from_4(
2401 			reinterpret_cast<const byte*>(&dict_id_buf));
2402 		if (local_dict_id == ULINT32_UNDEFINED) {
2403 			error = DB_RECORD_NOT_FOUND;
2404 		}
2405 		else {
2406 			ib_uint32_t local_counter = mach_read_from_4(
2407 				reinterpret_cast<const byte*>(&counter_buf));
2408 			if (local_counter != ULINT32_UNDEFINED)
2409 				error = DB_ROW_IS_REFERENCED;
2410 		}
2411 	}
2412 	return error;
2413 }
2414 
2415 /** Remove all compression dictionary references for the given table ID from
2416 the data dictionary tables in the database.
2417 @return	error code or DB_SUCCESS */
2418 UNIV_INTERN
2419 dberr_t
dict_create_remove_zip_dict_references_for_table(ulint table_id,trx_t * trx)2420 dict_create_remove_zip_dict_references_for_table(
2421 	ulint	table_id,	/*!< in: table id */
2422 	trx_t*	trx)		/*!< in/out: transaction */
2423 {
2424 	pars_info_t* info = pars_info_create();
2425 
2426 	pars_info_add_int4_literal(info, "table_id", table_id);
2427 
2428 	dberr_t error = que_eval_sql(info,
2429 		"PROCEDURE P () IS\n"
2430 		"BEGIN\n"
2431 		"  DELETE FROM SYS_ZIP_DICT_COLS\n"
2432 		"    WHERE TABLE_ID = :table_id;\n"
2433 		"END;\n",
2434 		FALSE, trx);
2435 	return error;
2436 }
2437