1 /*****************************************************************************
2 
3 Copyright (c) 2000, 2018, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2015, 2021, MariaDB Corporation.
5 
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9 
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13 
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17 
18 *****************************************************************************/
19 
20 /**************************************************//**
21 @file row/row0mysql.cc
22 Interface between Innobase row operations and MySQL.
23 Contains also create table and other data dictionary operations.
24 
25 Created 9/17/2000 Heikki Tuuri
26 *******************************************************/
27 
28 #include "univ.i"
29 #include <debug_sync.h>
30 #include <gstream.h>
31 #include <spatial.h>
32 
33 #include "row0mysql.h"
34 #include "btr0sea.h"
35 #include "dict0boot.h"
36 #include "dict0crea.h"
37 #include <sql_const.h>
38 #include "dict0dict.h"
39 #include "dict0load.h"
40 #include "dict0priv.h"
41 #include "dict0stats.h"
42 #include "dict0stats_bg.h"
43 #include "dict0defrag_bg.h"
44 #include "btr0defragment.h"
45 #include "fil0fil.h"
46 #include "fil0crypt.h"
47 #include "fsp0file.h"
48 #include "fts0fts.h"
49 #include "fts0types.h"
50 #include "ibuf0ibuf.h"
51 #include "lock0lock.h"
52 #include "log0log.h"
53 #include "pars0pars.h"
54 #include "que0que.h"
55 #include "rem0cmp.h"
56 #include "row0import.h"
57 #include "row0ins.h"
58 #include "row0merge.h"
59 #include "row0row.h"
60 #include "row0sel.h"
61 #include "row0upd.h"
62 #include "trx0purge.h"
63 #include "trx0rec.h"
64 #include "trx0roll.h"
65 #include "trx0undo.h"
66 #include "srv0start.h"
67 #include "row0ext.h"
68 #include "srv0start.h"
69 
70 #include <algorithm>
71 #include <deque>
72 #include <vector>
73 
74 #ifdef WITH_WSREP
75 #include "mysql/service_wsrep.h"
76 #include "wsrep.h"
77 #include "log.h"
78 #include "wsrep_mysqld.h"
79 #endif
80 
/** Provide optional 4.x backwards compatibility for 5.0 and above.
When set, row_mysql_handle_errors() treats DB_LOCK_WAIT_TIMEOUT like a
deadlock and rolls back the whole transaction instead of rolling back
only to the savepoint. */
ibool	row_rollback_on_timeout	= FALSE;
83 
/** Chain node of the list of tables to drop in the background.
Each node identifies one table by its id and carries the link that
chains it into a list of row_mysql_drop_t nodes. */
struct row_mysql_drop_t{
	table_id_t			table_id;	/*!< table id */
	UT_LIST_NODE_T(row_mysql_drop_t)row_mysql_drop_list;
							/*!< list chain node */
};
90 
/** @brief List of tables we should drop in the background.

ALTER TABLE in MySQL requires that the table handler can drop the
table in the background once there are no more queries accessing it.
The elements are row_mysql_drop_t nodes.
Protected by row_drop_list_mutex. */
static UT_LIST_BASE_NODE_T(row_mysql_drop_t)	row_mysql_drop_list;

/** Mutex protecting the background table drop list
(row_mysql_drop_list). */
static ib_mutex_t row_drop_list_mutex;

/** Flag: has row_mysql_drop_list been initialized? */
static bool row_mysql_drop_list_inited;
103 
104 /*******************************************************************//**
105 Determine if the given name is a name reserved for MySQL system tables.
106 @return TRUE if name is a MySQL system table name */
107 static
108 ibool
row_mysql_is_system_table(const char * name)109 row_mysql_is_system_table(
110 /*======================*/
111 	const char*	name)
112 {
113 	if (strncmp(name, "mysql/", 6) != 0) {
114 
115 		return(FALSE);
116 	}
117 
118 	return(0 == strcmp(name + 6, "host")
119 	       || 0 == strcmp(name + 6, "user")
120 	       || 0 == strcmp(name + 6, "db"));
121 }
122 
#ifdef UNIV_DEBUG
/** Wait until the background drop list has been observed empty.
The list length is sampled while holding row_drop_list_mutex. After
every sample, including the one that finds the list empty, the thread
calls os_thread_sleep(100000) before deciding whether to return. */
void
row_wait_for_background_drop_list_empty()
{
	for (;;) {
		mutex_enter(&row_drop_list_mutex);
		const bool	drained
			= (UT_LIST_GET_LEN(row_mysql_drop_list) == 0);
		mutex_exit(&row_drop_list_mutex);

		os_thread_sleep(100000);

		if (drained) {
			return;
		}
	}
}
#endif /* UNIV_DEBUG */
137 
138 /*******************************************************************//**
139 Delays an INSERT, DELETE or UPDATE operation if the purge is lagging. */
140 static
141 void
row_mysql_delay_if_needed(void)142 row_mysql_delay_if_needed(void)
143 /*===========================*/
144 {
145 	if (srv_dml_needed_delay) {
146 		os_thread_sleep(srv_dml_needed_delay);
147 	}
148 }
149 
150 /*******************************************************************//**
151 Frees the blob heap in prebuilt when no longer needed. */
152 void
row_mysql_prebuilt_free_blob_heap(row_prebuilt_t * prebuilt)153 row_mysql_prebuilt_free_blob_heap(
154 /*==============================*/
155 	row_prebuilt_t*	prebuilt)	/*!< in: prebuilt struct of a
156 					ha_innobase:: table handle */
157 {
158 	DBUG_ENTER("row_mysql_prebuilt_free_blob_heap");
159 
160 	DBUG_PRINT("row_mysql_prebuilt_free_blob_heap",
161 		   ("blob_heap freeing: %p", prebuilt->blob_heap));
162 
163 	mem_heap_free(prebuilt->blob_heap);
164 	prebuilt->blob_heap = NULL;
165 	DBUG_VOID_RETURN;
166 }
167 
168 /*******************************************************************//**
169 Stores a >= 5.0.3 format true VARCHAR length to dest, in the MySQL row
170 format.
171 @return pointer to the data, we skip the 1 or 2 bytes at the start
172 that are used to store the len */
173 byte*
row_mysql_store_true_var_len(byte * dest,ulint len,ulint lenlen)174 row_mysql_store_true_var_len(
175 /*=========================*/
176 	byte*	dest,	/*!< in: where to store */
177 	ulint	len,	/*!< in: length, must fit in two bytes */
178 	ulint	lenlen)	/*!< in: storage length of len: either 1 or 2 bytes */
179 {
180 	if (lenlen == 2) {
181 		ut_a(len < 256 * 256);
182 
183 		mach_write_to_2_little_endian(dest, len);
184 
185 		return(dest + 2);
186 	}
187 
188 	ut_a(lenlen == 1);
189 	ut_a(len < 256);
190 
191 	mach_write_to_1(dest, len);
192 
193 	return(dest + 1);
194 }
195 
196 /*******************************************************************//**
197 Reads a >= 5.0.3 format true VARCHAR length, in the MySQL row format, and
198 returns a pointer to the data.
199 @return pointer to the data, we skip the 1 or 2 bytes at the start
200 that are used to store the len */
201 const byte*
row_mysql_read_true_varchar(ulint * len,const byte * field,ulint lenlen)202 row_mysql_read_true_varchar(
203 /*========================*/
204 	ulint*		len,	/*!< out: variable-length field length */
205 	const byte*	field,	/*!< in: field in the MySQL format */
206 	ulint		lenlen)	/*!< in: storage length of len: either 1
207 				or 2 bytes */
208 {
209 	if (lenlen == 2) {
210 		*len = mach_read_from_2_little_endian(field);
211 
212 		return(field + 2);
213 	}
214 
215 	ut_a(lenlen == 1);
216 
217 	*len = mach_read_from_1(field);
218 
219 	return(field + 1);
220 }
221 
222 /*******************************************************************//**
223 Stores a reference to a BLOB in the MySQL format. */
224 void
row_mysql_store_blob_ref(byte * dest,ulint col_len,const void * data,ulint len)225 row_mysql_store_blob_ref(
226 /*=====================*/
227 	byte*		dest,	/*!< in: where to store */
228 	ulint		col_len,/*!< in: dest buffer size: determines into
229 				how many bytes the BLOB length is stored,
230 				the space for the length may vary from 1
231 				to 4 bytes */
232 	const void*	data,	/*!< in: BLOB data; if the value to store
233 				is SQL NULL this should be NULL pointer */
234 	ulint		len)	/*!< in: BLOB length; if the value to store
235 				is SQL NULL this should be 0; remember
236 				also to set the NULL bit in the MySQL record
237 				header! */
238 {
239 	/* MySQL might assume the field is set to zero except the length and
240 	the pointer fields */
241 
242 	memset(dest, '\0', col_len);
243 
244 	/* In dest there are 1 - 4 bytes reserved for the BLOB length,
245 	and after that 8 bytes reserved for the pointer to the data.
246 	In 32-bit architectures we only use the first 4 bytes of the pointer
247 	slot. */
248 
249 	ut_a(col_len - 8 > 1 || len < 256);
250 	ut_a(col_len - 8 > 2 || len < 256 * 256);
251 	ut_a(col_len - 8 > 3 || len < 256 * 256 * 256);
252 
253 	mach_write_to_n_little_endian(dest, col_len - 8, len);
254 
255 	memcpy(dest + col_len - 8, &data, sizeof data);
256 }
257 
258 /*******************************************************************//**
259 Reads a reference to a BLOB in the MySQL format.
260 @return pointer to BLOB data */
261 const byte*
row_mysql_read_blob_ref(ulint * len,const byte * ref,ulint col_len)262 row_mysql_read_blob_ref(
263 /*====================*/
264 	ulint*		len,		/*!< out: BLOB length */
265 	const byte*	ref,		/*!< in: BLOB reference in the
266 					MySQL format */
267 	ulint		col_len)	/*!< in: BLOB reference length
268 					(not BLOB length) */
269 {
270 	byte*	data;
271 
272 	*len = mach_read_from_n_little_endian(ref, col_len - 8);
273 
274 	memcpy(&data, ref + col_len - 8, sizeof data);
275 
276 	return(data);
277 }
278 
279 /*******************************************************************//**
280 Converting InnoDB geometry data format to MySQL data format. */
281 void
row_mysql_store_geometry(byte * dest,ulint dest_len,const byte * src,ulint src_len)282 row_mysql_store_geometry(
283 /*=====================*/
284 	byte*		dest,		/*!< in/out: where to store */
285 	ulint		dest_len,	/*!< in: dest buffer size: determines
286 					into how many bytes the GEOMETRY length
287 					is stored, the space for the length
288 					may vary from 1 to 4 bytes */
289 	const byte*	src,		/*!< in: GEOMETRY data; if the value to
290 					store is SQL NULL this should be NULL
291 					pointer */
292 	ulint		src_len)	/*!< in: GEOMETRY length; if the value
293 					to store is SQL NULL this should be 0;
294 					remember also to set the NULL bit in
295 					the MySQL record header! */
296 {
297 	/* MySQL might assume the field is set to zero except the length and
298 	the pointer fields */
299 	MEM_CHECK_DEFINED(src, src_len);
300 
301 	memset(dest, '\0', dest_len);
302 
303 	/* In dest there are 1 - 4 bytes reserved for the BLOB length,
304 	and after that 8 bytes reserved for the pointer to the data.
305 	In 32-bit architectures we only use the first 4 bytes of the pointer
306 	slot. */
307 
308 	ut_ad(dest_len - 8 > 1 || src_len < 1<<8);
309 	ut_ad(dest_len - 8 > 2 || src_len < 1<<16);
310 	ut_ad(dest_len - 8 > 3 || src_len < 1<<24);
311 
312 	mach_write_to_n_little_endian(dest, dest_len - 8, src_len);
313 
314 	memcpy(dest + dest_len - 8, &src, sizeof src);
315 }
316 
317 /*******************************************************************//**
318 Read geometry data in the MySQL format.
319 @return pointer to geometry data */
320 static
321 const byte*
row_mysql_read_geometry(ulint * len,const byte * ref,ulint col_len)322 row_mysql_read_geometry(
323 /*====================*/
324 	ulint*		len,		/*!< out: data length */
325 	const byte*	ref,		/*!< in: geometry data in the
326 					MySQL format */
327 	ulint		col_len)	/*!< in: MySQL format length */
328 {
329 	byte*		data;
330 
331 	*len = mach_read_from_n_little_endian(ref, col_len - 8);
332 
333 	memcpy(&data, ref + col_len - 8, sizeof data);
334 
335 	return(data);
336 }
337 
338 /**************************************************************//**
339 Pad a column with spaces. */
340 void
row_mysql_pad_col(ulint mbminlen,byte * pad,ulint len)341 row_mysql_pad_col(
342 /*==============*/
343 	ulint	mbminlen,	/*!< in: minimum size of a character,
344 				in bytes */
345 	byte*	pad,		/*!< out: padded buffer */
346 	ulint	len)		/*!< in: number of bytes to pad */
347 {
348 	const byte*	pad_end;
349 
350 	switch (UNIV_EXPECT(mbminlen, 1)) {
351 	default:
352 		ut_error;
353 	case 1:
354 		/* space=0x20 */
355 		memset(pad, 0x20, len);
356 		break;
357 	case 2:
358 		/* space=0x0020 */
359 		pad_end = pad + len;
360 		ut_a(!(len % 2));
361 		while (pad < pad_end) {
362 			*pad++ = 0x00;
363 			*pad++ = 0x20;
364 		};
365 		break;
366 	case 4:
367 		/* space=0x00000020 */
368 		pad_end = pad + len;
369 		ut_a(!(len % 4));
370 		while (pad < pad_end) {
371 			*pad++ = 0x00;
372 			*pad++ = 0x00;
373 			*pad++ = 0x00;
374 			*pad++ = 0x20;
375 		}
376 		break;
377 	}
378 }
379 
380 /**************************************************************//**
381 Stores a non-SQL-NULL field given in the MySQL format in the InnoDB format.
382 The counterpart of this function is row_sel_field_store_in_mysql_format() in
383 row0sel.cc.
384 @return up to which byte we used buf in the conversion */
385 byte*
row_mysql_store_col_in_innobase_format(dfield_t * dfield,byte * buf,ibool row_format_col,const byte * mysql_data,ulint col_len,ulint comp)386 row_mysql_store_col_in_innobase_format(
387 /*===================================*/
388 	dfield_t*	dfield,		/*!< in/out: dfield where dtype
389 					information must be already set when
390 					this function is called! */
391 	byte*		buf,		/*!< in/out: buffer for a converted
392 					integer value; this must be at least
393 					col_len long then! NOTE that dfield
394 					may also get a pointer to 'buf',
395 					therefore do not discard this as long
396 					as dfield is used! */
397 	ibool		row_format_col,	/*!< TRUE if the mysql_data is from
398 					a MySQL row, FALSE if from a MySQL
399 					key value;
400 					in MySQL, a true VARCHAR storage
401 					format differs in a row and in a
402 					key value: in a key value the length
403 					is always stored in 2 bytes! */
404 	const byte*	mysql_data,	/*!< in: MySQL column value, not
405 					SQL NULL; NOTE that dfield may also
406 					get a pointer to mysql_data,
407 					therefore do not discard this as long
408 					as dfield is used! */
409 	ulint		col_len,	/*!< in: MySQL column length; NOTE that
410 					this is the storage length of the
411 					column in the MySQL format row, not
412 					necessarily the length of the actual
413 					payload data; if the column is a true
414 					VARCHAR then this is irrelevant */
415 	ulint		comp)		/*!< in: nonzero=compact format */
416 {
417 	const byte*	ptr	= mysql_data;
418 	const dtype_t*	dtype;
419 	ulint		type;
420 	ulint		lenlen;
421 
422 	dtype = dfield_get_type(dfield);
423 
424 	type = dtype->mtype;
425 
426 	if (type == DATA_INT) {
427 		/* Store integer data in Innobase in a big-endian format,
428 		sign bit negated if the data is a signed integer. In MySQL,
429 		integers are stored in a little-endian format. */
430 
431 		byte*	p = buf + col_len;
432 
433 		for (;;) {
434 			p--;
435 			*p = *mysql_data;
436 			if (p == buf) {
437 				break;
438 			}
439 			mysql_data++;
440 		}
441 
442 		if (!(dtype->prtype & DATA_UNSIGNED)) {
443 
444 			*buf ^= 128;
445 		}
446 
447 		ptr = buf;
448 		buf += col_len;
449 	} else if ((type == DATA_VARCHAR
450 		    || type == DATA_VARMYSQL
451 		    || type == DATA_BINARY)) {
452 
453 		if (dtype_get_mysql_type(dtype) == DATA_MYSQL_TRUE_VARCHAR) {
454 			/* The length of the actual data is stored to 1 or 2
455 			bytes at the start of the field */
456 
457 			if (row_format_col) {
458 				if (dtype->prtype & DATA_LONG_TRUE_VARCHAR) {
459 					lenlen = 2;
460 				} else {
461 					lenlen = 1;
462 				}
463 			} else {
464 				/* In a MySQL key value, lenlen is always 2 */
465 				lenlen = 2;
466 			}
467 
468 			ptr = row_mysql_read_true_varchar(&col_len, mysql_data,
469 							  lenlen);
470 		} else {
471 			/* Remove trailing spaces from old style VARCHAR
472 			columns. */
473 
474 			/* Handle Unicode strings differently. */
475 			ulint	mbminlen	= dtype_get_mbminlen(dtype);
476 
477 			ptr = mysql_data;
478 
479 			switch (mbminlen) {
480 			default:
481 				ut_error;
482 			case 4:
483 				/* space=0x00000020 */
484 				/* Trim "half-chars", just in case. */
485 				col_len &= ~3U;
486 
487 				while (col_len >= 4
488 				       && ptr[col_len - 4] == 0x00
489 				       && ptr[col_len - 3] == 0x00
490 				       && ptr[col_len - 2] == 0x00
491 				       && ptr[col_len - 1] == 0x20) {
492 					col_len -= 4;
493 				}
494 				break;
495 			case 2:
496 				/* space=0x0020 */
497 				/* Trim "half-chars", just in case. */
498 				col_len &= ~1U;
499 
500 				while (col_len >= 2 && ptr[col_len - 2] == 0x00
501 				       && ptr[col_len - 1] == 0x20) {
502 					col_len -= 2;
503 				}
504 				break;
505 			case 1:
506 				/* space=0x20 */
507 				while (col_len > 0
508 				       && ptr[col_len - 1] == 0x20) {
509 					col_len--;
510 				}
511 			}
512 		}
513 	} else if (comp && type == DATA_MYSQL
514 		   && dtype_get_mbminlen(dtype) == 1
515 		   && dtype_get_mbmaxlen(dtype) > 1) {
516 		/* In some cases we strip trailing spaces from UTF-8 and other
517 		multibyte charsets, from FIXED-length CHAR columns, to save
518 		space. UTF-8 would otherwise normally use 3 * the string length
519 		bytes to store an ASCII string! */
520 
521 		/* We assume that this CHAR field is encoded in a
522 		variable-length character set where spaces have
523 		1:1 correspondence to 0x20 bytes, such as UTF-8.
524 
525 		Consider a CHAR(n) field, a field of n characters.
526 		It will contain between n * mbminlen and n * mbmaxlen bytes.
527 		We will try to truncate it to n bytes by stripping
528 		space padding.	If the field contains single-byte
529 		characters only, it will be truncated to n characters.
530 		Consider a CHAR(5) field containing the string
531 		".a   " where "." denotes a 3-byte character represented
532 		by the bytes "$%&". After our stripping, the string will
533 		be stored as "$%&a " (5 bytes). The string
534 		".abc " will be stored as "$%&abc" (6 bytes).
535 
536 		The space padding will be restored in row0sel.cc, function
537 		row_sel_field_store_in_mysql_format(). */
538 
539 		ulint		n_chars;
540 
541 		ut_a(!(dtype_get_len(dtype) % dtype_get_mbmaxlen(dtype)));
542 
543 		n_chars = dtype_get_len(dtype) / dtype_get_mbmaxlen(dtype);
544 
545 		/* Strip space padding. */
546 		while (col_len > n_chars && ptr[col_len - 1] == 0x20) {
547 			col_len--;
548 		}
549 	} else if (!row_format_col) {
550 		/* if mysql data is from a MySQL key value
551 		since the length is always stored in 2 bytes,
552 		we need do nothing here. */
553 	} else if (type == DATA_BLOB) {
554 
555 		ptr = row_mysql_read_blob_ref(&col_len, mysql_data, col_len);
556 	} else if (DATA_GEOMETRY_MTYPE(type)) {
557 		ptr = row_mysql_read_geometry(&col_len, mysql_data, col_len);
558 	}
559 
560 	dfield_set_data(dfield, ptr, col_len);
561 
562 	return(buf);
563 }
564 
565 /**************************************************************//**
566 Convert a row in the MySQL format to a row in the Innobase format. Note that
567 the function to convert a MySQL format key value to an InnoDB dtuple is
568 row_sel_convert_mysql_key_to_innobase() in row0sel.cc. */
569 static
570 void
row_mysql_convert_row_to_innobase(dtuple_t * row,row_prebuilt_t * prebuilt,const byte * mysql_rec,mem_heap_t ** blob_heap)571 row_mysql_convert_row_to_innobase(
572 /*==============================*/
573 	dtuple_t*	row,		/*!< in/out: Innobase row where the
574 					field type information is already
575 					copied there! */
576 	row_prebuilt_t*	prebuilt,	/*!< in: prebuilt struct where template
577 					must be of type ROW_MYSQL_WHOLE_ROW */
578 	const byte*	mysql_rec,	/*!< in: row in the MySQL format;
579 					NOTE: do not discard as long as
580 					row is used, as row may contain
581 					pointers to this record! */
582 	mem_heap_t**	blob_heap)	/*!< in: FIX_ME, remove this after
583 					server fixes its issue */
584 {
585 	const mysql_row_templ_t*templ;
586 	dfield_t*		dfield;
587 	ulint			i;
588 	ulint			n_col = 0;
589 	ulint			n_v_col = 0;
590 
591 	ut_ad(prebuilt->template_type == ROW_MYSQL_WHOLE_ROW);
592 	ut_ad(prebuilt->mysql_template);
593 
594 	for (i = 0; i < prebuilt->n_template; i++) {
595 
596 		templ = prebuilt->mysql_template + i;
597 
598 		if (templ->is_virtual) {
599 			ut_ad(n_v_col < dtuple_get_n_v_fields(row));
600 			dfield = dtuple_get_nth_v_field(row, n_v_col);
601 			n_v_col++;
602 		} else {
603 			dfield = dtuple_get_nth_field(row, n_col);
604 			n_col++;
605 		}
606 
607 		if (templ->mysql_null_bit_mask != 0) {
608 			/* Column may be SQL NULL */
609 
610 			if (mysql_rec[templ->mysql_null_byte_offset]
611 			    & (byte) (templ->mysql_null_bit_mask)) {
612 
613 				/* It is SQL NULL */
614 
615 				dfield_set_null(dfield);
616 
617 				goto next_column;
618 			}
619 		}
620 
621 		row_mysql_store_col_in_innobase_format(
622 			dfield,
623 			prebuilt->ins_upd_rec_buff + templ->mysql_col_offset,
624 			TRUE, /* MySQL row format data */
625 			mysql_rec + templ->mysql_col_offset,
626 			templ->mysql_col_len,
627 			dict_table_is_comp(prebuilt->table));
628 
629 		/* server has issue regarding handling BLOB virtual fields,
630 		and we need to duplicate it with our own memory here */
631 		if (templ->is_virtual
632 		    && DATA_LARGE_MTYPE(dfield_get_type(dfield)->mtype)) {
633 			if (*blob_heap == NULL) {
634 				*blob_heap = mem_heap_create(dfield->len);
635 			}
636 			dfield_dup(dfield, *blob_heap);
637 		}
638 next_column:
639 		;
640 	}
641 
642 	/* If there is a FTS doc id column and it is not user supplied (
643 	generated by server) then assign it a new doc id. */
644 	if (!prebuilt->table->fts) {
645 		return;
646 	}
647 
648 	ut_a(prebuilt->table->fts->doc_col != ULINT_UNDEFINED);
649 
650 	doc_id_t	doc_id;
651 
652 	if (!DICT_TF2_FLAG_IS_SET(prebuilt->table, DICT_TF2_FTS_HAS_DOC_ID)) {
653 		if (prebuilt->table->fts->cache->first_doc_id
654 		    == FTS_NULL_DOC_ID) {
655 			fts_get_next_doc_id(prebuilt->table, &doc_id);
656 		}
657 		return;
658 	}
659 
660 	dfield_t*	fts_doc_id = dtuple_get_nth_field(
661 		row, prebuilt->table->fts->doc_col);
662 
663 	if (fts_get_next_doc_id(prebuilt->table, &doc_id) == DB_SUCCESS) {
664 		ut_a(doc_id != FTS_NULL_DOC_ID);
665 		ut_ad(sizeof(doc_id) == fts_doc_id->type.len);
666 		dfield_set_data(fts_doc_id, prebuilt->ins_upd_rec_buff
667 				+ prebuilt->mysql_row_len, 8);
668 		fts_write_doc_id(fts_doc_id->data, doc_id);
669 	} else {
670 		dfield_set_null(fts_doc_id);
671 	}
672 }
673 
/****************************************************************//**
Handles user errors and lock waits detected by the database engine.
The error to handle is read from trx->error_state, which must not be
DB_SUCCESS on entry. Depending on the error, the function rolls back to
the given savepoint, rolls back the whole transaction, waits for a lock,
or reports a fatal error. trx->error_state is DB_SUCCESS on return.
@return true if it was a lock wait and we should continue running the
query thread and in that case the thr is ALREADY in the running state. */
bool
row_mysql_handle_errors(
/*====================*/
	dberr_t*	new_err,/*!< out: possible new error encountered in
				lock wait, or if no new error, the value
				of trx->error_state at the entry of this
				function */
	trx_t*		trx,	/*!< in: transaction */
	que_thr_t*	thr,	/*!< in: query thread, or NULL */
	trx_savept_t*	savept)	/*!< in: savepoint, or NULL */
{
	dberr_t	err;

	DBUG_ENTER("row_mysql_handle_errors");
	DEBUG_SYNC_C("row_mysql_handle_errors");

	/* Re-entered from the DB_LOCK_WAIT case when the lock wait itself
	ended with an error that must be handled in turn. */
handle_new_error:
	err = trx->error_state;

	ut_a(err != DB_SUCCESS);

	/* Clear the error so that any error raised while handling this
	one (e.g. during rollback) can be detected afterwards. */
	trx->error_state = DB_SUCCESS;

	DBUG_LOG("trx", "handle error: " << ut_strerr(err)
		 << ";id=" << ib::hex(trx->id) << ", " << trx);

	switch (err) {
	case DB_LOCK_WAIT_TIMEOUT:
		/* With row_rollback_on_timeout set, a lock wait timeout
		rolls back the whole transaction, like a deadlock. */
		if (row_rollback_on_timeout) {
			goto rollback;
		}
		/* fall through */
	case DB_DUPLICATE_KEY:
	case DB_FOREIGN_DUPLICATE_KEY:
	case DB_TOO_BIG_RECORD:
	case DB_UNDO_RECORD_TOO_BIG:
	case DB_ROW_IS_REFERENCED:
	case DB_NO_REFERENCED_ROW:
	case DB_CANNOT_ADD_CONSTRAINT:
	case DB_TOO_MANY_CONCURRENT_TRXS:
	case DB_OUT_OF_FILE_SPACE:
	case DB_READ_ONLY:
	case DB_FTS_INVALID_DOCID:
	case DB_INTERRUPTED:
	case DB_CANT_CREATE_GEOMETRY_OBJECT:
	case DB_TABLE_NOT_FOUND:
	case DB_DECRYPTION_FAILED:
	case DB_COMPUTE_VALUE_FAILED:
	/* Also the target of the corruption, cascade-depth and
	unsupported-operation cases below, after they log a message. */
	rollback_to_savept:
		DBUG_EXECUTE_IF("row_mysql_crash_if_error", {
					log_buffer_flush_to_disk();
					DBUG_SUICIDE(); });
		if (savept) {
			/* Roll back the latest, possibly incomplete insertion
			or update */

			trx_rollback_to_savepoint(trx, savept);
		}
		/* MySQL will roll back the latest SQL statement */
		break;
	case DB_LOCK_WAIT:
		lock_wait_suspend_thread(thr);

		if (trx->error_state != DB_SUCCESS) {
			/* The wait ended with an error: stop the query
			thread and handle the new error from the top. */
			que_thr_stop_for_mysql(thr);

			goto handle_new_error;
		}

		/* The wait ended without error: report the original
		error code and tell the caller to continue. */
		*new_err = err;

		DBUG_RETURN(true);

	case DB_DEADLOCK:
	case DB_LOCK_TABLE_FULL:
	rollback:
		/* Roll back the whole transaction; this resolution was added
		to version 3.23.43 */

		trx_rollback_to_savepoint(trx, NULL);
		break;

	case DB_MUST_GET_MORE_FILE_SPACE:
		/* NOTE(review): ib::fatal() is presumably not expected to
		return, which would make the break unreachable - confirm. */
		ib::fatal() << "The database cannot continue operation because"
			" of lack of space. You must add a new data file"
			" to my.cnf and restart the database.";
		break;

	case DB_CORRUPTION:
	case DB_PAGE_CORRUPTED:
		ib::error() << "We detected index corruption in an InnoDB type"
			" table. You have to dump + drop + reimport the"
			" table or, in a case of widespread corruption,"
			" dump all InnoDB tables and recreate the whole"
			" tablespace. If the mysqld server crashes after"
			" the startup or when you dump the tables. "
			<< FORCE_RECOVERY_MSG;
		goto rollback_to_savept;
	case DB_FOREIGN_EXCEED_MAX_CASCADE:
		ib::error() << "Cannot delete/update rows with cascading"
			" foreign key constraints that exceed max depth of "
			<< FK_MAX_CASCADE_DEL << ". Please drop excessive"
			" foreign constraints and try again";
		goto rollback_to_savept;
	case DB_UNSUPPORTED:
		ib::error() << "Cannot delete/update rows with cascading"
			" foreign key constraints in timestamp-based temporal"
			" table. Please drop excessive"
			" foreign constraints and try again";
		goto rollback_to_savept;
	default:
		ib::fatal() << "Unknown error " << err;
	}

	/* If the handling above (a rollback) raised an error of its own,
	report that one; otherwise report the error that was handled. */
	if (trx->error_state != DB_SUCCESS) {
		*new_err = trx->error_state;
	} else {
		*new_err = err;
	}

	trx->error_state = DB_SUCCESS;

	DBUG_RETURN(false);
}
802 
803 /********************************************************************//**
804 Create a prebuilt struct for a MySQL table handle.
805 @return own: a prebuilt struct */
806 row_prebuilt_t*
row_create_prebuilt(dict_table_t * table,ulint mysql_row_len)807 row_create_prebuilt(
808 /*================*/
809 	dict_table_t*	table,		/*!< in: Innobase table handle */
810 	ulint		mysql_row_len)	/*!< in: length in bytes of a row in
811 					the MySQL format */
812 {
813 	DBUG_ENTER("row_create_prebuilt");
814 
815 	row_prebuilt_t*	prebuilt;
816 	mem_heap_t*	heap;
817 	dict_index_t*	clust_index;
818 	dict_index_t*	temp_index;
819 	dtuple_t*	ref;
820 	ulint		ref_len;
821 	uint		srch_key_len = 0;
822 	ulint		search_tuple_n_fields;
823 
824 	search_tuple_n_fields = 2 * (dict_table_get_n_cols(table)
825 				     + dict_table_get_n_v_cols(table));
826 
827 	clust_index = dict_table_get_first_index(table);
828 
829 	/* Make sure that search_tuple is long enough for clustered index */
830 	ut_a(2 * dict_table_get_n_cols(table) >= clust_index->n_fields);
831 
832 	ref_len = dict_index_get_n_unique(clust_index);
833 
834 
835         /* Maximum size of the buffer needed for conversion of INTs from
836 	little endian format to big endian format in an index. An index
837 	can have maximum 16 columns (MAX_REF_PARTS) in it. Therfore
838 	Max size for PK: 16 * 8 bytes (BIGINT's size) = 128 bytes
839 	Max size Secondary index: 16 * 8 bytes + PK = 256 bytes. */
840 #define MAX_SRCH_KEY_VAL_BUFFER         2* (8 * MAX_REF_PARTS)
841 
842 #define PREBUILT_HEAP_INITIAL_SIZE	\
843 	( \
844 	sizeof(*prebuilt) \
845 	/* allocd in this function */ \
846 	+ DTUPLE_EST_ALLOC(search_tuple_n_fields) \
847 	+ DTUPLE_EST_ALLOC(ref_len) \
848 	/* allocd in row_prebuild_sel_graph() */ \
849 	+ sizeof(sel_node_t) \
850 	+ sizeof(que_fork_t) \
851 	+ sizeof(que_thr_t) \
852 	/* allocd in row_get_prebuilt_update_vector() */ \
853 	+ sizeof(upd_node_t) \
854 	+ sizeof(upd_t) \
855 	+ sizeof(upd_field_t) \
856 	  * dict_table_get_n_cols(table) \
857 	+ sizeof(que_fork_t) \
858 	+ sizeof(que_thr_t) \
859 	/* allocd in row_get_prebuilt_insert_row() */ \
860 	+ sizeof(ins_node_t) \
861 	/* mysql_row_len could be huge and we are not \
862 	sure if this prebuilt instance is going to be \
863 	used in inserts */ \
864 	+ (mysql_row_len < 256 ? mysql_row_len : 0) \
865 	+ DTUPLE_EST_ALLOC(dict_table_get_n_cols(table) \
866 			   + dict_table_get_n_v_cols(table)) \
867 	+ sizeof(que_fork_t) \
868 	+ sizeof(que_thr_t) \
869 	+ sizeof(*prebuilt->pcur) \
870 	+ sizeof(*prebuilt->clust_pcur) \
871 	)
872 
873 	/* Calculate size of key buffer used to store search key in
874 	InnoDB format. MySQL stores INTs in little endian format and
875 	InnoDB stores INTs in big endian format with the sign bit
876 	flipped. All other field types are stored/compared the same
877 	in MySQL and InnoDB, so we must create a buffer containing
878 	the INT key parts in InnoDB format.We need two such buffers
879 	since both start and end keys are used in records_in_range(). */
880 
881 	for (temp_index = dict_table_get_first_index(table); temp_index;
882 	     temp_index = dict_table_get_next_index(temp_index)) {
883 		DBUG_EXECUTE_IF("innodb_srch_key_buffer_max_value",
884 			ut_a(temp_index->n_user_defined_cols
885 						== MAX_REF_PARTS););
886 		uint temp_len = 0;
887 		for (uint i = 0; i < temp_index->n_uniq; i++) {
888 			ulint type = temp_index->fields[i].col->mtype;
889 			if (type == DATA_INT) {
890 				temp_len +=
891 					temp_index->fields[i].fixed_len;
892 			}
893 		}
894 		srch_key_len = std::max(srch_key_len,temp_len);
895 	}
896 
897 	ut_a(srch_key_len <= MAX_SRCH_KEY_VAL_BUFFER);
898 
899 	DBUG_EXECUTE_IF("innodb_srch_key_buffer_max_value",
900 		ut_a(srch_key_len == MAX_SRCH_KEY_VAL_BUFFER););
901 
902 	/* We allocate enough space for the objects that are likely to
903 	be created later in order to minimize the number of malloc()
904 	calls */
905 	heap = mem_heap_create(PREBUILT_HEAP_INITIAL_SIZE + 2 * srch_key_len);
906 
907 	prebuilt = static_cast<row_prebuilt_t*>(
908 		mem_heap_zalloc(heap, sizeof(*prebuilt)));
909 
910 	prebuilt->magic_n = ROW_PREBUILT_ALLOCATED;
911 	prebuilt->magic_n2 = ROW_PREBUILT_ALLOCATED;
912 
913 	prebuilt->table = table;
914 
915 	prebuilt->sql_stat_start = TRUE;
916 	prebuilt->heap = heap;
917 
918 	prebuilt->srch_key_val_len = srch_key_len;
919 	if (prebuilt->srch_key_val_len) {
920 		prebuilt->srch_key_val1 = static_cast<byte*>(
921 			mem_heap_alloc(prebuilt->heap,
922 				       2 * prebuilt->srch_key_val_len));
923 		prebuilt->srch_key_val2 = prebuilt->srch_key_val1 +
924 						prebuilt->srch_key_val_len;
925 	} else {
926 		prebuilt->srch_key_val1 = NULL;
927 		prebuilt->srch_key_val2 = NULL;
928 	}
929 
930 	prebuilt->pcur = static_cast<btr_pcur_t*>(
931 				mem_heap_zalloc(prebuilt->heap,
932 					       sizeof(btr_pcur_t)));
933 	prebuilt->clust_pcur = static_cast<btr_pcur_t*>(
934 					mem_heap_zalloc(prebuilt->heap,
935 						       sizeof(btr_pcur_t)));
936 	btr_pcur_reset(prebuilt->pcur);
937 	btr_pcur_reset(prebuilt->clust_pcur);
938 
939 	prebuilt->select_lock_type = LOCK_NONE;
940 	prebuilt->stored_select_lock_type = LOCK_NONE_UNSET;
941 
942 	prebuilt->search_tuple = dtuple_create(heap, search_tuple_n_fields);
943 
944 	ref = dtuple_create(heap, ref_len);
945 
946 	dict_index_copy_types(ref, clust_index, ref_len);
947 
948 	prebuilt->clust_ref = ref;
949 
950 	prebuilt->autoinc_error = DB_SUCCESS;
951 	prebuilt->autoinc_offset = 0;
952 
953 	/* Default to 1, we will set the actual value later in
954 	ha_innobase::get_auto_increment(). */
955 	prebuilt->autoinc_increment = 1;
956 
957 	prebuilt->autoinc_last_value = 0;
958 
959 	/* During UPDATE and DELETE we need the doc id. */
960 	prebuilt->fts_doc_id = 0;
961 
962 	prebuilt->mysql_row_len = mysql_row_len;
963 
964 	prebuilt->fts_doc_id_in_read_set = 0;
965 	prebuilt->blob_heap = NULL;
966 
967 	DBUG_RETURN(prebuilt);
968 }
969 
970 /********************************************************************//**
971 Free a prebuilt struct for a MySQL table handle. */
972 void
row_prebuilt_free(row_prebuilt_t * prebuilt,ibool dict_locked)973 row_prebuilt_free(
974 /*==============*/
975 	row_prebuilt_t*	prebuilt,	/*!< in, own: prebuilt struct */
976 	ibool		dict_locked)	/*!< in: TRUE=data dictionary locked */
977 {
978 	DBUG_ENTER("row_prebuilt_free");
979 
980 	ut_a(prebuilt->magic_n == ROW_PREBUILT_ALLOCATED);
981 	ut_a(prebuilt->magic_n2 == ROW_PREBUILT_ALLOCATED);
982 
983 	prebuilt->magic_n = ROW_PREBUILT_FREED;
984 	prebuilt->magic_n2 = ROW_PREBUILT_FREED;
985 
986 	btr_pcur_reset(prebuilt->pcur);
987 	btr_pcur_reset(prebuilt->clust_pcur);
988 
989 	ut_free(prebuilt->mysql_template);
990 
991 	if (prebuilt->ins_graph) {
992 		que_graph_free_recursive(prebuilt->ins_graph);
993 	}
994 
995 	if (prebuilt->sel_graph) {
996 		que_graph_free_recursive(prebuilt->sel_graph);
997 	}
998 
999 	if (prebuilt->upd_graph) {
1000 		que_graph_free_recursive(prebuilt->upd_graph);
1001 	}
1002 
1003 	if (prebuilt->blob_heap) {
1004 		row_mysql_prebuilt_free_blob_heap(prebuilt);
1005 	}
1006 
1007 	if (prebuilt->old_vers_heap) {
1008 		mem_heap_free(prebuilt->old_vers_heap);
1009 	}
1010 
1011 	if (prebuilt->fetch_cache[0] != NULL) {
1012 		byte*	base = prebuilt->fetch_cache[0] - 4;
1013 		byte*	ptr = base;
1014 
1015 		for (ulint i = 0; i < MYSQL_FETCH_CACHE_SIZE; i++) {
1016 			ulint	magic1 = mach_read_from_4(ptr);
1017 			ut_a(magic1 == ROW_PREBUILT_FETCH_MAGIC_N);
1018 			ptr += 4;
1019 
1020 			byte*	row = ptr;
1021 			ut_a(row == prebuilt->fetch_cache[i]);
1022 			ptr += prebuilt->mysql_row_len;
1023 
1024 			ulint	magic2 = mach_read_from_4(ptr);
1025 			ut_a(magic2 == ROW_PREBUILT_FETCH_MAGIC_N);
1026 			ptr += 4;
1027 		}
1028 
1029 		ut_free(base);
1030 	}
1031 
1032 	if (prebuilt->rtr_info) {
1033 		rtr_clean_rtr_info(prebuilt->rtr_info, true);
1034 	}
1035 	if (prebuilt->table) {
1036 		dict_table_close(prebuilt->table, dict_locked, FALSE);
1037 	}
1038 
1039 	mem_heap_free(prebuilt->heap);
1040 
1041 	DBUG_VOID_RETURN;
1042 }
1043 
1044 /*********************************************************************//**
1045 Updates the transaction pointers in query graphs stored in the prebuilt
1046 struct. */
1047 void
row_update_prebuilt_trx(row_prebuilt_t * prebuilt,trx_t * trx)1048 row_update_prebuilt_trx(
1049 /*====================*/
1050 	row_prebuilt_t*	prebuilt,	/*!< in/out: prebuilt struct
1051 					in MySQL handle */
1052 	trx_t*		trx)		/*!< in: transaction handle */
1053 {
1054 	ut_a(trx->magic_n == TRX_MAGIC_N);
1055 	ut_a(prebuilt->magic_n == ROW_PREBUILT_ALLOCATED);
1056 	ut_a(prebuilt->magic_n2 == ROW_PREBUILT_ALLOCATED);
1057 
1058 	prebuilt->trx = trx;
1059 
1060 	if (prebuilt->ins_graph) {
1061 		prebuilt->ins_graph->trx = trx;
1062 	}
1063 
1064 	if (prebuilt->upd_graph) {
1065 		prebuilt->upd_graph->trx = trx;
1066 	}
1067 
1068 	if (prebuilt->sel_graph) {
1069 		prebuilt->sel_graph->trx = trx;
1070 	}
1071 }
1072 
1073 /*********************************************************************//**
1074 Gets pointer to a prebuilt dtuple used in insertions. If the insert graph
1075 has not yet been built in the prebuilt struct, then this function first
1076 builds it.
1077 @return prebuilt dtuple; the column type information is also set in it */
1078 static
1079 dtuple_t*
row_get_prebuilt_insert_row(row_prebuilt_t * prebuilt)1080 row_get_prebuilt_insert_row(
1081 /*========================*/
1082 	row_prebuilt_t*	prebuilt)	/*!< in: prebuilt struct in MySQL
1083 					handle */
1084 {
1085 	dict_table_t*		table	= prebuilt->table;
1086 
1087 	ut_ad(prebuilt && table && prebuilt->trx);
1088 
1089 	if (prebuilt->ins_node != 0) {
1090 
1091 		/* Check if indexes have been dropped or added and we
1092 		may need to rebuild the row insert template. */
1093 
1094 		if (prebuilt->trx_id == table->def_trx_id
1095 		    && prebuilt->ins_node->entry_list.size()
1096 		    == UT_LIST_GET_LEN(table->indexes)) {
1097 
1098 			return(prebuilt->ins_node->row);
1099 		}
1100 
1101 		ut_ad(prebuilt->trx_id < table->def_trx_id);
1102 
1103 		que_graph_free_recursive(prebuilt->ins_graph);
1104 
1105 		prebuilt->ins_graph = 0;
1106 	}
1107 
1108 	/* Create an insert node and query graph to the prebuilt struct */
1109 
1110 	ins_node_t*		node;
1111 
1112 	node = ins_node_create(INS_DIRECT, table, prebuilt->heap);
1113 
1114 	prebuilt->ins_node = node;
1115 
1116 	if (prebuilt->ins_upd_rec_buff == 0) {
1117 		prebuilt->ins_upd_rec_buff = static_cast<byte*>(
1118 			mem_heap_alloc(
1119 				prebuilt->heap,
1120 				DICT_TF2_FLAG_IS_SET(prebuilt->table,
1121 						     DICT_TF2_FTS_HAS_DOC_ID)
1122 				? prebuilt->mysql_row_len + 8/* FTS_DOC_ID */
1123 				: prebuilt->mysql_row_len));
1124 	}
1125 
1126 	dtuple_t*	row;
1127 
1128 	row = dtuple_create_with_vcol(
1129 			prebuilt->heap, dict_table_get_n_cols(table),
1130 			dict_table_get_n_v_cols(table));
1131 
1132 	dict_table_copy_types(row, table);
1133 
1134 	ins_node_set_new_row(node, row);
1135 
1136 	prebuilt->ins_graph = static_cast<que_fork_t*>(
1137 		que_node_get_parent(
1138 			pars_complete_graph_for_exec(
1139 				node,
1140 				prebuilt->trx, prebuilt->heap, prebuilt)));
1141 
1142 	prebuilt->ins_graph->state = QUE_FORK_ACTIVE;
1143 
1144 	prebuilt->trx_id = table->def_trx_id;
1145 
1146 	return(prebuilt->ins_node->row);
1147 }
1148 
1149 /*********************************************************************//**
1150 Sets an AUTO_INC type lock on the table mentioned in prebuilt. The
1151 AUTO_INC lock gives exclusive access to the auto-inc counter of the
1152 table. The lock is reserved only for the duration of an SQL statement.
1153 It is not compatible with another AUTO_INC or exclusive lock on the
1154 table.
1155 @return error code or DB_SUCCESS */
1156 dberr_t
row_lock_table_autoinc_for_mysql(row_prebuilt_t * prebuilt)1157 row_lock_table_autoinc_for_mysql(
1158 /*=============================*/
1159 	row_prebuilt_t*	prebuilt)	/*!< in: prebuilt struct in the MySQL
1160 					table handle */
1161 {
1162 	trx_t*			trx	= prebuilt->trx;
1163 	ins_node_t*		node	= prebuilt->ins_node;
1164 	const dict_table_t*	table	= prebuilt->table;
1165 	que_thr_t*		thr;
1166 	dberr_t			err;
1167 	ibool			was_lock_wait;
1168 
1169 	/* If we already hold an AUTOINC lock on the table then do nothing.
1170 	Note: We peek at the value of the current owner without acquiring
1171 	the lock mutex. */
1172 	if (trx == table->autoinc_trx) {
1173 
1174 		return(DB_SUCCESS);
1175 	}
1176 
1177 	trx->op_info = "setting auto-inc lock";
1178 
1179 	row_get_prebuilt_insert_row(prebuilt);
1180 	node = prebuilt->ins_node;
1181 
1182 	/* We use the insert query graph as the dummy graph needed
1183 	in the lock module call */
1184 
1185 	thr = que_fork_get_first_thr(prebuilt->ins_graph);
1186 
1187 	que_thr_move_to_run_state_for_mysql(thr, trx);
1188 
1189 run_again:
1190 	thr->run_node = node;
1191 	thr->prev_node = node;
1192 
1193 	/* It may be that the current session has not yet started
1194 	its transaction, or it has been committed: */
1195 
1196 	trx_start_if_not_started_xa(trx, true);
1197 
1198 	err = lock_table(0, prebuilt->table, LOCK_AUTO_INC, thr);
1199 
1200 	trx->error_state = err;
1201 
1202 	if (err != DB_SUCCESS) {
1203 		que_thr_stop_for_mysql(thr);
1204 
1205 		was_lock_wait = row_mysql_handle_errors(&err, trx, thr, NULL);
1206 
1207 		if (was_lock_wait) {
1208 			goto run_again;
1209 		}
1210 
1211 		trx->op_info = "";
1212 
1213 		return(err);
1214 	}
1215 
1216 	que_thr_stop_for_mysql_no_error(thr, trx);
1217 
1218 	trx->op_info = "";
1219 
1220 	return(err);
1221 }
1222 
1223 /** Lock a table.
1224 @param[in,out]	prebuilt	table handle
1225 @return error code or DB_SUCCESS */
1226 dberr_t
row_lock_table(row_prebuilt_t * prebuilt)1227 row_lock_table(row_prebuilt_t* prebuilt)
1228 {
1229 	trx_t*		trx		= prebuilt->trx;
1230 	que_thr_t*	thr;
1231 	dberr_t		err;
1232 	ibool		was_lock_wait;
1233 
1234 	trx->op_info = "setting table lock";
1235 
1236 	if (prebuilt->sel_graph == NULL) {
1237 		/* Build a dummy select query graph */
1238 		row_prebuild_sel_graph(prebuilt);
1239 	}
1240 
1241 	/* We use the select query graph as the dummy graph needed
1242 	in the lock module call */
1243 
1244 	thr = que_fork_get_first_thr(prebuilt->sel_graph);
1245 
1246 	que_thr_move_to_run_state_for_mysql(thr, trx);
1247 
1248 run_again:
1249 	thr->run_node = thr;
1250 	thr->prev_node = thr->common.parent;
1251 
1252 	/* It may be that the current session has not yet started
1253 	its transaction, or it has been committed: */
1254 
1255 	trx_start_if_not_started_xa(trx, false);
1256 
1257 	err = lock_table(0, prebuilt->table,
1258 			 static_cast<enum lock_mode>(
1259 				 prebuilt->select_lock_type),
1260 			 thr);
1261 
1262 	trx->error_state = err;
1263 
1264 	if (err != DB_SUCCESS) {
1265 		que_thr_stop_for_mysql(thr);
1266 
1267 		was_lock_wait = row_mysql_handle_errors(&err, trx, thr, NULL);
1268 
1269 		if (was_lock_wait) {
1270 			goto run_again;
1271 		}
1272 
1273 		trx->op_info = "";
1274 
1275 		return(err);
1276 	}
1277 
1278 	que_thr_stop_for_mysql_no_error(thr, trx);
1279 
1280 	trx->op_info = "";
1281 
1282 	return(err);
1283 }
1284 
1285 /** Determine is tablespace encrypted but decryption failed, is table corrupted
1286 or is tablespace .ibd file missing.
1287 @param[in]	table		Table
1288 @param[in]	trx		Transaction
1289 @param[in]	push_warning	true if we should push warning to user
1290 @retval	DB_DECRYPTION_FAILED	table is encrypted but decryption failed
1291 @retval	DB_CORRUPTION		table is corrupted
1292 @retval	DB_TABLESPACE_NOT_FOUND	tablespace .ibd file not found */
1293 static
1294 dberr_t
row_mysql_get_table_status(const dict_table_t * table,trx_t * trx,bool push_warning=true)1295 row_mysql_get_table_status(
1296 	const dict_table_t*	table,
1297 	trx_t*			trx,
1298 	bool 			push_warning = true)
1299 {
1300 	dberr_t err;
1301 	if (const fil_space_t* space = table->space) {
1302 		if (space->crypt_data && space->crypt_data->is_encrypted()) {
1303 			// maybe we cannot access the table due to failing
1304 			// to decrypt
1305 			if (push_warning) {
1306 				ib_push_warning(trx, DB_DECRYPTION_FAILED,
1307 					"Table %s in tablespace %lu encrypted."
1308 					"However key management plugin or used key_id is not found or"
1309 					" used encryption algorithm or method does not match.",
1310 					table->name.m_name, table->space);
1311 			}
1312 
1313 			err = DB_DECRYPTION_FAILED;
1314 		} else {
1315 			if (push_warning) {
1316 				ib_push_warning(trx, DB_CORRUPTION,
1317 					"Table %s in tablespace %lu corrupted.",
1318 					table->name.m_name, table->space);
1319 			}
1320 
1321 			err = DB_CORRUPTION;
1322 		}
1323 	} else {
1324 		ib::error() << ".ibd file is missing for table "
1325 			<< table->name;
1326 		err = DB_TABLESPACE_NOT_FOUND;
1327 	}
1328 
1329 	return(err);
1330 }
1331 
/** Does an insert for MySQL.
Converts the row from the MySQL format into the prebuilt insert tuple,
runs the insert query graph (retrying after lock waits), registers the
row with FTS when the table has an FTS index, and updates the row
counters and statistics.
@param[in]	mysql_rec	row in the MySQL format
@param[in,out]	prebuilt	prebuilt struct in MySQL handle
@param[in]	ins_mode	ROW_INS_NORMAL, or a versioning mode for
				which node->vers_update_end() is invoked
@return error code or DB_SUCCESS
@retval DB_TABLESPACE_DELETED	the table has no tablespace
@retval DB_READ_ONLY		high_level_read_only is set
@retval DB_TABLE_CORRUPT	the table is flagged as corrupted
@retval DB_FTS_INVALID_DOCID	the FTS document id of the row is invalid */
dberr_t
row_insert_for_mysql(
	const byte*	mysql_rec,
	row_prebuilt_t*	prebuilt,
	ins_mode_t	ins_mode)
{
	trx_savept_t	savept;
	que_thr_t*	thr;
	dberr_t		err;
	ibool		was_lock_wait;
	trx_t*		trx		= prebuilt->trx;
	ins_node_t*	node		= prebuilt->ins_node;
	dict_table_t*	table		= prebuilt->table;

	/* FIX_ME: This blob heap is used to compensate an issue in server
	for virtual column blob handling */
	mem_heap_t*	blob_heap = NULL;

	ut_ad(trx);
	ut_a(prebuilt->magic_n == ROW_PREBUILT_ALLOCATED);
	ut_a(prebuilt->magic_n2 == ROW_PREBUILT_ALLOCATED);

	/* Refuse early, before anything is allocated or started, when
	the table cannot be written to. */
	if (!prebuilt->table->space) {

		ib::error() << "The table " << prebuilt->table->name
			<< " doesn't have a corresponding tablespace, it was"
			" discarded.";

		return(DB_TABLESPACE_DELETED);

	} else if (!prebuilt->table->is_readable()) {
		return(row_mysql_get_table_status(prebuilt->table, trx, true));
	} else if (high_level_read_only) {
		return(DB_READ_ONLY);
	}

	DBUG_EXECUTE_IF("mark_table_corrupted", {
		/* Mark the table corrupted for the clustered index */
		dict_index_t*	index = dict_table_get_first_index(table);
		ut_ad(dict_index_is_clust(index));
		dict_set_corrupted(index, trx, "INSERT TABLE"); });

	if (dict_table_is_corrupted(table)) {

		ib::error() << "Table " << table->name << " is corrupt.";
		return(DB_TABLE_CORRUPT);
	}

	trx->op_info = "inserting";

	row_mysql_delay_if_needed();

	if (!table->no_rollback()) {
		trx_start_if_not_started_xa(trx, true);
	}

	/* Build (or rebuild) the insert node and graph if needed, then
	re-read the node pointer because it may have just been created. */
	row_get_prebuilt_insert_row(prebuilt);
	node = prebuilt->ins_node;

	/* Fill node->row from the MySQL row; blob_heap may be set by
	the conversion and is freed on both exit paths below. */
	row_mysql_convert_row_to_innobase(node->row, prebuilt, mysql_rec,
					  &blob_heap);

	if (ins_mode != ROW_INS_NORMAL) {
          node->vers_update_end(prebuilt, ins_mode == ROW_INS_HISTORICAL);
        }

	/* The savepoint is handed to row_mysql_handle_errors() if the
	insert fails. */
	savept = trx_savept_take(trx);

	thr = que_fork_get_first_thr(prebuilt->ins_graph);

	/* The first insert of an SQL statement starts from the
	INS_NODE_SET_IX_LOCK state; later ones start directly from
	INS_NODE_ALLOC_ROW_ID. */
	if (prebuilt->sql_stat_start) {
		node->state = INS_NODE_SET_IX_LOCK;
		prebuilt->sql_stat_start = FALSE;
	} else {
		node->state = INS_NODE_ALLOC_ROW_ID;
	}

	que_thr_move_to_run_state_for_mysql(thr, trx);

run_again:
	thr->run_node = node;
	thr->prev_node = node;

	row_ins_step(thr);

	DEBUG_SYNC_C("ib_after_row_insert_step");

	err = trx->error_state;

	if (err != DB_SUCCESS) {
		/* The FTS checks further down also jump here, with err
		and trx->error_state already set to the failure. */
error_exit:
		que_thr_stop_for_mysql(thr);

		/* FIXME: What's this ? */
		thr->lock_state = QUE_THR_LOCK_ROW;

		was_lock_wait = row_mysql_handle_errors(
			&err, trx, thr, &savept);

		thr->lock_state = QUE_THR_LOCK_NOLOCK;

		/* After a lock wait the insert step is simply retried. */
		if (was_lock_wait) {
			ut_ad(node->state == INS_NODE_INSERT_ENTRIES
			      || node->state == INS_NODE_ALLOC_ROW_ID);
			goto run_again;
		}

		trx->op_info = "";

		if (blob_heap != NULL) {
			mem_heap_free(blob_heap);
		}

		return(err);
	}

	if (dict_table_has_fts_index(table)) {
		doc_id_t	doc_id;

		/* Extract the doc id from the hidden FTS column */
		doc_id = fts_get_doc_id_from_row(table, node->row);

		if (doc_id <= 0) {
			ib::error() << "FTS_DOC_ID must be larger than 0 for table "
				    << table->name;
			err = DB_FTS_INVALID_DOCID;
			trx->error_state = DB_FTS_INVALID_DOCID;
			goto error_exit;
		}

		/* Without the DICT_TF2_FTS_HAS_DOC_ID flag, the doc id
		must not be below the next doc id of the FTS cache. */
		if (!DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)) {
			doc_id_t	next_doc_id
				= table->fts->cache->next_doc_id;

			if (doc_id < next_doc_id) {
				ib::error() << "FTS_DOC_ID must be larger than "
					<< next_doc_id - 1 << " for table "
					<< table->name;

				err = DB_FTS_INVALID_DOCID;
				trx->error_state = DB_FTS_INVALID_DOCID;
				goto error_exit;
			}
		}

		if (table->skip_alter_undo) {
			/* Add the document directly from the tuple,
			creating the FTS transaction state on demand. */
			if (trx->fts_trx == NULL) {
				trx->fts_trx = fts_trx_create(trx);
			}

			fts_trx_table_t ftt;
			ftt.table = table;
			ftt.fts_trx = trx->fts_trx;

			fts_add_doc_from_tuple(&ftt, doc_id, node->row);
		} else {
			/* Pass NULL for the columns affected, since an INSERT affects
			all FTS indexes. */
			fts_trx_add_op(trx, table, doc_id, FTS_INSERT, NULL);
		}
	}

	que_thr_stop_for_mysql_no_error(thr, trx);

	/* Inserts into system-database tables are counted separately. */
	if (table->is_system_db) {
		srv_stats.n_system_rows_inserted.inc(size_t(trx->id));
	} else {
		srv_stats.n_rows_inserted.inc(size_t(trx->id));
	}

	/* Not protected by dict_sys->mutex for performance
	reasons, we would rather get garbage in stat_n_rows (which is
	just an estimate anyway) than protecting the following code
	with a latch. */
	dict_table_n_rows_inc(table);

	if (prebuilt->clust_index_was_generated) {
		/* set row id to prebuilt */
		memcpy(prebuilt->row_id, node->sys_buf, DATA_ROW_ID_LEN);
	}

	dict_stats_update_if_needed(table, *trx);
	trx->op_info = "";

	if (blob_heap != NULL) {
		mem_heap_free(blob_heap);
	}

	return(err);
}
1526 
1527 /*********************************************************************//**
1528 Builds a dummy query graph used in selects. */
1529 void
row_prebuild_sel_graph(row_prebuilt_t * prebuilt)1530 row_prebuild_sel_graph(
1531 /*===================*/
1532 	row_prebuilt_t*	prebuilt)	/*!< in: prebuilt struct in MySQL
1533 					handle */
1534 {
1535 	sel_node_t*	node;
1536 
1537 	ut_ad(prebuilt && prebuilt->trx);
1538 
1539 	if (prebuilt->sel_graph == NULL) {
1540 
1541 		node = sel_node_create(prebuilt->heap);
1542 
1543 		prebuilt->sel_graph = static_cast<que_fork_t*>(
1544 			que_node_get_parent(
1545 				pars_complete_graph_for_exec(
1546 					static_cast<sel_node_t*>(node),
1547 					prebuilt->trx, prebuilt->heap,
1548 					prebuilt)));
1549 
1550 		prebuilt->sel_graph->state = QUE_FORK_ACTIVE;
1551 	}
1552 }
1553 
1554 /*********************************************************************//**
1555 Creates an query graph node of 'update' type to be used in the MySQL
1556 interface.
1557 @return own: update node */
1558 upd_node_t*
row_create_update_node_for_mysql(dict_table_t * table,mem_heap_t * heap)1559 row_create_update_node_for_mysql(
1560 /*=============================*/
1561 	dict_table_t*	table,	/*!< in: table to update */
1562 	mem_heap_t*	heap)	/*!< in: mem heap from which allocated */
1563 {
1564 	upd_node_t*	node;
1565 
1566 	DBUG_ENTER("row_create_update_node_for_mysql");
1567 
1568 	node = upd_node_create(heap);
1569 
1570 	node->in_mysql_interface = true;
1571 	node->is_delete = NO_DELETE;
1572 	node->searched_update = FALSE;
1573 	node->select = NULL;
1574 	node->pcur = btr_pcur_create_for_mysql();
1575 
1576 	DBUG_PRINT("info", ("node: %p, pcur: %p", node, node->pcur));
1577 
1578 	node->table = table;
1579 
1580 	node->update = upd_create(dict_table_get_n_cols(table)
1581 				  + dict_table_get_n_v_cols(table), heap);
1582 
1583 	node->update_n_fields = dict_table_get_n_cols(table);
1584 
1585 	UT_LIST_INIT(node->columns, &sym_node_t::col_var_list);
1586 
1587 	node->has_clust_rec_x_lock = TRUE;
1588 	node->cmpl_info = 0;
1589 
1590 	node->table_sym = NULL;
1591 	node->col_assign_list = NULL;
1592 
1593 	DBUG_RETURN(node);
1594 }
1595 
1596 /*********************************************************************//**
1597 Gets pointer to a prebuilt update vector used in updates. If the update
1598 graph has not yet been built in the prebuilt struct, then this function
1599 first builds it.
1600 @return prebuilt update vector */
1601 upd_t*
row_get_prebuilt_update_vector(row_prebuilt_t * prebuilt)1602 row_get_prebuilt_update_vector(
1603 /*===========================*/
1604 	row_prebuilt_t*	prebuilt)	/*!< in: prebuilt struct in MySQL
1605 					handle */
1606 {
1607 	if (prebuilt->upd_node == NULL) {
1608 
1609 		/* Not called before for this handle: create an update node
1610 		and query graph to the prebuilt struct */
1611 
1612 		prebuilt->upd_node = row_create_update_node_for_mysql(
1613 			prebuilt->table, prebuilt->heap);
1614 
1615 		prebuilt->upd_graph = static_cast<que_fork_t*>(
1616 			que_node_get_parent(
1617 				pars_complete_graph_for_exec(
1618 					prebuilt->upd_node,
1619 					prebuilt->trx, prebuilt->heap,
1620 					prebuilt)));
1621 
1622 		prebuilt->upd_graph->state = QUE_FORK_ACTIVE;
1623 	}
1624 
1625 	return(prebuilt->upd_node->update);
1626 }
1627 
1628 /********************************************************************
1629 Handle an update of a column that has an FTS index. */
1630 static
1631 void
row_fts_do_update(trx_t * trx,dict_table_t * table,doc_id_t old_doc_id,doc_id_t new_doc_id)1632 row_fts_do_update(
1633 /*==============*/
1634 	trx_t*		trx,		/* in: transaction */
1635 	dict_table_t*	table,		/* in: Table with FTS index */
1636 	doc_id_t	old_doc_id,	/* in: old document id */
1637 	doc_id_t	new_doc_id)	/* in: new document id */
1638 {
1639 	if(trx->fts_next_doc_id) {
1640 		fts_trx_add_op(trx, table, old_doc_id, FTS_DELETE, NULL);
1641 		if(new_doc_id != FTS_NULL_DOC_ID)
1642 		fts_trx_add_op(trx, table, new_doc_id, FTS_INSERT, NULL);
1643 	}
1644 }
1645 
1646 /************************************************************************
1647 Handles FTS matters for an update or a delete.
1648 NOTE: should not be called if the table does not have an FTS index. .*/
1649 static
1650 dberr_t
row_fts_update_or_delete(row_prebuilt_t * prebuilt)1651 row_fts_update_or_delete(
1652 /*=====================*/
1653 	row_prebuilt_t*	prebuilt)	/* in: prebuilt struct in MySQL
1654 					handle */
1655 {
1656 	trx_t*		trx = prebuilt->trx;
1657 	dict_table_t*	table = prebuilt->table;
1658 	upd_node_t*	node = prebuilt->upd_node;
1659 	doc_id_t	old_doc_id = prebuilt->fts_doc_id;
1660 
1661 	DBUG_ENTER("row_fts_update_or_delete");
1662 
1663 	ut_a(dict_table_has_fts_index(prebuilt->table));
1664 
1665 	/* Deletes are simple; get them out of the way first. */
1666 	if (node->is_delete == PLAIN_DELETE) {
1667 		/* A delete affects all FTS indexes, so we pass NULL */
1668 		fts_trx_add_op(trx, table, old_doc_id, FTS_DELETE, NULL);
1669 	} else {
1670 		doc_id_t	new_doc_id;
1671 		new_doc_id = fts_read_doc_id((byte*) &trx->fts_next_doc_id);
1672 
1673 		if (new_doc_id == 0) {
1674 			ib::error() << "InnoDB FTS: Doc ID cannot be 0";
1675 			return(DB_FTS_INVALID_DOCID);
1676 		}
1677 		row_fts_do_update(trx, table, old_doc_id, new_doc_id);
1678 	}
1679 
1680 	DBUG_RETURN(DB_SUCCESS);
1681 }
1682 
1683 /*********************************************************************//**
1684 Initialize the Doc ID system for FK table with FTS index */
1685 static
1686 void
init_fts_doc_id_for_ref(dict_table_t * table,ulint * depth)1687 init_fts_doc_id_for_ref(
1688 /*====================*/
1689 	dict_table_t*	table,		/*!< in: table */
1690 	ulint*		depth)		/*!< in: recusive call depth */
1691 {
1692 	dict_foreign_t* foreign;
1693 
1694 	table->fk_max_recusive_level = 0;
1695 
1696 	(*depth)++;
1697 
1698 	/* Limit on tables involved in cascading delete/update */
1699 	if (*depth > FK_MAX_CASCADE_DEL) {
1700 		return;
1701 	}
1702 
1703 	/* Loop through this table's referenced list and also
1704 	recursively traverse each table's foreign table list */
1705 	for (dict_foreign_set::iterator it = table->referenced_set.begin();
1706 	     it != table->referenced_set.end();
1707 	     ++it) {
1708 
1709 		foreign = *it;
1710 
1711 		ut_ad(foreign->foreign_table != NULL);
1712 
1713 		if (foreign->foreign_table->fts != NULL) {
1714 			fts_init_doc_id(foreign->foreign_table);
1715 		}
1716 
1717 		if (!foreign->foreign_table->referenced_set.empty()
1718 		    && foreign->foreign_table != table) {
1719 			init_fts_doc_id_for_ref(
1720 				foreign->foreign_table, depth);
1721 		}
1722 	}
1723 }
1724 
1725 /** Does an update or delete of a row for MySQL.
1726 @param[in,out]	prebuilt	prebuilt struct in MySQL handle
1727 @return error code or DB_SUCCESS */
1728 dberr_t
row_update_for_mysql(row_prebuilt_t * prebuilt)1729 row_update_for_mysql(row_prebuilt_t* prebuilt)
1730 {
1731 	trx_savept_t	savept;
1732 	dberr_t		err;
1733 	que_thr_t*	thr;
1734 	dict_index_t*	clust_index;
1735 	upd_node_t*	node;
1736 	dict_table_t*	table		= prebuilt->table;
1737 	trx_t*		trx		= prebuilt->trx;
1738 	ulint		fk_depth	= 0;
1739 	bool		got_s_lock	= false;
1740 
1741 	DBUG_ENTER("row_update_for_mysql");
1742 
1743 	ut_ad(trx);
1744 	ut_a(prebuilt->magic_n == ROW_PREBUILT_ALLOCATED);
1745 	ut_a(prebuilt->magic_n2 == ROW_PREBUILT_ALLOCATED);
1746 	ut_a(prebuilt->template_type == ROW_MYSQL_WHOLE_ROW);
1747 	ut_ad(table->stat_initialized);
1748 
1749 	if (!table->is_readable()) {
1750 		return(row_mysql_get_table_status(table, trx, true));
1751 	}
1752 
1753 	if (high_level_read_only) {
1754 		return(DB_READ_ONLY);
1755 	}
1756 
1757 	DEBUG_SYNC_C("innodb_row_update_for_mysql_begin");
1758 
1759 	trx->op_info = "updating or deleting";
1760 
1761 	row_mysql_delay_if_needed();
1762 
1763 	init_fts_doc_id_for_ref(table, &fk_depth);
1764 
1765 	if (!table->no_rollback()) {
1766 		trx_start_if_not_started_xa(trx, true);
1767 	}
1768 
1769 	if (dict_table_is_referenced_by_foreign_key(table)) {
1770 		/* Share lock the data dictionary to prevent any
1771 		table dictionary (for foreign constraint) change.
1772 		This is similar to row_ins_check_foreign_constraint
1773 		check protect by the dictionary lock as well.
1774 		In the future, this can be removed once the Foreign
1775 		key MDL is implemented */
1776 		row_mysql_freeze_data_dictionary(trx);
1777 		init_fts_doc_id_for_ref(table, &fk_depth);
1778 		row_mysql_unfreeze_data_dictionary(trx);
1779 	}
1780 
1781 	node = prebuilt->upd_node;
1782 	const bool is_delete = node->is_delete == PLAIN_DELETE;
1783 	ut_ad(node->table == table);
1784 
1785 	clust_index = dict_table_get_first_index(table);
1786 
1787 	btr_pcur_copy_stored_position(node->pcur,
1788 				      prebuilt->pcur->btr_cur.index
1789 				      == clust_index
1790 				      ? prebuilt->pcur
1791 				      : prebuilt->clust_pcur);
1792 
1793 	ut_a(node->pcur->rel_pos == BTR_PCUR_ON);
1794 
1795 	/* MySQL seems to call rnd_pos before updating each row it
1796 	has cached: we can get the correct cursor position from
1797 	prebuilt->pcur; NOTE that we cannot build the row reference
1798 	from mysql_rec if the clustered index was automatically
1799 	generated for the table: MySQL does not know anything about
1800 	the row id used as the clustered index key */
1801 
1802 	savept = trx_savept_take(trx);
1803 
1804 	thr = que_fork_get_first_thr(prebuilt->upd_graph);
1805 
1806 	node->state = UPD_NODE_UPDATE_CLUSTERED;
1807 
1808 	ut_ad(!prebuilt->sql_stat_start);
1809 
1810 	que_thr_move_to_run_state_for_mysql(thr, trx);
1811 
1812 	ut_ad(!prebuilt->versioned_write || node->table->versioned());
1813 
1814 	if (prebuilt->versioned_write) {
1815 		if (node->is_delete == VERSIONED_DELETE) {
1816                   node->vers_make_delete(trx);
1817                 } else if (node->update->affects_versioned()) {
1818                   node->vers_make_update(trx);
1819                 }
1820 	}
1821 
1822 	for (;;) {
1823 		thr->run_node = node;
1824 		thr->prev_node = node;
1825 		thr->fk_cascade_depth = 0;
1826 
1827 		row_upd_step(thr);
1828 
1829 		err = trx->error_state;
1830 
1831 		if (err == DB_SUCCESS) {
1832 			break;
1833 		}
1834 
1835 		que_thr_stop_for_mysql(thr);
1836 
1837 		if (err == DB_RECORD_NOT_FOUND) {
1838 			trx->error_state = DB_SUCCESS;
1839 			goto error;
1840 		}
1841 
1842 		thr->lock_state= QUE_THR_LOCK_ROW;
1843 
1844 		DEBUG_SYNC(trx->mysql_thd, "row_update_for_mysql_error");
1845 
1846 		bool was_lock_wait = row_mysql_handle_errors(
1847 			&err, trx, thr, &savept);
1848 		thr->lock_state= QUE_THR_LOCK_NOLOCK;
1849 
1850 		if (!was_lock_wait) {
1851 			goto error;
1852 		}
1853 	}
1854 
1855 	que_thr_stop_for_mysql_no_error(thr, trx);
1856 
1857 	if (dict_table_has_fts_index(table)
1858 	    && trx->fts_next_doc_id != UINT64_UNDEFINED) {
1859 		err = row_fts_update_or_delete(prebuilt);
1860 		if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
1861 			ut_ad(!"unexpected error");
1862 			goto error;
1863 		}
1864 	}
1865 
1866 	/* Completed cascading operations (if any) */
1867 	if (got_s_lock) {
1868 		row_mysql_unfreeze_data_dictionary(trx);
1869 	}
1870 
1871 	bool	update_statistics;
1872 	ut_ad(is_delete == (node->is_delete == PLAIN_DELETE));
1873 
1874 	if (is_delete) {
1875 		/* Not protected by dict_sys->mutex for performance
1876 		reasons, we would rather get garbage in stat_n_rows (which is
1877 		just an estimate anyway) than protecting the following code
1878 		with a latch. */
1879 		dict_table_n_rows_dec(prebuilt->table);
1880 
1881 		if (table->is_system_db) {
1882 			srv_stats.n_system_rows_deleted.inc(size_t(trx->id));
1883 		} else {
1884 			srv_stats.n_rows_deleted.inc(size_t(trx->id));
1885 		}
1886 
1887 		update_statistics = !srv_stats_include_delete_marked;
1888 	} else {
1889 		if (table->is_system_db) {
1890 			srv_stats.n_system_rows_updated.inc(size_t(trx->id));
1891 		} else {
1892 			srv_stats.n_rows_updated.inc(size_t(trx->id));
1893 		}
1894 
1895 		update_statistics
1896 			= !(node->cmpl_info & UPD_NODE_NO_ORD_CHANGE);
1897 	}
1898 
1899 	if (update_statistics) {
1900 		dict_stats_update_if_needed(prebuilt->table, *trx);
1901 	} else {
1902 		/* Always update the table modification counter. */
1903 		prebuilt->table->stat_modified_counter++;
1904 	}
1905 
1906 	trx->op_info = "";
1907 
1908 	DBUG_RETURN(err);
1909 
1910 error:
1911 	trx->op_info = "";
1912 	if (got_s_lock) {
1913 		row_mysql_unfreeze_data_dictionary(trx);
1914 	}
1915 
1916 	DBUG_RETURN(err);
1917 }
1918 
1919 /** This can only be used when srv_locks_unsafe_for_binlog is TRUE or this
1920 session is using a READ COMMITTED or READ UNCOMMITTED isolation level.
1921 Before calling this function row_search_for_mysql() must have
1922 initialized prebuilt->new_rec_locks to store the information which new
1923 record locks really were set. This function removes a newly set
1924 clustered index record lock under prebuilt->pcur or
1925 prebuilt->clust_pcur.  Thus, this implements a 'mini-rollback' that
1926 releases the latest clustered index record lock we set.
1927 @param[in,out]	prebuilt		prebuilt struct in MySQL handle
1928 @param[in]	has_latches_on_recs	TRUE if called so that we have the
1929 					latches on the records under pcur
1930 					and clust_pcur, and we do not need
1931 					to reposition the cursors. */
1932 void
row_unlock_for_mysql(row_prebuilt_t * prebuilt,ibool has_latches_on_recs)1933 row_unlock_for_mysql(
1934 	row_prebuilt_t*	prebuilt,
1935 	ibool		has_latches_on_recs)
1936 {
1937 	btr_pcur_t*	pcur		= prebuilt->pcur;
1938 	btr_pcur_t*	clust_pcur	= prebuilt->clust_pcur;
1939 	trx_t*		trx		= prebuilt->trx;
1940 
1941 	ut_ad(prebuilt != NULL);
1942 	ut_ad(trx != NULL);
1943 
1944 	if (UNIV_UNLIKELY
1945 	    (!srv_locks_unsafe_for_binlog
1946 	     && trx->isolation_level > TRX_ISO_READ_COMMITTED)) {
1947 
1948 		ib::error() << "Calling row_unlock_for_mysql though"
1949 			" innodb_locks_unsafe_for_binlog is FALSE and this"
1950 			" session is not using READ COMMITTED isolation"
1951 			" level.";
1952 		return;
1953 	}
1954 	if (dict_index_is_spatial(prebuilt->index)) {
1955 		return;
1956 	}
1957 
1958 	trx->op_info = "unlock_row";
1959 
1960 	if (prebuilt->new_rec_locks >= 1) {
1961 
1962 		const rec_t*	rec;
1963 		dict_index_t*	index;
1964 		trx_id_t	rec_trx_id;
1965 		mtr_t		mtr;
1966 
1967 		mtr_start(&mtr);
1968 
1969 		/* Restore the cursor position and find the record */
1970 
1971 		if (!has_latches_on_recs) {
1972 			btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, &mtr);
1973 		}
1974 
1975 		rec = btr_pcur_get_rec(pcur);
1976 		index = btr_pcur_get_btr_cur(pcur)->index;
1977 
1978 		if (prebuilt->new_rec_locks >= 2) {
1979 			/* Restore the cursor position and find the record
1980 			in the clustered index. */
1981 
1982 			if (!has_latches_on_recs) {
1983 				btr_pcur_restore_position(BTR_SEARCH_LEAF,
1984 							  clust_pcur, &mtr);
1985 			}
1986 
1987 			rec = btr_pcur_get_rec(clust_pcur);
1988 			index = btr_pcur_get_btr_cur(clust_pcur)->index;
1989 		}
1990 
1991 		if (!dict_index_is_clust(index)) {
1992 			/* This is not a clustered index record.  We
1993 			do not know how to unlock the record. */
1994 			goto no_unlock;
1995 		}
1996 
1997 		/* If the record has been modified by this
1998 		transaction, do not unlock it. */
1999 
2000 		if (index->trx_id_offset) {
2001 			rec_trx_id = trx_read_trx_id(rec
2002 						     + index->trx_id_offset);
2003 		} else {
2004 			mem_heap_t*	heap			= NULL;
2005 			rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
2006 			rec_offs* offsets				= offsets_;
2007 
2008 			rec_offs_init(offsets_);
2009 			offsets = rec_get_offsets(rec, index, offsets,
2010 						  index->n_core_fields,
2011 						  ULINT_UNDEFINED, &heap);
2012 
2013 			rec_trx_id = row_get_rec_trx_id(rec, index, offsets);
2014 
2015 			if (UNIV_LIKELY_NULL(heap)) {
2016 				mem_heap_free(heap);
2017 			}
2018 		}
2019 
2020 		if (rec_trx_id != trx->id) {
2021 			/* We did not update the record: unlock it */
2022 
2023 			rec = btr_pcur_get_rec(pcur);
2024 
2025 			lock_rec_unlock(
2026 				trx,
2027 				btr_pcur_get_block(pcur),
2028 				rec,
2029 				static_cast<enum lock_mode>(
2030 					prebuilt->select_lock_type));
2031 
2032 			if (prebuilt->new_rec_locks >= 2) {
2033 				rec = btr_pcur_get_rec(clust_pcur);
2034 
2035 				lock_rec_unlock(
2036 					trx,
2037 					btr_pcur_get_block(clust_pcur),
2038 					rec,
2039 					static_cast<enum lock_mode>(
2040 						prebuilt->select_lock_type));
2041 			}
2042 		}
2043 no_unlock:
2044 		mtr_commit(&mtr);
2045 	}
2046 
2047 	trx->op_info = "";
2048 }
2049 
2050 /*********************************************************************//**
2051 Locks the data dictionary in shared mode from modifications, for performing
2052 foreign key check, rollback, or other operation invisible to MySQL. */
2053 void
row_mysql_freeze_data_dictionary_func(trx_t * trx,const char * file,unsigned line)2054 row_mysql_freeze_data_dictionary_func(
2055 /*==================================*/
2056 	trx_t*		trx,	/*!< in/out: transaction */
2057 	const char*	file,	/*!< in: file name */
2058 	unsigned	line)	/*!< in: line number */
2059 {
2060 	ut_a(trx->dict_operation_lock_mode == 0);
2061 
2062 	rw_lock_s_lock_inline(&dict_operation_lock, 0, file, line);
2063 
2064 	trx->dict_operation_lock_mode = RW_S_LATCH;
2065 }
2066 
2067 /*********************************************************************//**
2068 Unlocks the data dictionary shared lock. */
2069 void
row_mysql_unfreeze_data_dictionary(trx_t * trx)2070 row_mysql_unfreeze_data_dictionary(
2071 /*===============================*/
2072 	trx_t*	trx)	/*!< in/out: transaction */
2073 {
2074 	ut_ad(lock_trx_has_sys_table_locks(trx) == NULL);
2075 
2076 	ut_a(trx->dict_operation_lock_mode == RW_S_LATCH);
2077 
2078 	rw_lock_s_unlock(&dict_operation_lock);
2079 
2080 	trx->dict_operation_lock_mode = 0;
2081 }
2082 
/** Write query start time as SQL field data to a buffer. Needed by InnoDB.
This is only a declaration; no definition is visible in this part of
the file.
@param	thd	Thread object
@param	buf	Buffer to hold start time data. The caller below
		(row_update_vers_insert()) passes an 8-byte buffer and
		uses the first 7 bytes as the field value; presumably 7 bytes
		are written here (TODO confirm against the definition). */
void thd_get_query_start_data(THD *thd, char *buf);
2087 
2088 /** Insert history row when evaluating foreign key referential action.
2089 
2090 1. Create new dtuple_t 'row' from node->historical_row;
2091 2. Update its row_end to current timestamp;
2092 3. Insert it to a table;
2093 4. Update table statistics.
2094 
2095 This is used in UPDATE CASCADE/SET NULL of a system versioned referenced table.
2096 
2097 node->historical_row: dtuple_t containing pointers of row changed by refertial
2098 action.
2099 
2100 @param[in]	thr	current query thread
2101 @param[in]	node	a node which just updated a row in a foreign table
2102 @return DB_SUCCESS or some error */
row_update_vers_insert(que_thr_t * thr,upd_node_t * node)2103 static dberr_t row_update_vers_insert(que_thr_t* thr, upd_node_t* node)
2104 {
2105 	trx_t* trx = thr_get_trx(thr);
2106 	dfield_t* row_end;
2107 	char row_end_data[8];
2108 	dict_table_t* table = node->table;
2109 	page_size_t page_size= dict_table_page_size(table);
2110 	ut_ad(table->versioned());
2111 
2112 	dtuple_t*       row;
2113 	const ulint     n_cols        = dict_table_get_n_cols(table);
2114 	const ulint     n_v_cols      = dict_table_get_n_v_cols(table);
2115 
2116 	ut_ad(n_cols == dtuple_get_n_fields(node->historical_row));
2117 	ut_ad(n_v_cols == dtuple_get_n_v_fields(node->historical_row));
2118 
2119 	row = dtuple_create_with_vcol(node->historical_heap, n_cols, n_v_cols);
2120 
2121 	dict_table_copy_types(row, table);
2122 
2123 	ins_node_t* insert_node =
2124 		ins_node_create(INS_DIRECT, table, node->historical_heap);
2125 
2126 	if (!insert_node) {
2127 		trx->error_state = DB_OUT_OF_MEMORY;
2128 		goto exit;
2129 	}
2130 
2131 	insert_node->common.parent = thr;
2132 	ins_node_set_new_row(insert_node, row);
2133 
2134 	ut_ad(n_cols > DATA_N_SYS_COLS);
2135 	// Exclude DB_ROW_ID, DB_TRX_ID, DB_ROLL_PTR
2136 	for (ulint i = 0; i < n_cols - DATA_N_SYS_COLS; i++) {
2137 		dfield_t *src= dtuple_get_nth_field(node->historical_row, i);
2138 		dfield_t *dst= dtuple_get_nth_field(row, i);
2139 		dfield_copy(dst, src);
2140 		if (dfield_is_ext(src)) {
2141 			byte *field_data
2142 				= static_cast<byte*>(dfield_get_data(src));
2143 			ulint ext_len;
2144 			ulint field_len = dfield_get_len(src);
2145 
2146 			ut_a(field_len >= BTR_EXTERN_FIELD_REF_SIZE);
2147 
2148 			ut_a(memcmp(field_data + field_len
2149 				     - BTR_EXTERN_FIELD_REF_SIZE,
2150 				     field_ref_zero,
2151 				     BTR_EXTERN_FIELD_REF_SIZE));
2152 
2153 			byte *data = btr_copy_externally_stored_field(
2154 				&ext_len, field_data, page_size, field_len,
2155 				node->historical_heap);
2156 			dfield_set_data(dst, data, ext_len);
2157 		}
2158 	}
2159 
2160 	for (ulint i = 0; i < n_v_cols; i++) {
2161 		dfield_t *dst= dtuple_get_nth_v_field(row, i);
2162 		dfield_t *src= dtuple_get_nth_v_field(node->historical_row, i);
2163 		dfield_copy(dst, src);
2164 	}
2165 
2166 	node->historical_row = NULL;
2167 
2168 	row_end = dtuple_get_nth_field(row, table->vers_end);
2169 	if (dict_table_get_nth_col(table, table->vers_end)->vers_native()) {
2170 		mach_write_to_8(row_end_data, trx->id);
2171 		dfield_set_data(row_end, row_end_data, 8);
2172 	} else {
2173 		thd_get_query_start_data(trx->mysql_thd, row_end_data);
2174 		dfield_set_data(row_end, row_end_data, 7);
2175 	}
2176 
2177 	for (;;) {
2178 		thr->run_node = insert_node;
2179 		thr->prev_node = insert_node;
2180 
2181 		row_ins_step(thr);
2182 
2183 		switch (trx->error_state) {
2184 		case DB_LOCK_WAIT:
2185 			que_thr_stop_for_mysql(thr);
2186 			lock_wait_suspend_thread(thr);
2187 
2188 			if (trx->error_state == DB_SUCCESS) {
2189 				continue;
2190 			}
2191 
2192 			/* fall through */
2193 		default:
2194 			/* Other errors are handled for the parent node. */
2195 			thr->fk_cascade_depth = 0;
2196 			goto exit;
2197 
2198 		case DB_SUCCESS:
2199 			srv_stats.n_rows_inserted.inc(
2200 				static_cast<size_t>(trx->id));
2201 			dict_stats_update_if_needed(table, *trx);
2202 			goto exit;
2203 		}
2204 	}
2205 exit:
2206 	que_graph_free_recursive(insert_node);
2207 	mem_heap_free(node->historical_heap);
2208 	node->historical_heap = NULL;
2209 	return trx->error_state;
2210 }
2211 
2212 /**********************************************************************//**
2213 Does a cascaded delete or set null in a foreign key operation.
2214 @return error code or DB_SUCCESS */
2215 dberr_t
row_update_cascade_for_mysql(que_thr_t * thr,upd_node_t * node,dict_table_t * table)2216 row_update_cascade_for_mysql(
2217 /*=========================*/
2218         que_thr_t*      thr,    /*!< in: query thread */
2219         upd_node_t*     node,   /*!< in: update node used in the cascade
2220                                 or set null operation */
2221         dict_table_t*   table)  /*!< in: table where we do the operation */
2222 {
2223         /* Increment fk_cascade_depth to record the recursive call depth on
2224         a single update/delete that affects multiple tables chained
2225         together with foreign key relations. */
2226 
2227         if (++thr->fk_cascade_depth > FK_MAX_CASCADE_DEL) {
2228                 return(DB_FOREIGN_EXCEED_MAX_CASCADE);
2229         }
2230 
2231 	const trx_t* trx = thr_get_trx(thr);
2232 
2233 	if (table->versioned()) {
2234 		if (node->is_delete == PLAIN_DELETE) {
2235                   node->vers_make_delete(trx);
2236                 } else if (node->update->affects_versioned()) {
2237 			dberr_t err = row_update_vers_insert(thr, node);
2238 			if (err != DB_SUCCESS) {
2239 				return err;
2240 			}
2241                         node->vers_make_update(trx);
2242                 }
2243 	}
2244 
2245 	for (;;) {
2246 		thr->run_node = node;
2247 		thr->prev_node = node;
2248 
2249 		DEBUG_SYNC_C("foreign_constraint_update_cascade");
2250 		{
2251 			TABLE *mysql_table = thr->prebuilt->m_mysql_table;
2252 			thr->prebuilt->m_mysql_table = NULL;
2253 			row_upd_step(thr);
2254 			thr->prebuilt->m_mysql_table = mysql_table;
2255 		}
2256 
2257 		switch (trx->error_state) {
2258 		case DB_LOCK_WAIT:
2259 			que_thr_stop_for_mysql(thr);
2260 			lock_wait_suspend_thread(thr);
2261 
2262 			if (trx->error_state == DB_SUCCESS) {
2263 				continue;
2264 			}
2265 
2266 			/* fall through */
2267 		default:
2268 			/* Other errors are handled for the parent node. */
2269 			thr->fk_cascade_depth = 0;
2270 			return trx->error_state;
2271 
2272 		case DB_SUCCESS:
2273 			thr->fk_cascade_depth = 0;
2274 			bool stats;
2275 
2276 			if (node->is_delete == PLAIN_DELETE) {
2277 				/* Not protected by
2278 				dict_sys->mutex for
2279 				performance reasons, we would rather
2280 				get garbage in stat_n_rows (which is
2281 				just an estimate anyway) than
2282 				protecting the following code with a
2283 				latch. */
2284 				dict_table_n_rows_dec(node->table);
2285 
2286 				stats = !srv_stats_include_delete_marked;
2287 				srv_stats.n_rows_deleted.inc(size_t(trx->id));
2288 			} else {
2289 				stats = !(node->cmpl_info
2290 					  & UPD_NODE_NO_ORD_CHANGE);
2291 				srv_stats.n_rows_updated.inc(size_t(trx->id));
2292 			}
2293 
2294 			if (stats) {
2295 				dict_stats_update_if_needed(node->table, *trx);
2296 			} else {
2297 				/* Always update the table
2298 				modification counter. */
2299 				node->table->stat_modified_counter++;
2300 			}
2301 
2302 			return(DB_SUCCESS);
2303 		}
2304 	}
2305 }
2306 
2307 /*********************************************************************//**
2308 Locks the data dictionary exclusively for performing a table create or other
2309 data dictionary modification operation. */
2310 void
row_mysql_lock_data_dictionary_func(trx_t * trx,const char * file,unsigned line)2311 row_mysql_lock_data_dictionary_func(
2312 /*================================*/
2313 	trx_t*		trx,	/*!< in/out: transaction */
2314 	const char*	file,	/*!< in: file name */
2315 	unsigned	line)	/*!< in: line number */
2316 {
2317 	ut_a(trx->dict_operation_lock_mode == 0
2318 	     || trx->dict_operation_lock_mode == RW_X_LATCH);
2319 
2320 	/* Serialize data dictionary operations with dictionary mutex:
2321 	no deadlocks or lock waits can occur then in these operations */
2322 
2323 	rw_lock_x_lock_inline(&dict_operation_lock, 0, file, line);
2324 	trx->dict_operation_lock_mode = RW_X_LATCH;
2325 
2326 	mutex_enter(&dict_sys->mutex);
2327 }
2328 
2329 /*********************************************************************//**
2330 Unlocks the data dictionary exclusive lock. */
2331 void
row_mysql_unlock_data_dictionary(trx_t * trx)2332 row_mysql_unlock_data_dictionary(
2333 /*=============================*/
2334 	trx_t*	trx)	/*!< in/out: transaction */
2335 {
2336 	ut_ad(lock_trx_has_sys_table_locks(trx) == NULL);
2337 
2338 	ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
2339 
2340 	/* Serialize data dictionary operations with dictionary mutex:
2341 	no deadlocks can occur then in these operations */
2342 
2343 	mutex_exit(&dict_sys->mutex);
2344 	rw_lock_x_unlock(&dict_operation_lock);
2345 
2346 	trx->dict_operation_lock_mode = 0;
2347 }
2348 
2349 /*********************************************************************//**
2350 Creates a table for MySQL. On failure the transaction will be rolled back
2351 and the 'table' object will be freed.
2352 @return error code or DB_SUCCESS */
2353 dberr_t
row_create_table_for_mysql(dict_table_t * table,trx_t * trx,fil_encryption_t mode,uint32_t key_id)2354 row_create_table_for_mysql(
2355 /*=======================*/
2356 	dict_table_t*	table,	/*!< in, own: table definition
2357 				(will be freed, or on DB_SUCCESS
2358 				added to the data dictionary cache) */
2359 	trx_t*		trx,	/*!< in/out: transaction */
2360 	fil_encryption_t mode,	/*!< in: encryption mode */
2361 	uint32_t	key_id)	/*!< in: encryption key_id */
2362 {
2363 	tab_node_t*	node;
2364 	mem_heap_t*	heap;
2365 	que_thr_t*	thr;
2366 	dberr_t		err;
2367 
2368 	ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_X));
2369 	ut_ad(mutex_own(&dict_sys->mutex));
2370 	ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);
2371 
2372 	DBUG_EXECUTE_IF(
2373 		"ib_create_table_fail_at_start_of_row_create_table_for_mysql",
2374 		goto err_exit;
2375 	);
2376 
2377 	trx->op_info = "creating table";
2378 
2379 	if (row_mysql_is_system_table(table->name.m_name)) {
2380 
2381 		ib::error() << "Trying to create a MySQL system table "
2382 			<< table->name << " of type InnoDB. MySQL system"
2383 			" tables must be of the MyISAM type!";
2384 #ifndef DBUG_OFF
2385 err_exit:
2386 #endif /* !DBUG_OFF */
2387 		dict_mem_table_free(table);
2388 
2389 		trx->op_info = "";
2390 
2391 		return(DB_ERROR);
2392 	}
2393 
2394 	trx_start_if_not_started_xa(trx, true);
2395 
2396 	heap = mem_heap_create(512);
2397 
2398 	switch (trx_get_dict_operation(trx)) {
2399 	case TRX_DICT_OP_NONE:
2400 		trx_set_dict_operation(trx, TRX_DICT_OP_TABLE);
2401 	case TRX_DICT_OP_TABLE:
2402 		break;
2403 	case TRX_DICT_OP_INDEX:
2404 		/* If the transaction was previously flagged as
2405 		TRX_DICT_OP_INDEX, we should be creating auxiliary
2406 		tables for full-text indexes. */
2407 		ut_ad(strstr(table->name.m_name, "/FTS_") != NULL);
2408 	}
2409 
2410 	node = tab_create_graph_create(table, heap, mode, key_id);
2411 
2412 	thr = pars_complete_graph_for_exec(node, trx, heap, NULL);
2413 
2414 	ut_a(thr == que_fork_start_command(
2415 			static_cast<que_fork_t*>(que_node_get_parent(thr))));
2416 
2417 	que_run_threads(thr);
2418 
2419 	err = trx->error_state;
2420 
2421 	/* Update SYS_TABLESPACES and SYS_DATAFILES if a new file-per-table
2422 	tablespace was created. */
2423 	if (err == DB_SUCCESS && dict_table_is_file_per_table(table)) {
2424 		err = dict_replace_tablespace_in_dictionary(
2425 			table->space_id, table->name.m_name,
2426 			table->space->flags,
2427 			table->space->chain.start->name, trx);
2428 
2429 		if (err != DB_SUCCESS) {
2430 
2431 			/* We must delete the link file. */
2432 			RemoteDatafile::delete_link_file(table->name.m_name);
2433 		}
2434 	}
2435 
2436 	switch (err) {
2437 	case DB_SUCCESS:
2438 		break;
2439 	case DB_OUT_OF_FILE_SPACE:
2440 		trx->error_state = DB_SUCCESS;
2441 		trx_rollback_to_savepoint(trx, NULL);
2442 
2443 		ib::warn() << "Cannot create table "
2444 			<< table->name
2445 			<< " because tablespace full";
2446 
2447 		if (dict_table_open_on_name(table->name.m_name, TRUE, FALSE,
2448 					    DICT_ERR_IGNORE_NONE)) {
2449 
2450 			dict_table_close_and_drop(trx, table);
2451 		} else {
2452 			dict_mem_table_free(table);
2453 		}
2454 
2455 		break;
2456 
2457 	case DB_UNSUPPORTED:
2458 	case DB_TOO_MANY_CONCURRENT_TRXS:
2459 		/* We already have .ibd file here. it should be deleted. */
2460 
2461 		if (dict_table_is_file_per_table(table)
2462 		    && fil_delete_tablespace(table->space_id) != DB_SUCCESS) {
2463 			ib::error() << "Cannot delete the file of table "
2464 				<< table->name;
2465 		}
2466 		/* fall through */
2467 
2468 	case DB_DUPLICATE_KEY:
2469 	case DB_TABLESPACE_EXISTS:
2470 	default:
2471 		trx->error_state = DB_SUCCESS;
2472 		trx_rollback_to_savepoint(trx, NULL);
2473 		dict_mem_table_free(table);
2474 		break;
2475 	}
2476 
2477 	que_graph_free((que_t*) que_node_get_parent(thr));
2478 
2479 	trx->op_info = "";
2480 
2481 	return(err);
2482 }
2483 
2484 /*********************************************************************//**
2485 Create an index when creating a table.
2486 On failure, the caller must drop the table!
2487 @return error number or DB_SUCCESS */
2488 dberr_t
row_create_index_for_mysql(dict_index_t * index,trx_t * trx,const ulint * field_lengths)2489 row_create_index_for_mysql(
2490 /*=======================*/
2491 	dict_index_t*	index,		/*!< in, own: index definition
2492 					(will be freed) */
2493 	trx_t*		trx,		/*!< in: transaction handle */
2494 	const ulint*	field_lengths)	/*!< in: if not NULL, must contain
2495 					dict_index_get_n_fields(index)
2496 					actual field lengths for the
2497 					index columns, which are
2498 					then checked for not being too
2499 					large. */
2500 {
2501 	ind_node_t*	node;
2502 	mem_heap_t*	heap;
2503 	que_thr_t*	thr;
2504 	dberr_t		err;
2505 	ulint		i;
2506 	ulint		len;
2507 	dict_table_t*	table = index->table;
2508 
2509 	ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_X));
2510 	ut_ad(mutex_own(&dict_sys->mutex));
2511 
2512 	for (i = 0; i < index->n_def; i++) {
2513 		/* Check that prefix_len and actual length
2514 		< DICT_MAX_INDEX_COL_LEN */
2515 
2516 		len = dict_index_get_nth_field(index, i)->prefix_len;
2517 
2518 		if (field_lengths && field_lengths[i]) {
2519 			len = ut_max(len, field_lengths[i]);
2520 		}
2521 
2522 		DBUG_EXECUTE_IF(
2523 			"ib_create_table_fail_at_create_index",
2524 			len = DICT_MAX_FIELD_LEN_BY_FORMAT(table) + 1;
2525 		);
2526 
2527 		/* Column or prefix length exceeds maximum column length */
2528 		if (len > (ulint) DICT_MAX_FIELD_LEN_BY_FORMAT(table)) {
2529 			dict_mem_index_free(index);
2530 			return DB_TOO_BIG_INDEX_COL;
2531 		}
2532 	}
2533 
2534 	trx->op_info = "creating index";
2535 
2536 	/* For temp-table we avoid insertion into SYSTEM TABLES to
2537 	maintain performance and so we have separate path that directly
2538 	just updates dictonary cache. */
2539 	if (!table->is_temporary()) {
2540 		trx_start_if_not_started_xa(trx, true);
2541 		trx_set_dict_operation(trx, TRX_DICT_OP_TABLE);
2542 		/* Note that the space id where we store the index is
2543 		inherited from the table in dict_build_index_def_step()
2544 		in dict0crea.cc. */
2545 
2546 		heap = mem_heap_create(512);
2547 		node = ind_create_graph_create(index, table->name.m_name,
2548 					       heap);
2549 
2550 		thr = pars_complete_graph_for_exec(node, trx, heap, NULL);
2551 
2552 		ut_a(thr == que_fork_start_command(
2553 				static_cast<que_fork_t*>(
2554 					que_node_get_parent(thr))));
2555 
2556 		que_run_threads(thr);
2557 
2558 		err = trx->error_state;
2559 
2560 		index = node->index;
2561 
2562 		ut_ad(!index == (err != DB_SUCCESS));
2563 
2564 		que_graph_free((que_t*) que_node_get_parent(thr));
2565 
2566 		if (index && (index->type & DICT_FTS)) {
2567 			err = fts_create_index_tables(trx, index, table->id);
2568 		}
2569 	} else {
2570 		dict_build_index_def(table, index, trx);
2571 
2572 		err = dict_index_add_to_cache(index, FIL_NULL);
2573 		ut_ad((index == NULL) == (err != DB_SUCCESS));
2574 		if (UNIV_LIKELY(err == DB_SUCCESS)) {
2575 			ut_ad(!index->is_instant());
2576 			index->n_core_null_bytes = UT_BITS_IN_BYTES(
2577 				unsigned(index->n_nullable));
2578 
2579 			err = dict_create_index_tree_in_mem(index, trx);
2580 #ifdef BTR_CUR_HASH_ADAPT
2581 			ut_ad(!index->search_info->ref_count);
2582 #endif /* BTR_CUR_HASH_ADAPT */
2583 
2584 			if (err != DB_SUCCESS) {
2585 				dict_index_remove_from_cache(table, index);
2586 			}
2587 		}
2588 	}
2589 
2590 	trx->op_info = "";
2591 
2592 	return(err);
2593 }
2594 
2595 /*********************************************************************//**
2596 Drops a table for MySQL as a background operation. MySQL relies on Unix
2597 in ALTER TABLE to the fact that the table handler does not remove the
2598 table before all handles to it has been removed. Furhermore, the MySQL's
2599 call to drop table must be non-blocking. Therefore we do the drop table
2600 as a background operation, which is taken care of by the master thread
2601 in srv0srv.cc.
2602 @return error code or DB_SUCCESS */
2603 static
2604 dberr_t
row_drop_table_for_mysql_in_background(const char * name)2605 row_drop_table_for_mysql_in_background(
2606 /*===================================*/
2607 	const char*	name)	/*!< in: table name */
2608 {
2609 	dberr_t	error;
2610 	trx_t*	trx;
2611 
2612 	trx = trx_create();
2613 
2614 	/* If the original transaction was dropping a table referenced by
2615 	foreign keys, we must set the following to be able to drop the
2616 	table: */
2617 
2618 	trx->check_foreigns = false;
2619 
2620 	/* Try to drop the table in InnoDB */
2621 
2622 	error = row_drop_table_for_mysql(name, trx, SQLCOM_TRUNCATE);
2623 
2624 	trx_commit_for_mysql(trx);
2625 
2626 	trx->free();
2627 
2628 	return(error);
2629 }
2630 
2631 /*********************************************************************//**
2632 The master thread in srv0srv.cc calls this regularly to drop tables which
2633 we must drop in background after queries to them have ended. Such lazy
2634 dropping of tables is needed in ALTER TABLE on Unix.
2635 @return how many tables dropped + remaining tables in list */
2636 ulint
row_drop_tables_for_mysql_in_background(void)2637 row_drop_tables_for_mysql_in_background(void)
2638 /*=========================================*/
2639 {
2640 	row_mysql_drop_t*	drop;
2641 	dict_table_t*		table;
2642 	ulint			n_tables;
2643 	ulint			n_tables_dropped = 0;
2644 loop:
2645 	mutex_enter(&row_drop_list_mutex);
2646 
2647 	ut_a(row_mysql_drop_list_inited);
2648 next:
2649 	drop = UT_LIST_GET_FIRST(row_mysql_drop_list);
2650 
2651 	n_tables = UT_LIST_GET_LEN(row_mysql_drop_list);
2652 
2653 	mutex_exit(&row_drop_list_mutex);
2654 
2655 	if (drop == NULL) {
2656 		/* All tables dropped */
2657 
2658 		return(n_tables + n_tables_dropped);
2659 	}
2660 
2661 	/* On fast shutdown, just empty the list without dropping tables. */
2662 	table = srv_shutdown_state == SRV_SHUTDOWN_NONE || !srv_fast_shutdown
2663 		? dict_table_open_on_id(drop->table_id, FALSE,
2664 					DICT_TABLE_OP_OPEN_ONLY_IF_CACHED)
2665 		: NULL;
2666 
2667 	if (!table) {
2668 		n_tables_dropped++;
2669 		mutex_enter(&row_drop_list_mutex);
2670 		UT_LIST_REMOVE(row_mysql_drop_list, drop);
2671 		MONITOR_DEC(MONITOR_BACKGROUND_DROP_TABLE);
2672 		ut_free(drop);
2673 		goto next;
2674 	}
2675 
2676 	ut_a(!table->can_be_evicted);
2677 
2678 	bool skip = false;
2679 
2680 	if (!table->to_be_dropped) {
2681 skip:
2682 		dict_table_close(table, FALSE, FALSE);
2683 
2684 		mutex_enter(&row_drop_list_mutex);
2685 		UT_LIST_REMOVE(row_mysql_drop_list, drop);
2686 		if (!skip) {
2687 			UT_LIST_ADD_LAST(row_mysql_drop_list, drop);
2688 		} else {
2689 			ut_free(drop);
2690 		}
2691 		goto next;
2692 	}
2693 
2694 	if (!srv_fast_shutdown && !trx_sys.any_active_transactions()) {
2695 		lock_mutex_enter();
2696 		skip = UT_LIST_GET_LEN(table->locks) != 0;
2697 		lock_mutex_exit();
2698 		if (skip) {
2699 			/* We cannot drop tables that are locked by XA
2700 			PREPARE transactions. */
2701 			goto skip;
2702 		}
2703 	}
2704 
2705 	char* name = mem_strdup(table->name.m_name);
2706 
2707 	dict_table_close(table, FALSE, FALSE);
2708 
2709 	dberr_t err = row_drop_table_for_mysql_in_background(name);
2710 
2711 	ut_free(name);
2712 
2713 	if (err != DB_SUCCESS) {
2714 		/* If the DROP fails for some table, we return, and let the
2715 		main thread retry later */
2716 		return(n_tables + n_tables_dropped);
2717 	}
2718 
2719 	goto loop;
2720 }
2721 
2722 /*********************************************************************//**
2723 Get the background drop list length. NOTE: the caller must own the
2724 drop list mutex!
2725 @return how many tables in list */
2726 ulint
row_get_background_drop_list_len_low(void)2727 row_get_background_drop_list_len_low(void)
2728 /*======================================*/
2729 {
2730 	ulint	len;
2731 
2732 	mutex_enter(&row_drop_list_mutex);
2733 
2734 	ut_a(row_mysql_drop_list_inited);
2735 
2736 	len = UT_LIST_GET_LEN(row_mysql_drop_list);
2737 
2738 	mutex_exit(&row_drop_list_mutex);
2739 
2740 	return(len);
2741 }
2742 
2743 /** Drop garbage tables during recovery. */
2744 void
row_mysql_drop_garbage_tables()2745 row_mysql_drop_garbage_tables()
2746 {
2747 	mem_heap_t*	heap = mem_heap_create(FN_REFLEN);
2748 	btr_pcur_t	pcur;
2749 	mtr_t		mtr;
2750 	trx_t*		trx = trx_create();
2751 	trx->op_info = "dropping garbage tables";
2752 	row_mysql_lock_data_dictionary(trx);
2753 
2754 	mtr.start();
2755 	btr_pcur_open_at_index_side(
2756 		true, dict_table_get_first_index(dict_sys->sys_tables),
2757 		BTR_SEARCH_LEAF, &pcur, true, 0, &mtr);
2758 
2759 	for (;;) {
2760 		const rec_t*	rec;
2761 		const byte*	field;
2762 		ulint		len;
2763 		const char*	table_name;
2764 
2765 		btr_pcur_move_to_next_user_rec(&pcur, &mtr);
2766 
2767 		if (!btr_pcur_is_on_user_rec(&pcur)) {
2768 			break;
2769 		}
2770 
2771 		rec = btr_pcur_get_rec(&pcur);
2772 		if (rec_get_deleted_flag(rec, 0)) {
2773 			continue;
2774 		}
2775 
2776 		field = rec_get_nth_field_old(rec, 0/*NAME*/, &len);
2777 		if (len == UNIV_SQL_NULL || len == 0) {
2778 			/* Corrupted SYS_TABLES.NAME */
2779 			continue;
2780 		}
2781 
2782 		table_name = mem_heap_strdupl(
2783 			heap,
2784 			reinterpret_cast<const char*>(field), len);
2785 		if (strstr(table_name, "/" TEMP_FILE_PREFIX "-")) {
2786 			btr_pcur_store_position(&pcur, &mtr);
2787 			btr_pcur_commit_specify_mtr(&pcur, &mtr);
2788 
2789 			if (dict_load_table(table_name,
2790 					    DICT_ERR_IGNORE_DROP)) {
2791 				row_drop_table_for_mysql(table_name, trx,
2792 							 SQLCOM_DROP_TABLE);
2793 				trx_commit_for_mysql(trx);
2794 			}
2795 
2796 			mtr.start();
2797 			btr_pcur_restore_position(BTR_SEARCH_LEAF,
2798 						  &pcur, &mtr);
2799 		}
2800 
2801 		mem_heap_empty(heap);
2802 	}
2803 
2804 	btr_pcur_close(&pcur);
2805 	mtr.commit();
2806 	row_mysql_unlock_data_dictionary(trx);
2807 	trx->free();
2808 	mem_heap_free(heap);
2809 }
2810 
2811 /*********************************************************************//**
2812 If a table is not yet in the drop list, adds the table to the list of tables
2813 which the master thread drops in background. We need this on Unix because in
2814 ALTER TABLE MySQL may call drop table even if the table has running queries on
2815 it. Also, if there are running foreign key checks on the table, we drop the
2816 table lazily.
2817 @return	whether background DROP TABLE was scheduled for the first time */
2818 static
2819 bool
row_add_table_to_background_drop_list(table_id_t table_id)2820 row_add_table_to_background_drop_list(table_id_t table_id)
2821 {
2822 	row_mysql_drop_t*	drop;
2823 	bool			added = true;
2824 
2825 	mutex_enter(&row_drop_list_mutex);
2826 
2827 	ut_a(row_mysql_drop_list_inited);
2828 
2829 	/* Look if the table already is in the drop list */
2830 	for (drop = UT_LIST_GET_FIRST(row_mysql_drop_list);
2831 	     drop != NULL;
2832 	     drop = UT_LIST_GET_NEXT(row_mysql_drop_list, drop)) {
2833 
2834 		if (drop->table_id == table_id) {
2835 			added = false;
2836 			goto func_exit;
2837 		}
2838 	}
2839 
2840 	drop = static_cast<row_mysql_drop_t*>(ut_malloc_nokey(sizeof *drop));
2841 	drop->table_id = table_id;
2842 
2843 	UT_LIST_ADD_LAST(row_mysql_drop_list, drop);
2844 
2845 	MONITOR_INC(MONITOR_BACKGROUND_DROP_TABLE);
2846 func_exit:
2847 	mutex_exit(&row_drop_list_mutex);
2848 	return added;
2849 }
2850 
2851 /** Reassigns the table identifier of a table.
2852 @param[in,out]	table	table
2853 @param[in,out]	trx	transaction
2854 @param[out]	new_id	new table id
2855 @return error code or DB_SUCCESS */
2856 static
2857 dberr_t
row_mysql_table_id_reassign(dict_table_t * table,trx_t * trx,table_id_t * new_id)2858 row_mysql_table_id_reassign(
2859 	dict_table_t*	table,
2860 	trx_t*		trx,
2861 	table_id_t*	new_id)
2862 {
2863 	dberr_t		err;
2864 	pars_info_t*	info	= pars_info_create();
2865 
2866 	dict_hdr_get_new_id(new_id, NULL, NULL, table, false);
2867 
2868 	pars_info_add_ull_literal(info, "old_id", table->id);
2869 	pars_info_add_ull_literal(info, "new_id", *new_id);
2870 
2871 	err = que_eval_sql(
2872 		info,
2873 		"PROCEDURE RENUMBER_TABLE_PROC () IS\n"
2874 		"BEGIN\n"
2875 		"UPDATE SYS_TABLES SET ID = :new_id\n"
2876 		" WHERE ID = :old_id;\n"
2877 		"UPDATE SYS_COLUMNS SET TABLE_ID = :new_id\n"
2878 		" WHERE TABLE_ID = :old_id;\n"
2879 		"UPDATE SYS_INDEXES SET TABLE_ID = :new_id\n"
2880 		" WHERE TABLE_ID = :old_id;\n"
2881 		"UPDATE SYS_VIRTUAL SET TABLE_ID = :new_id\n"
2882 		" WHERE TABLE_ID = :old_id;\n"
2883 		"END;\n", FALSE, trx);
2884 
2885 	return(err);
2886 }
2887 
2888 /*********************************************************************//**
2889 Setup the pre-requisites for DISCARD TABLESPACE. It will start the transaction,
2890 acquire the data dictionary lock in X mode and open the table.
2891 @return table instance or 0 if not found. */
2892 static
2893 dict_table_t*
row_discard_tablespace_begin(const char * name,trx_t * trx)2894 row_discard_tablespace_begin(
2895 /*=========================*/
2896 	const char*	name,	/*!< in: table name */
2897 	trx_t*		trx)	/*!< in: transaction handle */
2898 {
2899 	trx->op_info = "discarding tablespace";
2900 
2901 	trx_set_dict_operation(trx, TRX_DICT_OP_TABLE);
2902 
2903 	trx_start_if_not_started_xa(trx, true);
2904 
2905 	/* Serialize data dictionary operations with dictionary mutex:
2906 	this is to avoid deadlocks during data dictionary operations */
2907 
2908 	row_mysql_lock_data_dictionary(trx);
2909 
2910 	dict_table_t*	table;
2911 
2912 	table = dict_table_open_on_name(
2913 		name, TRUE, FALSE, DICT_ERR_IGNORE_FK_NOKEY);
2914 
2915 	if (table) {
2916 		dict_stats_wait_bg_to_stop_using_table(table, trx);
2917 		ut_a(!is_system_tablespace(table->space_id));
2918 		ut_ad(!table->n_foreign_key_checks_running);
2919 	}
2920 
2921 	return(table);
2922 }
2923 
2924 /*********************************************************************//**
2925 Do the foreign key constraint checks.
2926 @return DB_SUCCESS or error code. */
2927 static
2928 dberr_t
row_discard_tablespace_foreign_key_checks(const trx_t * trx,const dict_table_t * table)2929 row_discard_tablespace_foreign_key_checks(
2930 /*======================================*/
2931 	const trx_t*		trx,	/*!< in: transaction handle */
2932 	const dict_table_t*	table)	/*!< in: table to be discarded */
2933 {
2934 
2935 	if (srv_read_only_mode || !trx->check_foreigns) {
2936 		return(DB_SUCCESS);
2937 	}
2938 
2939 	/* Check if the table is referenced by foreign key constraints from
2940 	some other table (not the table itself) */
2941 	dict_foreign_set::const_iterator	it
2942 		= std::find_if(table->referenced_set.begin(),
2943 			       table->referenced_set.end(),
2944 			       dict_foreign_different_tables());
2945 
2946 	if (it == table->referenced_set.end()) {
2947 		return(DB_SUCCESS);
2948 	}
2949 
2950 	const dict_foreign_t*	foreign	= *it;
2951 	FILE*			ef	= dict_foreign_err_file;
2952 
2953 	ut_ad(foreign->foreign_table != table);
2954 	ut_ad(foreign->referenced_table == table);
2955 
2956 	/* We only allow discarding a referenced table if
2957 	FOREIGN_KEY_CHECKS is set to 0 */
2958 
2959 	mutex_enter(&dict_foreign_err_mutex);
2960 
2961 	rewind(ef);
2962 
2963 	ut_print_timestamp(ef);
2964 
2965 	fputs("  Cannot DISCARD table ", ef);
2966 	ut_print_name(ef, trx, table->name.m_name);
2967 	fputs("\n"
2968 	      "because it is referenced by ", ef);
2969 	ut_print_name(ef, trx, foreign->foreign_table_name);
2970 	putc('\n', ef);
2971 
2972 	mutex_exit(&dict_foreign_err_mutex);
2973 
2974 	return(DB_CANNOT_DROP_CONSTRAINT);
2975 }
2976 
2977 /*********************************************************************//**
2978 Cleanup after the DISCARD TABLESPACE operation.
2979 @return error code. */
2980 static
2981 dberr_t
row_discard_tablespace_end(trx_t * trx,dict_table_t * table,dberr_t err)2982 row_discard_tablespace_end(
2983 /*=======================*/
2984 	trx_t*		trx,	/*!< in/out: transaction handle */
2985 	dict_table_t*	table,	/*!< in/out: table to be discarded */
2986 	dberr_t		err)	/*!< in: error code */
2987 {
2988 	if (table != 0) {
2989 		dict_table_close(table, TRUE, FALSE);
2990 	}
2991 
2992 	DBUG_EXECUTE_IF("ib_discard_before_commit_crash",
2993 			log_write_up_to(LSN_MAX, true);
2994 			DBUG_SUICIDE(););
2995 
2996 	trx_commit_for_mysql(trx);
2997 
2998 	DBUG_EXECUTE_IF("ib_discard_after_commit_crash",
2999 			log_write_up_to(LSN_MAX, true);
3000 			DBUG_SUICIDE(););
3001 
3002 	row_mysql_unlock_data_dictionary(trx);
3003 
3004 	trx->op_info = "";
3005 
3006 	return(err);
3007 }
3008 
3009 /*********************************************************************//**
3010 Do the DISCARD TABLESPACE operation.
3011 @return DB_SUCCESS or error code. */
3012 static
3013 dberr_t
row_discard_tablespace(trx_t * trx,dict_table_t * table)3014 row_discard_tablespace(
3015 /*===================*/
3016 	trx_t*		trx,	/*!< in/out: transaction handle */
3017 	dict_table_t*	table)	/*!< in/out: table to be discarded */
3018 {
3019 	dberr_t		err;
3020 
3021 	/* How do we prevent crashes caused by ongoing operations on
3022 	the table? Old operations could try to access non-existent
3023 	pages. MySQL will block all DML on the table using MDL and a
3024 	DISCARD will not start unless all existing operations on the
3025 	table to be discarded are completed.
3026 
3027 	1) Acquire the data dictionary latch in X mode. To prevent any
3028 	internal operations that MySQL is not aware off and also for
3029 	the internal SQL parser.
3030 
3031 	2) Purge and rollback: we assign a new table id for the
3032 	table. Since purge and rollback look for the table based on
3033 	the table id, they see the table as 'dropped' and discard
3034 	their operations.
3035 
3036 	3) Insert buffer: we remove all entries for the tablespace in
3037 	the insert buffer tree. */
3038 
3039 	ibuf_delete_for_discarded_space(table->space_id);
3040 
3041 	table_id_t	new_id;
3042 
3043 	/* Set the TABLESPACE DISCARD flag in the table definition
3044 	on disk. */
3045 	err = row_import_update_discarded_flag(trx, table->id, true);
3046 
3047 	if (err != DB_SUCCESS) {
3048 		return(err);
3049 	}
3050 
3051 	/* Update the index root pages in the system tables, on disk */
3052 	err = row_import_update_index_root(trx, table, true);
3053 
3054 	if (err != DB_SUCCESS) {
3055 		return(err);
3056 	}
3057 
3058 	/* Drop all the FTS auxiliary tables. */
3059 	if (dict_table_has_fts_index(table)
3060 	    || DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)) {
3061 
3062 		fts_drop_tables(trx, table);
3063 	}
3064 
3065 	/* Assign a new space ID to the table definition so that purge
3066 	can ignore the changes. Update the system table on disk. */
3067 
3068 	err = row_mysql_table_id_reassign(table, trx, &new_id);
3069 
3070 	if (err != DB_SUCCESS) {
3071 		return(err);
3072 	}
3073 
3074 	/* Discard the physical file that is used for the tablespace. */
3075 	err = fil_delete_tablespace(table->space_id);
3076 	switch (err) {
3077 	case DB_IO_ERROR:
3078 		ib::warn() << "ALTER TABLE " << table->name
3079 			<< " DISCARD TABLESPACE failed to delete file";
3080 		break;
3081 	case DB_TABLESPACE_NOT_FOUND:
3082 		ib::warn() << "ALTER TABLE " << table->name
3083 			<< " DISCARD TABLESPACE failed to find tablespace";
3084 		break;
3085 	case DB_SUCCESS:
3086 		break;
3087 	default:
3088 		ut_error;
3089 	}
3090 
3091 	/* All persistent operations successful, update the
3092 	data dictionary memory cache. */
3093 
3094 	table->file_unreadable = true;
3095 	table->space = NULL;
3096 	table->flags2 |= DICT_TF2_DISCARDED;
3097 	dict_table_change_id_in_cache(table, new_id);
3098 
3099 	dict_index_t* index = UT_LIST_GET_FIRST(table->indexes);
3100 	if (index) index->remove_instant();
3101 
3102 	/* Reset the root page numbers. */
3103 	for (; index; index = UT_LIST_GET_NEXT(indexes, index)) {
3104 		index->page = FIL_NULL;
3105 	}
3106 
3107 	/* If the tablespace did not already exist or we couldn't
3108 	write to it, we treat that as a successful DISCARD. It is
3109 	unusable anyway. */
3110 	return DB_SUCCESS;
3111 }
3112 
3113 /*********************************************************************//**
3114 Discards the tablespace of a table which stored in an .ibd file. Discarding
3115 means that this function renames the .ibd file and assigns a new table id for
3116 the table. Also the file_unreadable flag is set.
3117 @return error code or DB_SUCCESS */
3118 dberr_t
row_discard_tablespace_for_mysql(const char * name,trx_t * trx)3119 row_discard_tablespace_for_mysql(
3120 /*=============================*/
3121 	const char*	name,	/*!< in: table name */
3122 	trx_t*		trx)	/*!< in: transaction handle */
3123 {
3124 	dberr_t		err;
3125 	dict_table_t*	table;
3126 
3127 	/* Open the table and start the transaction if not started. */
3128 
3129 	table = row_discard_tablespace_begin(name, trx);
3130 
3131 	if (table == 0) {
3132 		err = DB_TABLE_NOT_FOUND;
3133 	} else if (table->is_temporary()) {
3134 
3135 		ib_senderrf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
3136 			    ER_CANNOT_DISCARD_TEMPORARY_TABLE);
3137 
3138 		err = DB_ERROR;
3139 
3140 	} else if (table->space_id == TRX_SYS_SPACE) {
3141 		char	table_name[MAX_FULL_NAME_LEN + 1];
3142 
3143 		innobase_format_name(
3144 			table_name, sizeof(table_name),
3145 			table->name.m_name);
3146 
3147 		ib_senderrf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
3148 			    ER_TABLE_IN_SYSTEM_TABLESPACE, table_name);
3149 
3150 		err = DB_ERROR;
3151 
3152 	} else {
3153 		ut_ad(!table->n_foreign_key_checks_running);
3154 
3155 		bool fts_exist = (dict_table_has_fts_index(table)
3156 				  || DICT_TF2_FLAG_IS_SET(
3157 					  table, DICT_TF2_FTS_HAS_DOC_ID));
3158 
3159 		if (fts_exist) {
3160 			row_mysql_unlock_data_dictionary(trx);
3161 			fts_optimize_remove_table(table);
3162 			row_mysql_lock_data_dictionary(trx);
3163 		}
3164 
3165 		/* Do foreign key constraint checks. */
3166 
3167 		err = row_discard_tablespace_foreign_key_checks(trx, table);
3168 
3169 		if (err == DB_SUCCESS) {
3170 			err = row_discard_tablespace(trx, table);
3171 		}
3172 
3173 		if (fts_exist && err != DB_SUCCESS) {
3174 			fts_optimize_add_table(table);
3175 		}
3176 	}
3177 
3178 	return(row_discard_tablespace_end(trx, table, err));
3179 }
3180 
3181 /*********************************************************************//**
3182 Sets an exclusive lock on a table.
3183 @return error code or DB_SUCCESS */
3184 dberr_t
row_mysql_lock_table(trx_t * trx,dict_table_t * table,enum lock_mode mode,const char * op_info)3185 row_mysql_lock_table(
3186 /*=================*/
3187 	trx_t*		trx,		/*!< in/out: transaction */
3188 	dict_table_t*	table,		/*!< in: table to lock */
3189 	enum lock_mode	mode,		/*!< in: LOCK_X or LOCK_S */
3190 	const char*	op_info)	/*!< in: string for trx->op_info */
3191 {
3192 	mem_heap_t*	heap;
3193 	que_thr_t*	thr;
3194 	dberr_t		err;
3195 	sel_node_t*	node;
3196 
3197 	ut_ad(mode == LOCK_X || mode == LOCK_S);
3198 
3199 	heap = mem_heap_create(512);
3200 
3201 	trx->op_info = op_info;
3202 
3203 	node = sel_node_create(heap);
3204 	thr = pars_complete_graph_for_exec(node, trx, heap, NULL);
3205 	thr->graph->state = QUE_FORK_ACTIVE;
3206 
3207 	/* We use the select query graph as the dummy graph needed
3208 	in the lock module call */
3209 
3210 	thr = que_fork_get_first_thr(
3211 		static_cast<que_fork_t*>(que_node_get_parent(thr)));
3212 
3213 	que_thr_move_to_run_state_for_mysql(thr, trx);
3214 
3215 run_again:
3216 	thr->run_node = thr;
3217 	thr->prev_node = thr->common.parent;
3218 
3219 	err = lock_table(0, table, mode, thr);
3220 
3221 	trx->error_state = err;
3222 
3223 	if (err == DB_SUCCESS) {
3224 		que_thr_stop_for_mysql_no_error(thr, trx);
3225 	} else {
3226 		que_thr_stop_for_mysql(thr);
3227 
3228 		if (row_mysql_handle_errors(&err, trx, thr, NULL)) {
3229 			goto run_again;
3230 		}
3231 	}
3232 
3233 	que_graph_free(thr->graph);
3234 	trx->op_info = "";
3235 
3236 	return(err);
3237 }
3238 
3239 /** Drop ancillary FTS tables as part of dropping a table.
3240 @param[in,out]	table		Table cache entry
3241 @param[in,out]	trx		Transaction handle
3242 @return error code or DB_SUCCESS */
3243 UNIV_INLINE
3244 dberr_t
row_drop_ancillary_fts_tables(dict_table_t * table,trx_t * trx)3245 row_drop_ancillary_fts_tables(
3246 	dict_table_t*	table,
3247 	trx_t*		trx)
3248 {
3249 	/* Drop ancillary FTS tables */
3250 	if (dict_table_has_fts_index(table)
3251 	    || DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)) {
3252 
3253 		ut_ad(table->get_ref_count() == 0);
3254 		ut_ad(trx_is_started(trx));
3255 
3256 		dberr_t err = fts_drop_tables(trx, table);
3257 
3258 		if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
3259 			ib::error() << " Unable to remove ancillary FTS"
3260 				" tables for table "
3261 				<< table->name << " : " << err;
3262 
3263 			return(err);
3264 		}
3265 	}
3266 
3267 	/* The table->fts flag can be set on the table for which
3268 	the cluster index is being rebuilt. Such table might not have
3269 	DICT_TF2_FTS flag set. So keep this out of above
3270 	dict_table_has_fts_index condition */
3271 	if (table->fts != NULL) {
3272 		/* fts_que_graph_free_check_lock would try to acquire
3273 		dict mutex lock */
3274 		table->fts->dict_locked = true;
3275 
3276 		fts_free(table);
3277 	}
3278 
3279 	return(DB_SUCCESS);
3280 }
3281 
3282 /** Drop a table from the memory cache as part of dropping a table.
3283 @param[in]	tablename	A copy of table->name. Used when table == null
3284 @param[in,out]	table		Table cache entry
3285 @param[in,out]	trx		Transaction handle
3286 @return error code or DB_SUCCESS */
3287 UNIV_INLINE
3288 dberr_t
row_drop_table_from_cache(const char * tablename,dict_table_t * table,trx_t * trx)3289 row_drop_table_from_cache(
3290 	const char*	tablename,
3291 	dict_table_t*	table,
3292 	trx_t*		trx)
3293 {
3294 	dberr_t	err = DB_SUCCESS;
3295 	ut_ad(!table->is_temporary());
3296 
3297 	/* Remove the pointer to this table object from the list
3298 	of modified tables by the transaction because the object
3299 	is going to be destroyed below. */
3300 	trx->mod_tables.erase(table);
3301 
3302 	dict_table_remove_from_cache(table);
3303 
3304 	if (dict_load_table(tablename, DICT_ERR_IGNORE_FK_NOKEY)) {
3305 		ib::error() << "Not able to remove table "
3306 			<< ut_get_name(trx, tablename)
3307 			<< " from the dictionary cache!";
3308 		err = DB_ERROR;
3309 	}
3310 
3311 	return(err);
3312 }
3313 
3314 /** Drop a table for MySQL.
3315 If the data dictionary was not already locked by the transaction,
3316 the transaction will be committed.  Otherwise, the data dictionary
3317 will remain locked.
3318 @param[in]	name		Table name
3319 @param[in,out]	trx		Transaction handle
3320 @param[in]	sqlcom		type of SQL operation
3321 @param[in]	create_failed	true=create table failed
3322 				because e.g. foreign key column
3323 @param[in]	nonatomic	Whether it is permitted to release
3324 				and reacquire dict_operation_lock
3325 @return error code or DB_SUCCESS */
3326 dberr_t
row_drop_table_for_mysql(const char * name,trx_t * trx,enum_sql_command sqlcom,bool create_failed,bool nonatomic)3327 row_drop_table_for_mysql(
3328 	const char*		name,
3329 	trx_t*			trx,
3330 	enum_sql_command	sqlcom,
3331 	bool			create_failed,
3332 	bool			nonatomic)
3333 {
3334 	dberr_t		err;
3335 	dict_foreign_t*	foreign;
3336 	dict_table_t*	table;
3337 	char*		tablename		= NULL;
3338 	bool		locked_dictionary	= false;
3339 	pars_info_t*	info			= NULL;
3340 	mem_heap_t*	heap			= NULL;
3341 
3342 
3343 	DBUG_ENTER("row_drop_table_for_mysql");
3344 	DBUG_PRINT("row_drop_table_for_mysql", ("table: '%s'", name));
3345 
3346 	ut_a(name != NULL);
3347 
3348 	/* Serialize data dictionary operations with dictionary mutex:
3349 	no deadlocks can occur then in these operations */
3350 
3351 	trx->op_info = "dropping table";
3352 
3353 	if (trx->dict_operation_lock_mode != RW_X_LATCH) {
3354 		/* Prevent foreign key checks etc. while we are
3355 		dropping the table */
3356 
3357 		row_mysql_lock_data_dictionary(trx);
3358 
3359 		locked_dictionary = true;
3360 		nonatomic = true;
3361 	}
3362 
3363 	ut_ad(mutex_own(&dict_sys->mutex));
3364 	ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_X));
3365 
3366 	table = dict_table_open_on_name(
3367 		name, TRUE, FALSE,
3368 		static_cast<dict_err_ignore_t>(
3369 			DICT_ERR_IGNORE_INDEX_ROOT
3370 			| DICT_ERR_IGNORE_CORRUPT));
3371 
3372 	if (!table) {
3373 		if (locked_dictionary) {
3374 			row_mysql_unlock_data_dictionary(trx);
3375 		}
3376 		trx->op_info = "";
3377 		DBUG_RETURN(DB_TABLE_NOT_FOUND);
3378 	}
3379 
3380 	const bool is_temp_name = strstr(table->name.m_name,
3381 					 "/" TEMP_FILE_PREFIX);
3382 
3383 	if (table->is_temporary()) {
3384 		ut_ad(table->space == fil_system.temp_space);
3385 		for (dict_index_t* index = dict_table_get_first_index(table);
3386 		     index != NULL;
3387 		     index = dict_table_get_next_index(index)) {
3388 			btr_free(page_id_t(SRV_TMP_SPACE_ID, index->page),
3389 				 univ_page_size);
3390 		}
3391 		/* Remove the pointer to this table object from the list
3392 		of modified tables by the transaction because the object
3393 		is going to be destroyed below. */
3394 		trx->mod_tables.erase(table);
3395 		table->release();
3396 		dict_table_remove_from_cache(table);
3397 		err = DB_SUCCESS;
3398 		goto funct_exit_all_freed;
3399 	}
3400 
3401 	/* This function is called recursively via fts_drop_tables(). */
3402 	if (!trx_is_started(trx)) {
3403 		trx_start_for_ddl(trx, TRX_DICT_OP_TABLE);
3404 	}
3405 
3406 	/* Turn on this drop bit before we could release the dictionary
3407 	latch */
3408 	table->to_be_dropped = true;
3409 
3410 	if (nonatomic) {
3411 		/* This trx did not acquire any locks on dictionary
3412 		table records yet. Thus it is safe to release and
3413 		reacquire the data dictionary latches. */
3414 		if (table->fts) {
3415 			ut_ad(!table->fts->add_wq);
3416 			ut_ad(lock_trx_has_sys_table_locks(trx) == 0);
3417 
3418 			for (;;) {
3419 				bool retry = false;
3420 				if (dict_fts_index_syncing(table)) {
3421 					retry = true;
3422 				}
3423 				if (!retry) {
3424 					break;
3425 				}
3426 				DICT_BG_YIELD(trx);
3427 			}
3428 			row_mysql_unlock_data_dictionary(trx);
3429 			fts_optimize_remove_table(table);
3430 			row_mysql_lock_data_dictionary(trx);
3431 		}
3432 
3433 		dict_stats_wait_bg_to_stop_using_table(table, trx);
3434 	}
3435 
3436 	/* make sure background stats thread is not running on the table */
3437 	ut_ad(!(table->stats_bg_flag & BG_STAT_IN_PROGRESS));
3438 	if (!table->no_rollback()) {
3439 		if (table->space != fil_system.sys_space) {
3440 			/* Delete the link file if used. */
3441 			if (DICT_TF_HAS_DATA_DIR(table->flags)) {
3442 				RemoteDatafile::delete_link_file(name);
3443 			}
3444 		}
3445 
3446 		dict_stats_recalc_pool_del(table);
3447 		dict_stats_defrag_pool_del(table, NULL);
3448 		if (btr_defragment_thread_active) {
3449 			/* During fts_drop_orphaned_tables() in
3450 			recv_recovery_rollback_active() the
3451 			btr_defragment_mutex has not yet been
3452 			initialized by btr_defragment_init(). */
3453 			btr_defragment_remove_table(table);
3454 		}
3455 
3456 		if (UNIV_LIKELY(!strstr(name, "/" TEMP_FILE_PREFIX_INNODB))) {
3457 			/* Remove any persistent statistics for this table,
3458 			in a separate transaction. */
3459 			char errstr[1024];
3460 			err = dict_stats_drop_table(name, errstr,
3461 						    sizeof errstr);
3462 			if (err != DB_SUCCESS) {
3463 				ib::warn() << errstr;
3464 			}
3465 		}
3466 	}
3467 
3468 	dict_table_prevent_eviction(table);
3469 	dict_table_close(table, TRUE, FALSE);
3470 
3471 	/* Check if the table is referenced by foreign key constraints from
3472 	some other table (not the table itself) */
3473 
3474 	if (!srv_read_only_mode && trx->check_foreigns) {
3475 
3476 		for (dict_foreign_set::iterator it
3477 			= table->referenced_set.begin();
3478 		     it != table->referenced_set.end();
3479 		     ++it) {
3480 
3481 			foreign = *it;
3482 
3483 			const bool	ref_ok = sqlcom == SQLCOM_DROP_DB
3484 				&& dict_tables_have_same_db(
3485 					name,
3486 					foreign->foreign_table_name_lookup);
3487 
3488 			/* We should allow dropping a referenced table if creating
3489 			that referenced table has failed for some reason. For example
3490 			if referenced table is created but it column types that are
3491 			referenced do not match. */
3492 			if (foreign->foreign_table != table &&
3493 			    !create_failed && !ref_ok) {
3494 
3495 				FILE*	ef	= dict_foreign_err_file;
3496 
3497 				/* We only allow dropping a referenced table
3498 				if FOREIGN_KEY_CHECKS is set to 0 */
3499 
3500 				err = DB_CANNOT_DROP_CONSTRAINT;
3501 
3502 				mutex_enter(&dict_foreign_err_mutex);
3503 				rewind(ef);
3504 				ut_print_timestamp(ef);
3505 
3506 				fputs("  Cannot drop table ", ef);
3507 				ut_print_name(ef, trx, name);
3508 				fputs("\n"
3509 				      "because it is referenced by ", ef);
3510 				ut_print_name(ef, trx,
3511 					      foreign->foreign_table_name);
3512 				putc('\n', ef);
3513 				mutex_exit(&dict_foreign_err_mutex);
3514 
3515 				goto funct_exit;
3516 			}
3517 		}
3518 	}
3519 
3520 	DBUG_EXECUTE_IF("row_drop_table_add_to_background", goto defer;);
3521 
3522 	/* TODO: could we replace the counter n_foreign_key_checks_running
3523 	with lock checks on the table? Acquire here an exclusive lock on the
3524 	table, and rewrite lock0lock.cc and the lock wait in srv0srv.cc so that
3525 	they can cope with the table having been dropped here? Foreign key
3526 	checks take an IS or IX lock on the table. */
3527 
3528 	if (table->n_foreign_key_checks_running > 0) {
3529 defer:
3530 		if (!is_temp_name) {
3531 			heap = mem_heap_create(FN_REFLEN);
3532 			const char* tmp_name
3533 				= dict_mem_create_temporary_tablename(
3534 					heap, table->name.m_name, table->id);
3535 			ib::info() << "Deferring DROP TABLE " << table->name
3536 				   << "; renaming to " << tmp_name;
3537 			err = row_rename_table_for_mysql(
3538 				table->name.m_name, tmp_name, trx,
3539 				false, false);
3540 		} else {
3541 			err = DB_SUCCESS;
3542 		}
3543 		if (err == DB_SUCCESS) {
3544 			row_add_table_to_background_drop_list(table->id);
3545 		}
3546 		goto funct_exit;
3547 	}
3548 
3549 	/* Remove all locks that are on the table or its records, if there
3550 	are no references to the table but it has record locks, we release
3551 	the record locks unconditionally. One use case is:
3552 
3553 		CREATE TABLE t2 (PRIMARY KEY (a)) SELECT * FROM t1;
3554 
3555 	If after the user transaction has done the SELECT and there is a
3556 	problem in completing the CREATE TABLE operation, MySQL will drop
3557 	the table. InnoDB will create a new background transaction to do the
3558 	actual drop, the trx instance that is passed to this function. To
3559 	preserve existing behaviour we remove the locks but ideally we
3560 	shouldn't have to. There should never be record locks on a table
3561 	that is going to be dropped. */
3562 
3563 	if (table->get_ref_count() > 0 || table->n_rec_locks > 0
3564 	    || lock_table_has_locks(table)) {
3565 		goto defer;
3566 	}
3567 
3568 	/* The "to_be_dropped" marks table that is to be dropped, but
3569 	has not been dropped, instead, was put in the background drop
3570 	list due to being used by concurrent DML operations. Clear it
3571 	here since there are no longer any concurrent activities on it,
3572 	and it is free to be dropped */
3573 	table->to_be_dropped = false;
3574 
3575 	switch (trx_get_dict_operation(trx)) {
3576 	case TRX_DICT_OP_NONE:
3577 		trx_set_dict_operation(trx, TRX_DICT_OP_TABLE);
3578 		trx->table_id = table->id;
3579 	case TRX_DICT_OP_TABLE:
3580 		break;
3581 	case TRX_DICT_OP_INDEX:
3582 		/* If the transaction was previously flagged as
3583 		TRX_DICT_OP_INDEX, we should be dropping auxiliary
3584 		tables for full-text indexes. */
3585 		ut_ad(strstr(table->name.m_name, "/FTS_"));
3586 	}
3587 
3588 	/* Mark all indexes unavailable in the data dictionary cache
3589 	before starting to drop the table. */
3590 
3591 	unsigned*	page_no;
3592 	unsigned*	page_nos;
3593 	heap = mem_heap_create(
3594 		200 + UT_LIST_GET_LEN(table->indexes) * sizeof *page_nos);
3595 	tablename = mem_heap_strdup(heap, name);
3596 
3597 	page_no = page_nos = static_cast<unsigned*>(
3598 		mem_heap_alloc(
3599 			heap,
3600 			UT_LIST_GET_LEN(table->indexes) * sizeof *page_no));
3601 
3602 	for (dict_index_t* index = dict_table_get_first_index(table);
3603 	     index != NULL;
3604 	     index = dict_table_get_next_index(index)) {
3605 		rw_lock_x_lock(dict_index_get_lock(index));
3606 		/* Save the page numbers so that we can restore them
3607 		if the operation fails. */
3608 		*page_no++ = index->page;
3609 		/* Mark the index unusable. */
3610 		index->page = FIL_NULL;
3611 		rw_lock_x_unlock(dict_index_get_lock(index));
3612 	}
3613 
3614 	/* Deleting a row from SYS_INDEXES table will invoke
3615 	dict_drop_index_tree(). */
3616 	info = pars_info_create();
3617 
3618 	pars_info_add_str_literal(info, "name", name);
3619 
3620 	if (sqlcom != SQLCOM_TRUNCATE
3621 	    && strchr(name, '/')
3622 	    && dict_table_get_low("SYS_FOREIGN")
3623 	    && dict_table_get_low("SYS_FOREIGN_COLS")) {
3624 		err = que_eval_sql(
3625 			info,
3626 			"PROCEDURE DROP_FOREIGN_PROC () IS\n"
3627 			"fid CHAR;\n"
3628 
3629 			"DECLARE CURSOR fk IS\n"
3630 			"SELECT ID FROM SYS_FOREIGN\n"
3631 			"WHERE FOR_NAME = :name\n"
3632 			"AND TO_BINARY(FOR_NAME) = TO_BINARY(:name)\n"
3633 			"FOR UPDATE;\n"
3634 
3635 			"BEGIN\n"
3636 			"OPEN fk;\n"
3637 			"WHILE 1 = 1 LOOP\n"
3638 			"  FETCH fk INTO fid;\n"
3639 			"  IF (SQL % NOTFOUND) THEN RETURN; END IF;\n"
3640 			"  DELETE FROM SYS_FOREIGN_COLS WHERE ID=fid;\n"
3641 			"  DELETE FROM SYS_FOREIGN WHERE ID=fid;\n"
3642 			"END LOOP;\n"
3643 			"CLOSE fk;\n"
3644 			"END;\n", FALSE, trx);
3645 		if (err == DB_SUCCESS) {
3646 			info = pars_info_create();
3647 			pars_info_add_str_literal(info, "name", name);
3648 			goto do_drop;
3649 		}
3650 	} else {
3651 do_drop:
3652 		if (dict_table_get_low("SYS_VIRTUAL")) {
3653 			err = que_eval_sql(
3654 				info,
3655 				"PROCEDURE DROP_VIRTUAL_PROC () IS\n"
3656 				"tid CHAR;\n"
3657 
3658 				"BEGIN\n"
3659 				"SELECT ID INTO tid FROM SYS_TABLES\n"
3660 				"WHERE NAME = :name FOR UPDATE;\n"
3661 				"IF (SQL % NOTFOUND) THEN RETURN;"
3662 				" END IF;\n"
3663 				"DELETE FROM SYS_VIRTUAL"
3664 				" WHERE TABLE_ID = tid;\n"
3665 				"END;\n", FALSE, trx);
3666 			if (err == DB_SUCCESS) {
3667 				info = pars_info_create();
3668 				pars_info_add_str_literal(
3669 					info, "name", name);
3670 			}
3671 		} else {
3672 			err = DB_SUCCESS;
3673 		}
3674 
3675 		err = err == DB_SUCCESS ? que_eval_sql(
3676 			info,
3677 			"PROCEDURE DROP_TABLE_PROC () IS\n"
3678 			"tid CHAR;\n"
3679 			"iid CHAR;\n"
3680 
3681 			"DECLARE CURSOR cur_idx IS\n"
3682 			"SELECT ID FROM SYS_INDEXES\n"
3683 			"WHERE TABLE_ID = tid FOR UPDATE;\n"
3684 
3685 			"BEGIN\n"
3686 			"SELECT ID INTO tid FROM SYS_TABLES\n"
3687 			"WHERE NAME = :name FOR UPDATE;\n"
3688 			"IF (SQL % NOTFOUND) THEN RETURN; END IF;\n"
3689 
3690 			"OPEN cur_idx;\n"
3691 			"WHILE 1 = 1 LOOP\n"
3692 			"  FETCH cur_idx INTO iid;\n"
3693 			"  IF (SQL % NOTFOUND) THEN EXIT; END IF;\n"
3694 			"  DELETE FROM SYS_FIELDS\n"
3695 			"  WHERE INDEX_ID = iid;\n"
3696 			"  DELETE FROM SYS_INDEXES\n"
3697 			"  WHERE ID = iid AND TABLE_ID = tid;\n"
3698 			"END LOOP;\n"
3699 			"CLOSE cur_idx;\n"
3700 
3701 			"DELETE FROM SYS_COLUMNS WHERE TABLE_ID=tid;\n"
3702 			"DELETE FROM SYS_TABLES WHERE NAME=:name;\n"
3703 
3704 			"END;\n", FALSE, trx) : err;
3705 
3706 		if (err == DB_SUCCESS && table->space
3707 		    && dict_table_get_low("SYS_TABLESPACES")
3708 		    && dict_table_get_low("SYS_DATAFILES")) {
3709 			info = pars_info_create();
3710 			pars_info_add_int4_literal(info, "id",
3711 						   lint(table->space_id));
3712 			err = que_eval_sql(
3713 				info,
3714 				"PROCEDURE DROP_SPACE_PROC () IS\n"
3715 				"BEGIN\n"
3716 				"DELETE FROM SYS_TABLESPACES\n"
3717 				"WHERE SPACE = :id;\n"
3718 				"DELETE FROM SYS_DATAFILES\n"
3719 				"WHERE SPACE = :id;\n"
3720 				"END;\n", FALSE, trx);
3721 		}
3722 	}
3723 
3724 	switch (err) {
3725 		fil_space_t* space;
3726 		char* filepath;
3727 	case DB_SUCCESS:
3728 		if (!table->no_rollback()) {
3729 			err = row_drop_ancillary_fts_tables(table, trx);
3730 			if (err != DB_SUCCESS) {
3731 				break;
3732 			}
3733 		}
3734 
3735 		space = table->space;
3736 		ut_ad(!space || space->id == table->space_id);
3737 		/* Determine the tablespace filename before we drop
3738 		dict_table_t. */
3739 		if (DICT_TF_HAS_DATA_DIR(table->flags)) {
3740 			dict_get_and_save_data_dir_path(table, true);
3741 			ut_ad(table->data_dir_path || !space);
3742 			filepath = space ? NULL : fil_make_filepath(
3743 				table->data_dir_path,
3744 				table->name.m_name, IBD,
3745 				table->data_dir_path != NULL);
3746 		} else {
3747 			filepath = space ? NULL : fil_make_filepath(
3748 				NULL, table->name.m_name, IBD, false);
3749 		}
3750 
3751 		/* Free the dict_table_t object. */
3752 		err = row_drop_table_from_cache(tablename, table, trx);
3753 		if (err != DB_SUCCESS) {
3754 			ut_free(filepath);
3755 			break;
3756 		}
3757 
3758 		/* Do not attempt to drop known-to-be-missing tablespaces,
3759 		nor the system tablespace. */
3760 		if (!space) {
3761 			fil_delete_file(filepath);
3762 			ut_free(filepath);
3763 			break;
3764 		}
3765 
3766 		ut_ad(!filepath);
3767 
3768 		if (space->id != TRX_SYS_SPACE) {
3769 			err = fil_delete_tablespace(space->id);
3770 		}
3771 		break;
3772 
3773 	case DB_OUT_OF_FILE_SPACE:
3774 		err = DB_MUST_GET_MORE_FILE_SPACE;
3775 		trx->error_state = err;
3776 		row_mysql_handle_errors(&err, trx, NULL, NULL);
3777 
3778 		/* raise error */
3779 		ut_error;
3780 		break;
3781 
3782 	case DB_TOO_MANY_CONCURRENT_TRXS:
3783 		/* Cannot even find a free slot for the
3784 		the undo log. We can directly exit here
3785 		and return the DB_TOO_MANY_CONCURRENT_TRXS
3786 		error. */
3787 
3788 	default:
3789 		/* This is some error we do not expect. Print
3790 		the error number and rollback the transaction */
3791 		ib::error() << "Unknown error code " << err << " while"
3792 			" dropping table: "
3793 			<< ut_get_name(trx, tablename) << ".";
3794 
3795 		trx->error_state = DB_SUCCESS;
3796 		trx_rollback_to_savepoint(trx, NULL);
3797 		trx->error_state = DB_SUCCESS;
3798 
3799 		/* Mark all indexes available in the data dictionary
3800 		cache again. */
3801 
3802 		page_no = page_nos;
3803 
3804 		for (dict_index_t* index = dict_table_get_first_index(table);
3805 		     index != NULL;
3806 		     index = dict_table_get_next_index(index)) {
3807 			rw_lock_x_lock(dict_index_get_lock(index));
3808 			ut_a(index->page == FIL_NULL);
3809 			index->page = *page_no++;
3810 			rw_lock_x_unlock(dict_index_get_lock(index));
3811 		}
3812 	}
3813 
3814 	if (err != DB_SUCCESS && table != NULL) {
3815 		/* Drop table has failed with error but as drop table is not
3816 		transaction safe we should mark the table as corrupted to avoid
3817 		unwarranted follow-up action on this table that can result
3818 		in more serious issues. */
3819 
3820 		table->corrupted = true;
3821 		for (dict_index_t* index = UT_LIST_GET_FIRST(table->indexes);
3822 		     index != NULL;
3823 		     index = UT_LIST_GET_NEXT(indexes, index)) {
3824 			dict_set_corrupted(index, trx, "DROP TABLE");
3825 		}
3826 	}
3827 
3828 funct_exit:
3829 	if (heap) {
3830 		mem_heap_free(heap);
3831 	}
3832 
3833 funct_exit_all_freed:
3834 	if (locked_dictionary) {
3835 
3836 		if (trx_is_started(trx)) {
3837 
3838 			trx_commit_for_mysql(trx);
3839 		}
3840 
3841 		/* Add the table to fts queue if drop table fails */
3842 		if (err != DB_SUCCESS && table->fts) {
3843 			fts_optimize_add_table(table);
3844 		}
3845 
3846 		row_mysql_unlock_data_dictionary(trx);
3847 	}
3848 
3849 	trx->op_info = "";
3850 
3851 	srv_wake_master_thread();
3852 
3853 	DBUG_RETURN(err);
3854 }
3855 
3856 /** Drop a table after failed CREATE TABLE. */
row_drop_table_after_create_fail(const char * name,trx_t * trx)3857 dberr_t row_drop_table_after_create_fail(const char* name, trx_t* trx)
3858 {
3859 	ib::warn() << "Dropping incompletely created " << name << " table.";
3860 	return row_drop_table_for_mysql(name, trx, SQLCOM_DROP_DB, true);
3861 }
3862 
3863 /*******************************************************************//**
3864 Drop all foreign keys in a database, see Bug#18942.
3865 Called at the end of row_drop_database_for_mysql().
3866 @return error code or DB_SUCCESS */
3867 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3868 dberr_t
drop_all_foreign_keys_in_db(const char * name,trx_t * trx)3869 drop_all_foreign_keys_in_db(
3870 /*========================*/
3871 	const char*	name,	/*!< in: database name which ends to '/' */
3872 	trx_t*		trx)	/*!< in: transaction handle */
3873 {
3874 	pars_info_t*	pinfo;
3875 	dberr_t		err;
3876 
3877 	ut_a(name[strlen(name) - 1] == '/');
3878 
3879 	pinfo = pars_info_create();
3880 
3881 	pars_info_add_str_literal(pinfo, "dbname", name);
3882 
3883 /** true if for_name is not prefixed with dbname */
3884 #define TABLE_NOT_IN_THIS_DB \
3885 "SUBSTR(for_name, 0, LENGTH(:dbname)) <> :dbname"
3886 
3887 	err = que_eval_sql(pinfo,
3888 			   "PROCEDURE DROP_ALL_FOREIGN_KEYS_PROC () IS\n"
3889 			   "foreign_id CHAR;\n"
3890 			   "for_name CHAR;\n"
3891 			   "found INT;\n"
3892 			   "DECLARE CURSOR cur IS\n"
3893 			   "SELECT ID, FOR_NAME FROM SYS_FOREIGN\n"
3894 			   "WHERE FOR_NAME >= :dbname\n"
3895 			   "LOCK IN SHARE MODE\n"
3896 			   "ORDER BY FOR_NAME;\n"
3897 			   "BEGIN\n"
3898 			   "found := 1;\n"
3899 			   "OPEN cur;\n"
3900 			   "WHILE found = 1 LOOP\n"
3901 			   "        FETCH cur INTO foreign_id, for_name;\n"
3902 			   "        IF (SQL % NOTFOUND) THEN\n"
3903 			   "                found := 0;\n"
3904 			   "        ELSIF (" TABLE_NOT_IN_THIS_DB ") THEN\n"
3905 			   "                found := 0;\n"
3906 			   "        ELSIF (1=1) THEN\n"
3907 			   "                DELETE FROM SYS_FOREIGN_COLS\n"
3908 			   "                WHERE ID = foreign_id;\n"
3909 			   "                DELETE FROM SYS_FOREIGN\n"
3910 			   "                WHERE ID = foreign_id;\n"
3911 			   "        END IF;\n"
3912 			   "END LOOP;\n"
3913 			   "CLOSE cur;\n"
3914 			   "COMMIT WORK;\n"
3915 			   "END;\n",
3916 			   FALSE, /* do not reserve dict mutex,
3917 				  we are already holding it */
3918 			   trx);
3919 
3920 	return(err);
3921 }
3922 
3923 /** Drop a database for MySQL.
3924 @param[in]	name	database name which ends at '/'
3925 @param[in]	trx	transaction handle
3926 @param[out]	found	number of dropped tables/partitions
3927 @return error code or DB_SUCCESS */
3928 dberr_t
row_drop_database_for_mysql(const char * name,trx_t * trx,ulint * found)3929 row_drop_database_for_mysql(
3930 	const char*	name,
3931 	trx_t*		trx,
3932 	ulint*		found)
3933 {
3934 	dict_table_t*	table;
3935 	char*		table_name;
3936 	dberr_t		err	= DB_SUCCESS;
3937 	ulint		namelen	= strlen(name);
3938 	bool		is_partition = false;
3939 
3940 	ut_ad(found != NULL);
3941 
3942 	DBUG_ENTER("row_drop_database_for_mysql");
3943 
3944 	DBUG_PRINT("row_drop_database_for_mysql", ("db: '%s'", name));
3945 
3946 	ut_a(name != NULL);
3947 	/* Assert DB name or partition name. */
3948 	if (name[namelen - 1] == '#') {
3949 		ut_ad(name[namelen - 2] != '/');
3950 		is_partition = true;
3951 		trx->op_info = "dropping partitions";
3952 	} else {
3953 		ut_a(name[namelen - 1] == '/');
3954 		trx->op_info = "dropping database";
3955 	}
3956 
3957 	*found = 0;
3958 
3959 	trx_set_dict_operation(trx, TRX_DICT_OP_TABLE);
3960 
3961 	trx_start_if_not_started_xa(trx, true);
3962 
3963 loop:
3964 	row_mysql_lock_data_dictionary(trx);
3965 
3966 	while ((table_name = dict_get_first_table_name_in_db(name))) {
3967 		/* Drop parent table if it is a fts aux table, to
3968 		avoid accessing dropped fts aux tables in information
3969 		scheam when parent table still exists.
3970 		Note: Drop parent table will drop fts aux tables. */
3971 		char*		parent_table_name = NULL;
3972 		table_id_t	table_id;
3973 		index_id_t	index_id;
3974 
3975 		if (fts_check_aux_table(
3976 				table_name, &table_id, &index_id)) {
3977 			dict_table_t* parent_table = dict_table_open_on_id(
3978 					table_id, TRUE, DICT_TABLE_OP_NORMAL);
3979 			if (parent_table != NULL) {
3980 				parent_table_name = mem_strdupl(
3981 					parent_table->name.m_name,
3982 					strlen(parent_table->name.m_name));
3983 				dict_table_close(parent_table, TRUE, FALSE);
3984 			}
3985 		}
3986 
3987 		if (parent_table_name != NULL) {
3988 			ut_free(table_name);
3989 			table_name = parent_table_name;
3990 		}
3991 
3992 		ut_a(memcmp(table_name, name, namelen) == 0);
3993 
3994 		table = dict_table_open_on_name(
3995 			table_name, TRUE, FALSE, static_cast<dict_err_ignore_t>(
3996 				DICT_ERR_IGNORE_INDEX_ROOT
3997 				| DICT_ERR_IGNORE_CORRUPT));
3998 
3999 		if (!table) {
4000 			ib::error() << "Cannot load table " << table_name
4001 				<< " from InnoDB internal data dictionary"
4002 				" during drop database";
4003 			ut_free(table_name);
4004 			err = DB_TABLE_NOT_FOUND;
4005 			break;
4006 
4007 		}
4008 
4009 		if (!table->name.is_temporary()) {
4010 			/* There could be orphan temp tables left from
4011 			interrupted alter table. Leave them, and handle
4012 			the rest.*/
4013 			if (table->can_be_evicted
4014 			    && (name[namelen - 1] != '#')) {
4015 				ib::warn() << "Orphan table encountered during"
4016 					" DROP DATABASE. This is possible if '"
4017 					<< table->name << ".frm' was lost.";
4018 			}
4019 
4020 			if (!table->is_readable() && !table->space) {
4021 				ib::warn() << "Missing .ibd file for table "
4022 					<< table->name << ".";
4023 			}
4024 		}
4025 
4026 		dict_table_close(table, TRUE, FALSE);
4027 
4028 		/* The dict_table_t object must not be accessed before
4029 		dict_table_open() or after dict_table_close(). But this is OK
4030 		if we are holding, the dict_sys->mutex. */
4031 		ut_ad(mutex_own(&dict_sys->mutex));
4032 
4033 		/* Disable statistics on the found table. */
4034 		if (!dict_stats_stop_bg(table)) {
4035 			row_mysql_unlock_data_dictionary(trx);
4036 
4037 			os_thread_sleep(250000);
4038 
4039 			ut_free(table_name);
4040 
4041 			goto loop;
4042 		}
4043 
4044 		/* Wait until MySQL does not have any queries running on
4045 		the table */
4046 
4047 		if (table->get_ref_count() > 0) {
4048 			row_mysql_unlock_data_dictionary(trx);
4049 
4050 			ib::warn() << "MySQL is trying to drop database "
4051 				<< ut_get_name(trx, name) << " though"
4052 				" there are still open handles to table "
4053 				<< table->name << ".";
4054 
4055 			os_thread_sleep(1000000);
4056 
4057 			ut_free(table_name);
4058 
4059 			goto loop;
4060 		}
4061 
4062 		err = row_drop_table_for_mysql(
4063 			table_name, trx, SQLCOM_DROP_DB);
4064 		trx_commit_for_mysql(trx);
4065 
4066 		if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
4067 			ib::error() << "DROP DATABASE "
4068 				<< ut_get_name(trx, name) << " failed"
4069 				" with error (" << err << ") for"
4070 				" table " << ut_get_name(trx, table_name);
4071 			ut_free(table_name);
4072 			break;
4073 		}
4074 
4075 		ut_free(table_name);
4076 		(*found)++;
4077 	}
4078 
4079 	/* Partitioning does not yet support foreign keys. */
4080 	if (err == DB_SUCCESS && !is_partition) {
4081 		/* after dropping all tables try to drop all leftover
4082 		foreign keys in case orphaned ones exist */
4083 		err = drop_all_foreign_keys_in_db(name, trx);
4084 
4085 		if (err != DB_SUCCESS) {
4086 			const std::string&	db = ut_get_name(trx, name);
4087 			ib::error() << "DROP DATABASE " << db << " failed with"
4088 				" error " << err << " while dropping all"
4089 				" foreign keys";
4090 		}
4091 	}
4092 
4093 	trx_commit_for_mysql(trx);
4094 
4095 	row_mysql_unlock_data_dictionary(trx);
4096 
4097 	trx->op_info = "";
4098 
4099 	DBUG_RETURN(err);
4100 }
4101 
4102 /****************************************************************//**
4103 Delete a single constraint.
4104 @return error code or DB_SUCCESS */
4105 static MY_ATTRIBUTE((nonnull, warn_unused_result))
4106 dberr_t
row_delete_constraint_low(const char * id,trx_t * trx)4107 row_delete_constraint_low(
4108 /*======================*/
4109 	const char*	id,		/*!< in: constraint id */
4110 	trx_t*		trx)		/*!< in: transaction handle */
4111 {
4112 	pars_info_t*	info = pars_info_create();
4113 
4114 	pars_info_add_str_literal(info, "id", id);
4115 
4116 	return(que_eval_sql(info,
4117 			    "PROCEDURE DELETE_CONSTRAINT () IS\n"
4118 			    "BEGIN\n"
4119 			    "DELETE FROM SYS_FOREIGN_COLS WHERE ID = :id;\n"
4120 			    "DELETE FROM SYS_FOREIGN WHERE ID = :id;\n"
4121 			    "END;\n"
4122 			    , FALSE, trx));
4123 }
4124 
4125 /****************************************************************//**
4126 Delete a single constraint.
4127 @return error code or DB_SUCCESS */
4128 static MY_ATTRIBUTE((nonnull, warn_unused_result))
4129 dberr_t
row_delete_constraint(const char * id,const char * database_name,mem_heap_t * heap,trx_t * trx)4130 row_delete_constraint(
4131 /*==================*/
4132 	const char*	id,		/*!< in: constraint id */
4133 	const char*	database_name,	/*!< in: database name, with the
4134 					trailing '/' */
4135 	mem_heap_t*	heap,		/*!< in: memory heap */
4136 	trx_t*		trx)		/*!< in: transaction handle */
4137 {
4138 	dberr_t	err;
4139 
4140 	/* New format constraints have ids <databasename>/<constraintname>. */
4141 	err = row_delete_constraint_low(
4142 		mem_heap_strcat(heap, database_name, id), trx);
4143 
4144 	if ((err == DB_SUCCESS) && !strchr(id, '/')) {
4145 		/* Old format < 4.0.18 constraints have constraint ids
4146 		NUMBER_NUMBER. We only try deleting them if the
4147 		constraint name does not contain a '/' character, otherwise
4148 		deleting a new format constraint named 'foo/bar' from
4149 		database 'baz' would remove constraint 'bar' from database
4150 		'foo', if it existed. */
4151 
4152 		err = row_delete_constraint_low(id, trx);
4153 	}
4154 
4155 	return(err);
4156 }
4157 
4158 /*********************************************************************//**
4159 Renames a table for MySQL.
4160 @return error code or DB_SUCCESS */
4161 dberr_t
row_rename_table_for_mysql(const char * old_name,const char * new_name,trx_t * trx,bool commit,bool use_fk)4162 row_rename_table_for_mysql(
4163 /*=======================*/
4164 	const char*	old_name,	/*!< in: old table name */
4165 	const char*	new_name,	/*!< in: new table name */
4166 	trx_t*		trx,		/*!< in/out: transaction */
4167 	bool		commit,		/*!< in: whether to commit trx */
4168 	bool		use_fk)		/*!< in: whether to parse and enforce
4169 					FOREIGN KEY constraints */
4170 {
4171 	dict_table_t*	table			= NULL;
4172 	ibool		dict_locked		= FALSE;
4173 	dberr_t		err			= DB_ERROR;
4174 	mem_heap_t*	heap			= NULL;
4175 	const char**	constraints_to_drop	= NULL;
4176 	ulint		n_constraints_to_drop	= 0;
4177 	ibool		old_is_tmp, new_is_tmp;
4178 	pars_info_t*	info			= NULL;
4179 	int		retry;
4180 	bool		aux_fts_rename		= false;
4181 	char*		is_part 		= NULL;
4182 
4183 	ut_a(old_name != NULL);
4184 	ut_a(new_name != NULL);
4185 	ut_ad(trx->state == TRX_STATE_ACTIVE);
4186 
4187 	if (high_level_read_only) {
4188 		return(DB_READ_ONLY);
4189 
4190 	} else if (row_mysql_is_system_table(new_name)) {
4191 
4192 		ib::error() << "Trying to create a MySQL system table "
4193 			<< new_name << " of type InnoDB. MySQL system tables"
4194 			" must be of the MyISAM type!";
4195 
4196 		goto funct_exit;
4197 	}
4198 
4199 	trx->op_info = "renaming table";
4200 
4201 	old_is_tmp = dict_table_t::is_temporary_name(old_name);
4202 	new_is_tmp = dict_table_t::is_temporary_name(new_name);
4203 
4204 	dict_locked = trx->dict_operation_lock_mode == RW_X_LATCH;
4205 
4206 	table = dict_table_open_on_name(old_name, dict_locked, FALSE,
4207 					DICT_ERR_IGNORE_FK_NOKEY);
4208 
4209 	/* We look for pattern #P# to see if the table is partitioned
4210 	MySQL table. */
4211 #ifdef __WIN__
4212 	is_part = strstr((char *)old_name, (char *)"#p#");
4213 #else
4214 	is_part = strstr((char *)old_name, (char *)"#P#");
4215 #endif /* __WIN__ */
4216 
4217 	/* MySQL partition engine hard codes the file name
4218 	separator as "#P#". The text case is fixed even if
4219 	lower_case_table_names is set to 1 or 2. This is true
4220 	for sub-partition names as well. InnoDB always
4221 	normalises file names to lower case on Windows, this
4222 	can potentially cause problems when copying/moving
4223 	tables between platforms.
4224 
4225 	1) If boot against an installation from Windows
4226 	platform, then its partition table name could
4227 	be all be in lower case in system tables. So we
4228 	will need to check lower case name when load table.
4229 
4230 	2) If  we boot an installation from other case
4231 	sensitive platform in Windows, we might need to
4232 	check the existence of table name without lowering
4233 	case them in the system table. */
4234 	if (!table &&
4235 	    is_part &&
4236 	    innobase_get_lower_case_table_names() == 1) {
4237 		char par_case_name[MAX_FULL_NAME_LEN + 1];
4238 #ifndef __WIN__
4239 		/* Check for the table using lower
4240 		case name, including the partition
4241 		separator "P" */
4242 		memcpy(par_case_name, old_name,
4243 			strlen(old_name));
4244 		par_case_name[strlen(old_name)] = 0;
4245 		innobase_casedn_str(par_case_name);
4246 #else
4247 		/* On Windows platfrom, check
4248 		whether there exists table name in
4249 		system table whose name is
4250 		not being normalized to lower case */
4251 		normalize_table_name_c_low(
4252 			par_case_name, old_name, FALSE);
4253 #endif
4254 		table = dict_table_open_on_name(par_case_name, dict_locked, FALSE,
4255 						DICT_ERR_IGNORE_FK_NOKEY);
4256 	}
4257 
4258 	if (!table) {
4259 		err = DB_TABLE_NOT_FOUND;
4260 		goto funct_exit;
4261 
4262 	} else if (!table->is_readable() && !table->space
4263 		   && !(table->flags2 & DICT_TF2_DISCARDED)) {
4264 
4265 		err = DB_TABLE_NOT_FOUND;
4266 
4267 		ib::error() << "Table " << old_name << " does not have an .ibd"
4268 			" file in the database directory. "
4269 			<< TROUBLESHOOTING_MSG;
4270 
4271 		goto funct_exit;
4272 
4273 	} else if (use_fk && !old_is_tmp && new_is_tmp) {
4274 		/* MySQL is doing an ALTER TABLE command and it renames the
4275 		original table to a temporary table name. We want to preserve
4276 		the original foreign key constraint definitions despite the
4277 		name change. An exception is those constraints for which
4278 		the ALTER TABLE contained DROP FOREIGN KEY <foreign key id>.*/
4279 
4280 		heap = mem_heap_create(100);
4281 
4282 		err = dict_foreign_parse_drop_constraints(
4283 			heap, trx, table, &n_constraints_to_drop,
4284 			&constraints_to_drop);
4285 
4286 		if (err != DB_SUCCESS) {
4287 			goto funct_exit;
4288 		}
4289 	}
4290 
4291 	/* Is a foreign key check running on this table? */
4292 	for (retry = 0; retry < 100
4293 	     && table->n_foreign_key_checks_running > 0; ++retry) {
4294 		row_mysql_unlock_data_dictionary(trx);
4295 		os_thread_yield();
4296 		row_mysql_lock_data_dictionary(trx);
4297 	}
4298 
4299 	if (table->n_foreign_key_checks_running > 0) {
4300 		ib::error() << "In ALTER TABLE "
4301 			<< ut_get_name(trx, old_name)
4302 			<< " a FOREIGN KEY check is running. Cannot rename"
4303 			" table.";
4304 		err = DB_TABLE_IN_FK_CHECK;
4305 		goto funct_exit;
4306 	}
4307 
4308 	if (!table->is_temporary()) {
4309 		err = trx_undo_report_rename(trx, table);
4310 
4311 		if (err != DB_SUCCESS) {
4312 			goto funct_exit;
4313 		}
4314 	}
4315 
4316 	/* We use the private SQL parser of Innobase to generate the query
4317 	graphs needed in updating the dictionary data from system tables. */
4318 
4319 	info = pars_info_create();
4320 
4321 	pars_info_add_str_literal(info, "new_table_name", new_name);
4322 	pars_info_add_str_literal(info, "old_table_name", old_name);
4323 
4324 	err = que_eval_sql(info,
4325 			   "PROCEDURE RENAME_TABLE () IS\n"
4326 			   "BEGIN\n"
4327 			   "UPDATE SYS_TABLES"
4328 			   " SET NAME = :new_table_name\n"
4329 			   " WHERE NAME = :old_table_name;\n"
4330 			   "END;\n"
4331 			   , FALSE, trx);
4332 
4333 	ut_ad(err != DB_DUPLICATE_KEY);
4334 
4335 	/* SYS_TABLESPACES and SYS_DATAFILES need to be updated if
4336 	the table is in a single-table tablespace. */
4337 	if (err != DB_SUCCESS || !dict_table_is_file_per_table(table)) {
4338 	} else if (table->space) {
4339 		/* If old path and new path are the same means tablename
4340 		has not changed and only the database name holding the table
4341 		has changed so we need to make the complete filepath again. */
4342 		char*	new_path = dict_tables_have_same_db(old_name, new_name)
4343 			? os_file_make_new_pathname(
4344 				table->space->chain.start->name, new_name)
4345 			: fil_make_filepath(NULL, new_name, IBD, false);
4346 
4347 		info = pars_info_create();
4348 
4349 		pars_info_add_str_literal(info, "new_table_name", new_name);
4350 		pars_info_add_str_literal(info, "new_path_name", new_path);
4351 		pars_info_add_int4_literal(info, "space_id", table->space_id);
4352 
4353 		err = que_eval_sql(info,
4354 				   "PROCEDURE RENAME_SPACE () IS\n"
4355 				   "BEGIN\n"
4356 				   "UPDATE SYS_TABLESPACES"
4357 				   " SET NAME = :new_table_name\n"
4358 				   " WHERE SPACE = :space_id;\n"
4359 				   "UPDATE SYS_DATAFILES"
4360 				   " SET PATH = :new_path_name\n"
4361 				   " WHERE SPACE = :space_id;\n"
4362 				   "END;\n"
4363 				   , FALSE, trx);
4364 
4365 		ut_free(new_path);
4366 	}
4367 	if (err != DB_SUCCESS) {
4368 		goto end;
4369 	}
4370 
4371 	if (!new_is_tmp) {
4372 		/* Rename all constraints. */
4373 		char	new_table_name[MAX_TABLE_NAME_LEN + 1];
4374 		char	old_table_utf8[MAX_TABLE_NAME_LEN + 1];
4375 		uint	errors = 0;
4376 
4377 		strncpy(old_table_utf8, old_name, MAX_TABLE_NAME_LEN);
4378 		old_table_utf8[MAX_TABLE_NAME_LEN] = '\0';
4379 		innobase_convert_to_system_charset(
4380 			strchr(old_table_utf8, '/') + 1,
4381 			strchr(old_name, '/') +1,
4382 			MAX_TABLE_NAME_LEN, &errors);
4383 
4384 		if (errors) {
4385 			/* Table name could not be converted from charset
4386 			my_charset_filename to UTF-8. This means that the
4387 			table name is already in UTF-8 (#mysql#50). */
4388 			strncpy(old_table_utf8, old_name, MAX_TABLE_NAME_LEN);
4389 			old_table_utf8[MAX_TABLE_NAME_LEN] = '\0';
4390 		}
4391 
4392 		info = pars_info_create();
4393 
4394 		pars_info_add_str_literal(info, "new_table_name", new_name);
4395 		pars_info_add_str_literal(info, "old_table_name", old_name);
4396 		pars_info_add_str_literal(info, "old_table_name_utf8",
4397 					  old_table_utf8);
4398 
4399 		strncpy(new_table_name, new_name, MAX_TABLE_NAME_LEN);
4400 		new_table_name[MAX_TABLE_NAME_LEN] = '\0';
4401 		innobase_convert_to_system_charset(
4402 			strchr(new_table_name, '/') + 1,
4403 			strchr(new_name, '/') +1,
4404 			MAX_TABLE_NAME_LEN, &errors);
4405 
4406 		if (errors) {
4407 			/* Table name could not be converted from charset
4408 			my_charset_filename to UTF-8. This means that the
4409 			table name is already in UTF-8 (#mysql#50). */
4410 			strncpy(new_table_name, new_name, MAX_TABLE_NAME_LEN);
4411 			new_table_name[MAX_TABLE_NAME_LEN] = '\0';
4412 		}
4413 
4414 		pars_info_add_str_literal(info, "new_table_utf8", new_table_name);
4415 
4416 		err = que_eval_sql(
4417 			info,
4418 			"PROCEDURE RENAME_CONSTRAINT_IDS () IS\n"
4419 			"gen_constr_prefix CHAR;\n"
4420 			"new_db_name CHAR;\n"
4421 			"foreign_id CHAR;\n"
4422 			"new_foreign_id CHAR;\n"
4423 			"old_db_name_len INT;\n"
4424 			"old_t_name_len INT;\n"
4425 			"new_db_name_len INT;\n"
4426 			"id_len INT;\n"
4427 			"offset INT;\n"
4428 			"found INT;\n"
4429 			"BEGIN\n"
4430 			"found := 1;\n"
4431 			"old_db_name_len := INSTR(:old_table_name, '/')-1;\n"
4432 			"new_db_name_len := INSTR(:new_table_name, '/')-1;\n"
4433 			"new_db_name := SUBSTR(:new_table_name, 0,\n"
4434 			"                      new_db_name_len);\n"
4435 			"old_t_name_len := LENGTH(:old_table_name);\n"
4436 			"gen_constr_prefix := CONCAT(:old_table_name_utf8,\n"
4437 			"                            '_ibfk_');\n"
4438 			"WHILE found = 1 LOOP\n"
4439 			"       SELECT ID INTO foreign_id\n"
4440 			"        FROM SYS_FOREIGN\n"
4441 			"        WHERE FOR_NAME = :old_table_name\n"
4442 			"         AND TO_BINARY(FOR_NAME)\n"
4443 			"           = TO_BINARY(:old_table_name)\n"
4444 			"         LOCK IN SHARE MODE;\n"
4445 			"       IF (SQL % NOTFOUND) THEN\n"
4446 			"        found := 0;\n"
4447 			"       ELSE\n"
4448 			"        UPDATE SYS_FOREIGN\n"
4449 			"        SET FOR_NAME = :new_table_name\n"
4450 			"         WHERE ID = foreign_id;\n"
4451 			"        id_len := LENGTH(foreign_id);\n"
4452 			"        IF (INSTR(foreign_id, '/') > 0) THEN\n"
4453 			"               IF (INSTR(foreign_id,\n"
4454 			"                         gen_constr_prefix) > 0)\n"
4455 			"               THEN\n"
4456                         "                offset := INSTR(foreign_id, '_ibfk_') - 1;\n"
4457 			"                new_foreign_id :=\n"
4458 			"                CONCAT(:new_table_utf8,\n"
4459 			"                SUBSTR(foreign_id, offset,\n"
4460 			"                       id_len - offset));\n"
4461 			"               ELSE\n"
4462 			"                new_foreign_id :=\n"
4463 			"                CONCAT(new_db_name,\n"
4464 			"                SUBSTR(foreign_id,\n"
4465 			"                       old_db_name_len,\n"
4466 			"                       id_len - old_db_name_len));\n"
4467 			"               END IF;\n"
4468 			"               UPDATE SYS_FOREIGN\n"
4469 			"                SET ID = new_foreign_id\n"
4470 			"                WHERE ID = foreign_id;\n"
4471 			"               UPDATE SYS_FOREIGN_COLS\n"
4472 			"                SET ID = new_foreign_id\n"
4473 			"                WHERE ID = foreign_id;\n"
4474 			"        END IF;\n"
4475 			"       END IF;\n"
4476 			"END LOOP;\n"
4477 			"UPDATE SYS_FOREIGN SET REF_NAME = :new_table_name\n"
4478 			"WHERE REF_NAME = :old_table_name\n"
4479 			"  AND TO_BINARY(REF_NAME)\n"
4480 			"    = TO_BINARY(:old_table_name);\n"
4481 			"END;\n"
4482 			, FALSE, trx);
4483 
4484 	} else if (n_constraints_to_drop > 0) {
4485 		/* Drop some constraints of tmp tables. */
4486 
4487 		ulint	db_name_len = dict_get_db_name_len(old_name) + 1;
4488 		char*	db_name = mem_heap_strdupl(heap, old_name,
4489 						   db_name_len);
4490 		ulint	i;
4491 
4492 		for (i = 0; i < n_constraints_to_drop; i++) {
4493 			err = row_delete_constraint(constraints_to_drop[i],
4494 						    db_name, heap, trx);
4495 
4496 			if (err != DB_SUCCESS) {
4497 				break;
4498 			}
4499 		}
4500 	}
4501 
4502 	if (err == DB_SUCCESS
4503 	    && (dict_table_has_fts_index(table)
4504 	    || DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID))
4505 	    && !dict_tables_have_same_db(old_name, new_name)) {
4506 		err = fts_rename_aux_tables(table, new_name, trx);
4507 		if (err != DB_TABLE_NOT_FOUND) {
4508 			aux_fts_rename = true;
4509 		}
4510 	}
4511 
4512 end:
4513 	if (err != DB_SUCCESS) {
4514 		if (err == DB_DUPLICATE_KEY) {
4515 			ib::error() << "Possible reasons:";
4516 			ib::error() << "(1) Table rename would cause two"
4517 				" FOREIGN KEY constraints to have the same"
4518 				" internal name in case-insensitive"
4519 				" comparison.";
4520 			ib::error() << "(2) Table "
4521 				<< ut_get_name(trx, new_name)
4522 				<< " exists in the InnoDB internal data"
4523 				" dictionary though MySQL is trying to rename"
4524 				" table " << ut_get_name(trx, old_name)
4525 				<< " to it. Have you deleted the .frm file and"
4526 				" not used DROP TABLE?";
4527 			ib::info() << TROUBLESHOOTING_MSG;
4528 			ib::error() << "If table "
4529 				<< ut_get_name(trx, new_name)
4530 				<< " is a temporary table #sql..., then"
4531 				" it can be that there are still queries"
4532 				" running on the table, and it will be dropped"
4533 				" automatically when the queries end. You can"
4534 				" drop the orphaned table inside InnoDB by"
4535 				" creating an InnoDB table with the same name"
4536 				" in another database and copying the .frm file"
4537 				" to the current database. Then MySQL thinks"
4538 				" the table exists, and DROP TABLE will"
4539 				" succeed.";
4540 		}
4541 		trx->error_state = DB_SUCCESS;
4542 		trx_rollback_to_savepoint(trx, NULL);
4543 		trx->error_state = DB_SUCCESS;
4544 	} else {
4545 		/* The following call will also rename the .ibd data file if
4546 		the table is stored in a single-table tablespace */
4547 
4548 		err = dict_table_rename_in_cache(
4549 			table, new_name, !new_is_tmp);
4550 		if (err != DB_SUCCESS) {
4551 			trx->error_state = DB_SUCCESS;
4552 			trx_rollback_to_savepoint(trx, NULL);
4553 			trx->error_state = DB_SUCCESS;
4554 			goto funct_exit;
4555 		}
4556 
4557 		/* In case of copy alter, template db_name and
4558 		table_name should be renamed only for newly
4559 		created table. */
4560 		if (table->vc_templ != NULL && !new_is_tmp) {
4561 			innobase_rename_vc_templ(table);
4562 		}
4563 
4564 		/* We only want to switch off some of the type checking in
4565 		an ALTER TABLE...ALGORITHM=COPY, not in a RENAME. */
4566 		dict_names_t	fk_tables;
4567 
4568 		err = dict_load_foreigns(
4569 			new_name, NULL,
4570 			false, !old_is_tmp || trx->check_foreigns,
4571 			DICT_ERR_IGNORE_NONE, fk_tables);
4572 
4573 		if (err != DB_SUCCESS) {
4574 
4575 			if (old_is_tmp) {
4576 				/* In case of copy alter, ignore the
4577 				loading of foreign key constraint
4578 				when foreign_key_check is disabled */
4579 				ib::error_or_warn(trx->check_foreigns)
4580 					<< "In ALTER TABLE "
4581 					<< ut_get_name(trx, new_name)
4582 					<< " has or is referenced in foreign"
4583 					" key constraints which are not"
4584 					" compatible with the new table"
4585 					" definition.";
4586 				if (!trx->check_foreigns) {
4587 					err = DB_SUCCESS;
4588 					goto funct_exit;
4589 				}
4590 			} else {
4591 				ib::error() << "In RENAME TABLE table "
4592 					<< ut_get_name(trx, new_name)
4593 					<< " is referenced in foreign key"
4594 					" constraints which are not compatible"
4595 					" with the new table definition.";
4596 			}
4597 
4598 			ut_a(DB_SUCCESS == dict_table_rename_in_cache(
4599 				table, old_name, FALSE));
4600 			trx->error_state = DB_SUCCESS;
4601 			trx_rollback_to_savepoint(trx, NULL);
4602 			trx->error_state = DB_SUCCESS;
4603 		}
4604 
4605 		/* Check whether virtual column or stored column affects
4606 		the foreign key constraint of the table. */
4607 		if (dict_foreigns_has_s_base_col(
4608 				table->foreign_set, table)) {
4609 			err = DB_NO_FK_ON_S_BASE_COL;
4610 			ut_a(DB_SUCCESS == dict_table_rename_in_cache(
4611 				table, old_name, FALSE));
4612 			trx->error_state = DB_SUCCESS;
4613 			trx_rollback_to_savepoint(trx, NULL);
4614 			trx->error_state = DB_SUCCESS;
4615 			goto funct_exit;
4616 		}
4617 
4618 		/* Fill the virtual column set in foreign when
4619 		the table undergoes copy alter operation. */
4620 		dict_mem_table_free_foreign_vcol_set(table);
4621 		dict_mem_table_fill_foreign_vcol_set(table);
4622 
4623 		while (!fk_tables.empty()) {
4624 			dict_load_table(fk_tables.front(),
4625 					DICT_ERR_IGNORE_NONE);
4626 			fk_tables.pop_front();
4627 		}
4628 
4629 		table->data_dir_path= NULL;
4630 	}
4631 
4632 funct_exit:
4633 	if (aux_fts_rename && err != DB_SUCCESS
4634 	    && table != NULL && (table->space != 0)) {
4635 
4636 		char*	orig_name = table->name.m_name;
4637 		trx_t*	trx_bg = trx_create();
4638 
4639 		/* If the first fts_rename fails, the trx would
4640 		be rolled back and committed, we can't use it any more,
4641 		so we have to start a new background trx here. */
4642 		ut_a(trx_state_eq(trx_bg, TRX_STATE_NOT_STARTED));
4643 		trx_bg->op_info = "Revert the failing rename "
4644 				  "for fts aux tables";
4645 		trx_bg->dict_operation_lock_mode = RW_X_LATCH;
4646 		trx_start_for_ddl(trx_bg, TRX_DICT_OP_TABLE);
4647 
4648 		/* If rename fails and table has its own tablespace,
4649 		we need to call fts_rename_aux_tables again to
4650 		revert the ibd file rename, which is not under the
4651 		control of trx. Also notice the parent table name
4652 		in cache is not changed yet. If the reverting fails,
4653 		the ibd data may be left in the new database, which
4654 		can be fixed only manually. */
4655 		table->name.m_name = const_cast<char*>(new_name);
4656 		fts_rename_aux_tables(table, old_name, trx_bg);
4657 		table->name.m_name = orig_name;
4658 
4659 		trx_bg->dict_operation_lock_mode = 0;
4660 		trx_commit_for_mysql(trx_bg);
4661 		trx_bg->free();
4662 	}
4663 
4664 	if (table != NULL) {
4665 		dict_table_close(table, dict_locked, FALSE);
4666 	}
4667 
4668 	if (commit) {
4669 		DEBUG_SYNC(trx->mysql_thd, "before_rename_table_commit");
4670 		trx_commit_for_mysql(trx);
4671 	}
4672 
4673 	if (UNIV_LIKELY_NULL(heap)) {
4674 		mem_heap_free(heap);
4675 	}
4676 
4677 	trx->op_info = "";
4678 
4679 	return(err);
4680 }
4681 
4682 /*********************************************************************//**
4683 Scans an index for either COUNT(*) or CHECK TABLE.
4684 If CHECK TABLE; Checks that the index contains entries in an ascending order,
4685 unique constraint is not broken, and calculates the number of index entries
4686 in the read view of the current transaction.
4687 @return DB_SUCCESS or other error */
4688 dberr_t
row_scan_index_for_mysql(row_prebuilt_t * prebuilt,const dict_index_t * index,ulint * n_rows)4689 row_scan_index_for_mysql(
4690 /*=====================*/
4691 	row_prebuilt_t*		prebuilt,	/*!< in: prebuilt struct
4692 						in MySQL handle */
4693 	const dict_index_t*	index,		/*!< in: index */
4694 	ulint*			n_rows)		/*!< out: number of entries
4695 						seen in the consistent read */
4696 {
4697 	dtuple_t*	prev_entry	= NULL;
4698 	ulint		matched_fields;
4699 	byte*		buf;
4700 	dberr_t		ret;
4701 	rec_t*		rec;
4702 	int		cmp;
4703 	ibool		contains_null;
4704 	ulint		i;
4705 	ulint		cnt;
4706 	mem_heap_t*	heap		= NULL;
4707 	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
4708 	rec_offs*	offsets;
4709 	rec_offs_init(offsets_);
4710 
4711 	*n_rows = 0;
4712 
4713 	/* Don't support RTree Leaf level scan */
4714 	ut_ad(!dict_index_is_spatial(index));
4715 
4716 	if (dict_index_is_clust(index)) {
4717 		/* The clustered index of a table is always available.
4718 		During online ALTER TABLE that rebuilds the table, the
4719 		clustered index in the old table will have
4720 		index->online_log pointing to the new table. All
4721 		indexes of the old table will remain valid and the new
4722 		table will be unaccessible to MySQL until the
4723 		completion of the ALTER TABLE. */
4724 	} else if (dict_index_is_online_ddl(index)
4725 		   || (index->type & DICT_FTS)) {
4726 		/* Full Text index are implemented by auxiliary tables,
4727 		not the B-tree. We also skip secondary indexes that are
4728 		being created online. */
4729 		return(DB_SUCCESS);
4730 	}
4731 
4732 	ulint bufsize = std::max<ulint>(srv_page_size,
4733 					prebuilt->mysql_row_len);
4734 	buf = static_cast<byte*>(ut_malloc_nokey(bufsize));
4735 	heap = mem_heap_create(100);
4736 
4737 	cnt = 1000;
4738 
4739 	ret = row_search_for_mysql(buf, PAGE_CUR_G, prebuilt, 0, 0);
4740 loop:
4741 	/* Check thd->killed every 1,000 scanned rows */
4742 	if (--cnt == 0) {
4743 		if (trx_is_interrupted(prebuilt->trx)) {
4744 			ret = DB_INTERRUPTED;
4745 			goto func_exit;
4746 		}
4747 		cnt = 1000;
4748 	}
4749 
4750 	switch (ret) {
4751 	case DB_SUCCESS:
4752 		break;
4753 	case DB_DEADLOCK:
4754 	case DB_LOCK_TABLE_FULL:
4755 	case DB_LOCK_WAIT_TIMEOUT:
4756 	case DB_INTERRUPTED:
4757 		goto func_exit;
4758 	default:
4759 		ib::warn() << "CHECK TABLE on index " << index->name << " of"
4760 			" table " << index->table->name << " returned " << ret;
4761 		/* (this error is ignored by CHECK TABLE) */
4762 		/* fall through */
4763 	case DB_END_OF_INDEX:
4764 		ret = DB_SUCCESS;
4765 func_exit:
4766 		ut_free(buf);
4767 		mem_heap_free(heap);
4768 
4769 		return(ret);
4770 	}
4771 
4772 	*n_rows = *n_rows + 1;
4773 
4774 	/* else this code is doing handler::check() for CHECK TABLE */
4775 
4776 	/* row_search... returns the index record in buf, record origin offset
4777 	within buf stored in the first 4 bytes, because we have built a dummy
4778 	template */
4779 
4780 	rec = buf + mach_read_from_4(buf);
4781 
4782 	offsets = rec_get_offsets(rec, index, offsets_, index->n_core_fields,
4783 				  ULINT_UNDEFINED, &heap);
4784 
4785 	if (prev_entry != NULL) {
4786 		matched_fields = 0;
4787 
4788 		cmp = cmp_dtuple_rec_with_match(prev_entry, rec, offsets,
4789 						&matched_fields);
4790 		contains_null = FALSE;
4791 
4792 		/* In a unique secondary index we allow equal key values if
4793 		they contain SQL NULLs */
4794 
4795 		for (i = 0;
4796 		     i < dict_index_get_n_ordering_defined_by_user(index);
4797 		     i++) {
4798 			if (UNIV_SQL_NULL == dfield_get_len(
4799 				    dtuple_get_nth_field(prev_entry, i))) {
4800 
4801 				contains_null = TRUE;
4802 				break;
4803 			}
4804 		}
4805 
4806 		const char* msg;
4807 
4808 		if (cmp > 0) {
4809 			ret = DB_INDEX_CORRUPT;
4810 			msg = "index records in a wrong order in ";
4811 not_ok:
4812 			ib::error()
4813 				<< msg << index->name
4814 				<< " of table " << index->table->name
4815 				<< ": " << *prev_entry << ", "
4816 				<< rec_offsets_print(rec, offsets);
4817 			/* Continue reading */
4818 		} else if (dict_index_is_unique(index)
4819 			   && !contains_null
4820 			   && matched_fields
4821 			   >= dict_index_get_n_ordering_defined_by_user(
4822 				   index)) {
4823 			ret = DB_DUPLICATE_KEY;
4824 			msg = "duplicate key in ";
4825 			goto not_ok;
4826 		}
4827 	}
4828 
4829 	{
4830 		mem_heap_t*	tmp_heap = NULL;
4831 
4832 		/* Empty the heap on each round.  But preserve offsets[]
4833 		for the row_rec_to_index_entry() call, by copying them
4834 		into a separate memory heap when needed. */
4835 		if (UNIV_UNLIKELY(offsets != offsets_)) {
4836 			ulint	size = rec_offs_get_n_alloc(offsets)
4837 				* sizeof *offsets;
4838 
4839 			tmp_heap = mem_heap_create(size);
4840 
4841 			offsets = static_cast<rec_offs*>(
4842 				mem_heap_dup(tmp_heap, offsets, size));
4843 		}
4844 
4845 		mem_heap_empty(heap);
4846 
4847 		prev_entry = row_rec_to_index_entry(
4848 			rec, index, offsets, heap);
4849 
4850 		if (UNIV_LIKELY_NULL(tmp_heap)) {
4851 			mem_heap_free(tmp_heap);
4852 		}
4853 	}
4854 
4855 	ret = row_search_for_mysql(
4856 		buf, PAGE_CUR_G, prebuilt, 0, ROW_SEL_NEXT);
4857 
4858 	goto loop;
4859 }
4860 
4861 /*********************************************************************//**
4862 Initialize this module */
4863 void
row_mysql_init(void)4864 row_mysql_init(void)
4865 /*================*/
4866 {
4867 	mutex_create(LATCH_ID_ROW_DROP_LIST, &row_drop_list_mutex);
4868 
4869 	UT_LIST_INIT(
4870 		row_mysql_drop_list,
4871 		&row_mysql_drop_t::row_mysql_drop_list);
4872 
4873 	row_mysql_drop_list_inited = true;
4874 }
4875 
row_mysql_close()4876 void row_mysql_close()
4877 {
4878   ut_ad(!UT_LIST_GET_LEN(row_mysql_drop_list) ||
4879         srv_force_recovery >= SRV_FORCE_NO_BACKGROUND);
4880   if (row_mysql_drop_list_inited)
4881   {
4882     row_mysql_drop_list_inited= false;
4883     mutex_free(&row_drop_list_mutex);
4884 
4885     while (row_mysql_drop_t *drop= UT_LIST_GET_FIRST(row_mysql_drop_list))
4886     {
4887       UT_LIST_REMOVE(row_mysql_drop_list, drop);
4888       ut_free(drop);
4889     }
4890   }
4891 }
4892